Logstash from scratch: parsing new messages

Jelmer de Reus
ioshark.net
Published in
8 min readAug 2, 2017

--

I want to share my experience of working with Elasticsearch, Logstash and Kibana. In this article you will learn how to parse and enrich log data and get an understanding of the place that Logstash takes in the ELK stack.

I have been setting up and managing Logstash, Elasticsearch and Kibana for an ISP and a University. I had to work with this software to troubleshoot network equipment and analyze network behavior for security purposes. In these years I developed a taste for discovering new ways in which to enrich and use data and I learned what is important about working with logs in the real world.

Installing Logstash or the entire ELK stack is fairly straightforward. There are other resources to help you do that. This article is focused on the configuration of Logstash, which resides in /etc/logstash/ . In this example it is /etc/logstash/conf.d/base.conf .

While it is easy to setup a configuration to transmit everything to Elasticsearch and make it show up in Kibana, this is usually not what you want. Logstash is also there to parse text and enrich data when certain conditions are met. You can influence this process in great detail and that’s a big benefit of Logstash.

More about logs

Generally, in an IT infrastructure, most logs are generated by network equipment. They continuously transmit information about events. These are messages generated when things happen like a network topology change, traffic is being blocked or allowed, interfaces are going up or down and when systems fail or reboot. There can be millions of them every day.

Devices from the same manufacturer often generate similarly structured logs and we will use this to our advantage. I strongly advise to SSH in and take a look at logs that are stored locally on servers and network equipment you manage. Usually you can learn a lot from just a few hundred lines

Input configuration

File input

Usually, logs are persisted to disk before analyzed. Messages have a greater chance of surviving this way in case of trouble like a Logstash server failing to transmit data to another system.

In this example, folder /mylogs contains a folder structure of messages stored in files per day and month of the year. e.g. /mylogs/2017/07/30/messages.log

One way to do this is by rotating daily in your rsyslog configuration. Logstash will read these files as input.

Network (syslog) input

When logs are received on the network you can use the syslog input plugin to specify the interface and port to listen on. The default is “0.0.0.0” and port UDP/514.

The reason for adding a type “syslog” is that this messages is now tagged as a syslog message. You can also use if-statements matching on a type when handling different types of data with Logstash. When you work on the filter and output section of the configuration, you probably want to handles these types differently. An example is working with time-series data like netflow.

Filter configuration

In the filter section of the configuration we will specify what to do with different logs. At this point you need to know what your logs look like. A machine with Elasticsearch, Logstash and Kibana installed generates Linux and ELK related logs. When it receives logs from other devices it needs to be able to parse these logs as well. I will show how you can make a configuration that handles different logs in their own way.

We will be looking at logs that are generated by a Cisco Meraki router. This is a popular router for retailers and small branch offices. It has some security features like an ‘intrusion detection system’ which can generate useful data for analysis — and action!

Grok?

Grok is a filter plugin designed to work with regular expressions to pick out objects from lines of text. Things like the time, the event severity, an IP address or somebody’s username.

In the filter you can specify patterns, match them against certain log messages and thereby translate a line of text into an object with fields that Elasticsearch can index. The filter also allows you to define additional fields and tags when something is matched using the mutate filter plugin. This can be beneficial for both the added information and the way you can manipulate the control flow of Logstash.

Making patterns

In order to make patterns, I usually grab a few hundred lines of logs from a certain device and look what they have in common. You will see that the left side of most of the logs is similar.

This is a typical log mesage from a Cisco Meraki router

Oct 11 12:06:08 172.29.250.65 1476180389.257544558 MX64_RT_01 ids-alerts signature=1:26355:11 priority=1 timestamp=1476180388.532645 dhost=AA:BB:CC:00:11:22 direction=ingress protocol=tcp/ip src=192.2.0.2:80 dst=10.20.30.2:55130 message: BROWSER-PLUGINS Microsoft Windows RDP ActiveX component mstscax use after free attempt

Meraki logs seem to have this part in common:

Oct 11 12:06:08 172.29.250.65 1476180389.257544558 MX64_RT_01 <type>

All of these logs contain the transmit time, the Meraki’s IP address, a timestamp of the event and a hostname in the left part. What’s more, they indicate what the type of this log is: ids-alerts. In order to recognise these messages as Cisco Meraki logs, we can use the hostname which contains MX64 or MX84. Note that it depends on the people whether naming is consistent! There's other ways to organize this.

Using this knowledge, you can make a pattern to parse all messages’ left half first, to store the type of log received in a new field <log_type>, and to save the rest of the message in a field <contents> to do further parsing based on its type. Finally we can store the type of this device in a new field.

The Grok Debugger is a great tool to help you design patterns. You can put a log message in the input, define a pattern and see results updating immediately.

On the same website you can find patterns (go to grok-patterns) described by regex patterns and with common names that you can match to text using the ${pattern name:field name} syntax in which you can optionally specify a name of a new field. ${GREEDYDATA} matches anything so we’ll use that first and name it contents:

The trick is to try to match the contents on the left side of the ${GREEDYDATA} which matches anything. Then the new fields will show up in the output window.

On the left we have a time and an IP address. It turns out you can use the known patterns ${SYSLOGTIMESTAMP} and ${IPORHOST} to match on them.

Note that the contents field now starts with a space. Include a space in your next pattern as well. Using the grok debugger is enormously helpful, but you need to do it a few times to get the hang of it.

Normally, it is better not to try and match the entire message. You parse the parts that messages have in common, and store the rest in contents. Then you can make new matches on that field based on what you know, like the device type and the log type like ids-alerts.

Apart from the given patterns, you can use regex patterns to match strings that do not match on known Grok patterns. In this example ids-alerts (or another log type) is matched by (?<log_type>[a-zA-Z0–9\-]+)

Once you have a pattern matching everything up to and including ids-alerts, a minimal filter configuration that uses this knowledge will look like the following:

The field contents will now contain a specific message that you can parse based on the new field [log_type]. But what types are there?

If you have look at Cisco Meraki logs, you'll know that there's only a couple of options in this field:

  • ids-alerts (detected anomalies or misbehaving systems)
  • flows (network traffic: allowed or denied)
  • events (generic)

The latter has a lot of variation:

  • DHCP offers / releases
  • routing table updates
  • VPN connections
  • failover events

As you might guess, you will make additional patterns for each of these types and match them.

When you have patterns for these types of logs, you can do more specific matching with a few if-statements on the fields [log_type] and [contents]:

Now you understand how you can match more specific logs using some generic fields in a message, it is time for a more complete example.

In the following example configuration you can see how specific patterns are matched on [contents] and how the mutate filter plugin is used to enrich event data with things like priorities and to fix inconsistencies that we know about.

Usually, not many people are familiar with this subject. It takes some time but it is very feasible to parse your own logs when you take some time to look at the things logs have in common. I some readers are a little more confident now to make a practical Logstash configuration.

Output

When you have matched and stored most of the valuable information in new fields, we need to send it to Elasticsearch or another destination. In this example we will send the output of our processed logs to an Elasticsearch instance on the local machine.

Note that a template is used for the indexing of the “syslog” document type. Normally, when you use Grok or add new fields or tags, Elasticsearch will add new fields to its index dynamically. Why make a template?

Indexes are usually created on a daily basis. the index of the next day might not have certain fields depending on the occurrence of certain events that generated these messages. But for visualisations to work in Kibana, you need to have certain fields in every index.

A template contains mapping types and enables us to have fields in an Elasticsearch index as soon as it is created. More on templates here.

Testing and using

When you’re ready to start using Logstash with this new configuration, you need to test whether it is syntactically correct, meaning there are no typos or other problems in the configuration.

(Edited) Note that the configtest is different in newer (5+) Logstash versions:

Only when you have a good look at your logs and parse them carefully, you can extract information that you can use in Kibana dashboards so everybody can use it. Moreover, being able to do this is a very useful skill and does not only apply to log data.

Lastly, I want to say a big thank you to Elastic and the Elastic community that enables us to make use of these beautiful tools.

--

--