By Andrei Petrescu, Head of DevOps, CloudHero
One common use case when sending logs to Elasticsearch is routing different lines of a log file to different indexes based on matching patterns. In this article, we will go through the process of setting this up using both Fluentd and Logstash, to give you more flexibility and ideas on how to approach the topic. We'll also make use of grok patterns and walk through examples, so that after reading you can confidently replicate this setup to suit your needs.
How to get started with Kubernetes clusters
Firstly, you need access to a working Kubernetes cluster. If you don't have one, you can deploy your own on your laptop/PC using any of the many tools available right now. We will just name a few:
- Docker for Mac/Windows
- Minikube
- MicroK8s
- K3s
Tutorials on how to set up your Kubernetes cluster can be found all over the internet. An example of how you can set up your cluster using Docker for Mac can be found here.
How to prepare the Index Routing
Secondly, you need to have Kubectl and Helm v3 installed
to use the resources posted on our GitHub repo dedicated to this blogpost.
Next, clone the repository by running the following command:
$ git clone https://github.com/cloud-hero/multiple-index-routing.git
Inside, there are 2 folders:
- spitlogs (this contains the Dockerfile and resources to build the spitlogs image if you don't intend to use the one hosted on our Docker Hub repository)
- helm-values (this contains all the necessary Helm values files to deploy our logging stack)
Then, in order to generate logs, we need to run the spitlogs container in its own namespace:
$ kubectl create namespace spitlogs
$ kubectl -n spitlogs run spitlogs --image=cloudhero/spitlogs
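To confirm that the container is emitting logs, tail its output. This assumes kubectl run created a pod named spitlogs, which is what recent kubectl versions do; older versions may create a deployment instead:
$ kubectl -n spitlogs logs spitlogs --tail=20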
As you can see, it's a simple bash script that outputs the same 4 Nginx logs over and over again: more specifically, 2 access and 2 error logs with the timestamp removed, so as not to interfere with the timestamp created by Fluentd or Logstash:
[error] 7359#7359: *17367 upstream timed out (110: Connection timed out) while connecting to upstream, client: 1.1.1.1, server: 0.0.0.0:443, upstream: "2.2.2.2:443", bytes from/to client:0/0, bytes from/to upstream:0/0
1.1.1.1 - - "GET / HTTP/1.1" 200 37427 "https://www.test.com/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36" "3.3.3.3,4.4.4.4" "FyqZnlcMTwjKj8LfT2JwjF3HHs" "0.187" "0.188"
[error] 7359#7359: *17367 no live upstreams while connecting to upstream, client: 5.5.5.5, server: 0.0.0.0:443, upstream: "api.fake.io", bytes from/to client:0/0, bytes from/to upstream:0/0
1.1.1.1 - - "GET /testpage2 HTTP/1.1" 200 47002 "https://www.fakepage.com/" "Mozilla/5.0 (Windows NT 6.1; rv:77.0) Gecko/20100101 Firefox/77.0" "6.6.6.6,7.7.7.7" "tEhE9doovS3TCfW2tSwf4aN6IU" "0.833" "0.836"
The goal is to send the error logs to one Elasticsearch index and the access logs to another, while having both correctly parsed.
Before deploying, we need to create a new namespace and
add some Helm repositories:
$ kubectl create namespace logging
$ helm repo add stable https://kubernetes-charts.storage.googleapis.com
$ helm repo add elastic https://helm.elastic.co
$ helm repo add kiwigrid https://kiwigrid.github.io
Deploying Elasticsearch
Next, let's deploy Elasticsearch and Kibana using the values files provided in the multiple-index-routing repository.
$ cd multiple-index-routing
$ helm install elasticsearch elastic/elasticsearch --namespace logging -f helm-values/es-values.yaml
$ helm install kibana elastic/kibana --namespace logging -f helm-values/kibana-values.yaml
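Elasticsearch can take a minute or two to become ready. Assuming the chart's default naming (a StatefulSet called elasticsearch-master, which matches the host used later in the Fluentd and Logstash configurations), you can wait for the rollout like this:
$ kubectl -n logging rollout status statefulset/elasticsearch-master
$ kubectl -n logging get pods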
We advise you to check that the setup is okay. To do so, forward port 5601 on the Kibana pod:
$ kubectl -n logging port-forward $(kubectl -n logging get pods -l app=kibana -oname) 5601
and open http://localhost:5601 in your browser.
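You can also sanity-check Elasticsearch itself by forwarding its HTTP port and querying the cluster health API (again assuming the default elasticsearch-master service name):
$ kubectl -n logging port-forward svc/elasticsearch-master 9200 &
$ curl -s "http://localhost:9200/_cluster/health?pretty"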
Last but not least, we need to tail our application logs,
which requires deploying an agent that can harvest them from disk and send them
to the appropriate aggregator. Consequently, the rest of this tutorial will be
split into two parts, one focusing on fluentbit/fluentd, and the other on
filebeat/logstash.
1. Fluentbit/Fluentd for Index Routing
Let's take a look at how we can achieve the above task
using the aforementioned technologies.
We start by configuring Fluentd. Custom plugins are required in this case, namely fluent-plugin-grok-parser and fluent-plugin-rewrite-tag-filter, so we created a custom image that we pushed to our Docker Hub. As previously mentioned, if you want to build the image on your own and push it to your own registry, you can do it with this simple Dockerfile:
FROM quay.io/fluentd_elasticsearch/fluentd
RUN fluent-gem install fluent-plugin-rewrite-tag-filter fluent-plugin-grok-parser
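If you do build your own image, pushing it is the usual two-step process; your-registry/fluentd-custom below is just a placeholder for whatever image name you reference in the Fluentd values file:
$ docker build -t your-registry/fluentd-custom:latest .
$ docker push your-registry/fluentd-custom:latest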
We will proceed with installing Fluentd using the values
file provided in our repository:
$ helm install fluentd kiwigrid/fluentd-elasticsearch --namespace logging -f helm-values/fluentd-values.yaml
Now let's have a look at the configuration file:
<source>
  @type forward
  tag fakelogs
  port 24224
  bind 0.0.0.0
</source>

<filter fakelogs>
  @type parser
  key_name log
  <parse>
    @type grok
    <grok>
      pattern %{IPORHOST:remote_ip} - %{DATA:user_name} "%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:agent}" "(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)" "%{DATA:frontend_cookie}" "%{DATA:request_time}" "%{DATA:upstream_response_time}"
    </grok>
    <grok>
      pattern \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: "%{GREEDYDATA:upstream}", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}
    </grok>
  </parse>
</filter>

<match fakelogs>
  @type rewrite_tag_filter
  <rule>
    key severity
    pattern /.*/
    tag error_log
  </rule>
  <rule>
    key response_code
    pattern /.*/
    tag access_log
  </rule>
</match>

<match error_log>
  @type elasticsearch
  hosts elasticsearch-master:9200
  logstash_format true
  logstash_prefix fd-error
  type_name log
</match>

<match access_log>
  @type elasticsearch
  hosts elasticsearch-master:9200
  logstash_format true
  logstash_prefix fd-access
  type_name log
</match>
How to read the Fluentd configuration file
The first block we'll look at is the <source> block. It specifies that Fluentd listens on port 24224 for incoming connections and tags everything that arrives there with the tag fakelogs. Tagging is important because we want to apply certain actions only to a specific subset of logs.
The <filter> block takes every log line and parses it with those two grok patterns. If the first one fails, it tries the second one, and if the second one fails, the log line remains unparsed. You can define any number of grok patterns; parsing stops at the first pattern that matches.
After the <filter> block, we have our first <match> block, which makes use of the rewrite_tag_filter plugin. This plugin uses regex patterns to check whether a field from the parsed log line matches something specific. In our case, we only check that the field exists. For example, an access log will never have a severity field, because it is not even mentioned in its grok pattern. Once the type of log is determined, we replace the old fakelogs tag with a new one, either error_log or access_log. This helps us decide further down the line what to do with each log.
The magic happens in the last 2 <match> blocks: depending on which tag the log line was assigned, it is sent either to the fd-access-* index or to the fd-error-* one.
Sending logs to fluentd
Now that we have our fluentd agent configured and
running, we must use something to send logs to it. Coming up next, we will
install the fluentbit agent:
$ helm install fluentbit stable/fluent-bit --namespace logging -f helm-values/fluentbit-values.yaml
Let's take a quick look at the values file for fluentbit
too. The only line which needs explaining is this one:
path: /var/log/containers/spitlogs*_spitlogs_spitlogs-*.log
As stated in our previous article regarding fluentbit, Kubernetes stores logs on disk using the <deployment_name>*_<namespace>_<container>-*.log format, so we make use of that fact to only target the log files from our spitlogs application.
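If you want to confirm that the agent actually picked those files up, Fluent Bit usually logs every file its tail input starts watching, so something like the following should list the spitlogs files (the DaemonSet name below assumes the chart's default naming for a release called fluentbit):
$ kubectl -n logging logs daemonset/fluentbit-fluent-bit | grep spitlogs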
After installing fluentbit, we should see some action in
Kibana.
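Before creating index patterns, you can also confirm from the command line that the two indices exist, using the same Elasticsearch port-forward as before:
$ curl -s "http://localhost:9200/_cat/indices/fd-*?v"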
Firstly, let's create a new Elasticsearch index pattern
in Kibana:
You can see that Kibana can find the fd-access-* and
fd-error-* indices in Elasticsearch:
To create the fd-access-* index pattern, write exactly
that in place of index-name-* and click Next step.
In the next step, choose @timestamp as the time field, and finally, click Create index pattern. Repeat the same steps for the fd-error-* index pattern as well.
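If you prefer to script this step instead of clicking through the UI, recent Kibana versions expose a saved objects API that can create the same index patterns; a rough equivalent, assuming Kibana is still port-forwarded on 5601, would be:
$ curl -s -X POST "http://localhost:5601/api/saved_objects/index-pattern" -H "kbn-xsrf: true" -H "Content-Type: application/json" -d '{"attributes":{"title":"fd-access-*","timeFieldName":"@timestamp"}}'
$ curl -s -X POST "http://localhost:5601/api/saved_objects/index-pattern" -H "kbn-xsrf: true" -H "Content-Type: application/json" -d '{"attributes":{"title":"fd-error-*","timeFieldName":"@timestamp"}}'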
After this, we can go to the Discover tab and see that we
have two index patterns created with parsed logs inside them.

2. Logstash/Filebeat for Index Routing
If Fluentbit/Fluentd does not suit your needs, the
alternative solution for Multiple Index Routing using Logstash and Filebeat.
Let's install Logstash using the values file provided in
our repository
$ helm install logstash elastic/logstash --namespace logging -f helm-values/logstash-values.yaml
and have a look at the configuration file:
input {
  beats {
    port => 5044
    host => "0.0.0.0"
    tags => ["fakelogs"]
  }
}

filter {
  if "fakelogs" in [tags] {
    grok {
      match => { "message" => "%{IPORHOST:remote_ip} - %{DATA:user_name} \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"(?:%{IPORHOST:real_ip},%{IPORHOST:internal_ip}|-)\" \"%{DATA:frontend_cookie}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\"" }
      add_tag => ["access_log"]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "\[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:error_msg}, client: %{IP:client}, server: %{GREEDYDATA:server}, upstream: \"%{GREEDYDATA:upstream}\", bytes from/to client:%{GREEDYDATA:bytes_ft_client}, bytes from/to upstream:%{GREEDYDATA:bytes_ft_upstream}" }
        add_tag => ["error_log"]
        remove_tag => ["_grokparsefailure"]
      }
    }
  }
}

filter {
  if "access_log" in [tags] {
    mutate { add_field => { "[@metadata][target_index]" => "ls-access-%{+yyyy.MM.dd}" } }
  }
  else {
    mutate { add_field => { "[@metadata][target_index]" => "ls-error-%{+yyyy.MM.dd}" } }
  }
}

output {
  elasticsearch {
    hosts => "elasticsearch-master:9200"
    index => "%{[@metadata][target_index]}"
  }
}
The input {} block is analogous to the <source>
block in fluentd, and does the same thing here, but it listens on a different
port.
How to read the Logstash configuration file
The first filter {} block tries to parse the log line with the access log grok pattern. When a grok pattern does not match, Logstash adds a _grokparsefailure tag to the tags array, so we can check for it and parse again with the error log pattern if the first try was unsuccessful. We use the same tags as in Fluentd, and remove the previously assigned _grokparsefailure tag.
The second filter {} block looks in the tag list and
assigns a different value to the target_index metadata field.
Lastly, the output {} block uses the target_index
metadata field to select the Elasticsearch index to which to send data.
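Logstash still needs a node-level agent to ship the container logs to it, which is Filebeat's job here. A hedged sketch of installing it with the Elastic chart follows; the filebeat-values.yaml file name is an assumption, so check the helm-values folder of the repository for the actual file:
$ helm install filebeat elastic/filebeat --namespace logging -f helm-values/filebeat-values.yaml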
If we have a look in Kibana, we can see that we have two
new indexes created in Elasticsearch.
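As with the Fluentd indices, you can also confirm this from the command line:
$ curl -s "http://localhost:9200/_cat/indices/ls-*?v"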
Now we're taking the last step towards finishing our Multiple Index Routing. Take a look at the index pattern creation; it's the same as before. Then, after creating them, we can see that the logs go to the correct indexes and are being parsed accordingly.
Pattern creation
You've reached the end of this article. We hope you feel confident enough to adapt these tips to your use case.
Stay tuned for the latest updates
To learn more about containerized infrastructure and cloud native technologies, consider joining us at KubeCon + CloudNativeCon NA Virtual, November 17-20.
About the Author
Andrei Petrescu, Head of DevOps, CloudHero
Andrei is the Head of the DevOps Department and blogger at CloudHero, and also a Local Networks Teaching Assistant at the University Politehnica of Bucharest. He is a Certified Kubernetes Administrator and holds a Bachelor's Degree in Computers and Information Technology from the Faculty of Automatic Control and Computers.