Distributed tracing with Istio, Kubernetes, Elasticsearch, Fluentd and Kibana
In a complex distributed Kubernetes system consisting of dozens of services, running in hundreds of pods and spanning multiple nodes, it can be challenging to trace the execution of a specific flow, especially when services interact with one another (directly via REST calls or indirectly via messaging). In this article, I will explain how this can be done easily using Elasticsearch, Fluentd and Kibana (a.k.a. the EFK stack).
Elasticsearch
Elasticsearch is an extremely popular NoSQL document database, especially good at two things:
- High write throughput — in my experience, on average hardware it is able to accept and index millions of documents per minute
- Full-text search — it can very quickly search for text or partial text inside indexed documents
These two things make it an ideal database for storing logs. It can be deployed as a standalone server (suitable for a local development environment) or as a highly available cluster (on-premises or in the cloud). Every major cloud provider also offers a "managed" Elasticsearch service. For the last three years I have been using the AWS Elasticsearch service in production and have been very pleased with the overall experience. Recently, due to a licensing dispute, AWS switched to an open source fork of Elasticsearch named OpenSearch.
Kibana
Kibana is a web UI designed to work with Elasticsearch. With it, you can query Elasticsearch and visualize the results in different ways. We will use it to query our logs.
Fluentd
According to Fluentd's website:
Fluentd is an open source data collector for unified logging layer.
To put it simply, Fluentd is an open source component that gathers logs from specified locations, parses and transforms their content if needed, and sends the result to the desired destination. Everything is flexible and configurable: there are many open source plugins for each of these stages (gathering, parsing, sending) and ready-to-use integrations with many well-known components, including Kubernetes and Elasticsearch.
In the Kubernetes world, it is common to deploy Fluentd as a DaemonSet, which means every node runs a single Fluentd instance that collects logs from all pods running on that node and sends them to the configured destination — in our case Elasticsearch.
In fact, Fluentd on Kubernetes is so popular that there are many ready-to-use YAMLs and configurations out there for convenient deployment. One of the places you can find them is the fluentd-kubernetes-daemonset project. It contains templates you can use to build Docker images on your own, as well as links to prebuilt, ready-to-use images. At the time of writing this article, there is also a ready-to-use image pre-configured for integration with Elasticsearch:
docker pull fluent/fluentd-kubernetes-daemonset:v1.14.5-debian-elasticsearch7-amd64-1.1
If you look into its configuration, you will notice many parameters, most of them optional, which are expected to be provided as environment variables and can be used to configure various aspects of the integration with Elasticsearch.
In addition, there is an example YAML file that can be used as a baseline for deploying the desired Fluentd image as a DaemonSet on Kubernetes. Note the "env" section, where all desired configuration parameters can be specified as environment variables.
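For example, the "env" section of such a DaemonSet might look roughly like the sketch below. The variable names come from the fluentd-kubernetes-daemonset documentation, and the values are placeholders you would replace with your own Elasticsearch endpoint:

env:
  - name: FLUENT_ELASTICSEARCH_HOST
    value: "elasticsearch.logging.svc.cluster.local"  # placeholder Elasticsearch service
  - name: FLUENT_ELASTICSEARCH_PORT
    value: "9200"
  - name: FLUENT_ELASTICSEARCH_SCHEME
    value: "http"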
Distributed Tracing
These days it is a very common setup to have microservices running on Kubernetes with Istio acting as an API gateway in front. The usual flow is: a client issues an HTTP request, the request is received by the Istio ingress gateway, and the gateway then passes it to the target Kubernetes service according to the VirtualService or DestinationRule definitions.
But this is only the beginning of the story. While the request is being processed by one of the service's pods, interaction with other services might be needed in order to satisfy it. So the pod might issue a request (directly or indirectly) to another service, which in turn can call another one, and so on. Now imagine that we need to analyze such a flow and understand what happened along the way during execution.
Sure, having Fluentd set up and running on each of the nodes, collecting and sending logs to Elasticsearch, will give us all logs in a single place — and that's great. However, each log will be stored as a standalone record, and it will be nearly impossible to reconstruct the story of the whole flow. We are missing a "secret ingredient" — the Flow ID.
Ideally, we would like every log record belonging to the same flow to carry the same Flow ID. This way, we could query Kibana for a specific Flow ID and get all logs from that specific request, with the ability to search those logs for any desired text or values!
If you are using Istio, then I have great news for you: for every incoming HTTP request, the Istio ingress gateway automatically injects a special header named "x-request-id", containing a unique Flow ID value. However, it is injected only into the request from the Istio ingress gateway to the first receiving pod. If during the flow there are subsequent calls from the receiving pod to other pods, it is the developer's responsibility to propagate the received "x-request-id" header forward. It is worth mentioning that these don't have to be HTTP requests. For example, if you are using messaging for internal communication, you can add the value of "x-request-id" as an attribute of the sent message. The important thing is to make sure that every log you write contains the value of "x-request-id".
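For illustration, here is a minimal sketch of such propagation over messaging, assuming Kafka as the broker (Kafka is only an example here, not a requirement; the class, method, topic and payload are hypothetical). The flowId argument is the value taken from the incoming "x-request-id" header:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlowAwareMessaging {
    // Sends a message and attaches the current Flow ID as a message header
    public static void sendWithFlowId(Producer<String, String> producer, String topic, String payload, String flowId) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, payload);
        if (flowId != null && !flowId.isBlank()) {
            // Propagate the same "x-request-id" value so the consumer can log it as well
            record.headers().add("x-request-id", flowId.getBytes(StandardCharsets.UTF_8));
        }
        producer.send(record);
    }
}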
So, no matter which technology you use for your services and their internal communication, the general idea is presented in the diagram below:
In the following section I will suggest one possible implementation approach for a Java / Spring Boot / Logback stack.
Spring Boot specific implementation
Java together with Spring Boot is a popular choice for microservice implementation. By default, Spring Boot is configured with Tomcat as the embedded web server and with Logback as the default logger. We will keep these defaults for our example. Also, for the sake of simplicity, we will assume all communication between services is done over REST, but other options such as messaging are possible using similar concepts.
Our implementation consists of three major parts:
- On an incoming HTTP request, extract the "x-request-id" header.
- Each request (at least in Tomcat) is received on a separate thread. Store the "x-request-id" header value per thread and configure the logger to automatically inject it into every log.
- On every outgoing HTTP request, automatically take the stored "x-request-id" value and add it to the request headers.
We will start with bullet #2: we will define a component that allows storing the "x-request-id" value per thread and getting or clearing that value on demand. In Spring Boot, that would be a Service component; we will name it FlowService:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.validation.constraints.NotNull; // or your preferred @NotNull annotation
import org.slf4j.MDC;
import org.springframework.stereotype.Service;

@Service
public class FlowService {
    private final static String FlowIdName = "flow-id";

    // Thread-safe map holding the Flow ID associated with each handling thread
    private final ConcurrentMap<Thread, String> flowStates = new ConcurrentHashMap<>();

    public String getFlowId() {
        Thread thread = Thread.currentThread();
        return this.flowStates.getOrDefault(thread, null);
    }

    public void setFlowId(@NotNull String flowId) {
        Thread thread = Thread.currentThread();
        this.flowStates.put(thread, flowId);
        this.injectFlowIdIntoLoggerContext(flowId);
    }

    public void clearFlowId() {
        Thread thread = Thread.currentThread();
        this.clearFlowIdFromLoggerContext();
        this.flowStates.remove(thread);
    }

    // MDC stores values per thread; Logback attaches them to every log record
    private void injectFlowIdIntoLoggerContext(String flowId) {
        MDC.put(FlowIdName, flowId);
    }

    private void clearFlowIdFromLoggerContext() {
        MDC.remove(FlowIdName);
    }
}
The code above is quite straightforward: we are just managing thread/flowId pairs in a thread-safe dictionary. One particularly interesting point, in my opinion, is the use of MDC in the injectFlowIdIntoLoggerContext / clearFlowIdFromLoggerContext methods. MDC is a place where you can store values per thread, and they will be automatically attached to any log being written — exactly what we need to have our Flow ID added automatically to every log.
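To illustrate the effect, a hypothetical service (the class and method below are made up for this example) logs as usual, without mentioning the Flow ID at all; the "flow-id" entry set by FlowService via MDC is attached to the record by Logback automatically:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    public void processOrder(String orderId) {
        // No tracing code here: "flow-id" is added implicitly from the MDC
        log.info("Processing order {}", orderId);
    }
}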
Next, we will implement bullet #1: extracting the desired header ("x-request-id" in our example) from incoming requests and storing it inside FlowService. One possible approach to do that in Spring Boot is to use a filter:
import java.io.IOException;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;

public class IncomingRequestFilter implements Filter {
    private final FlowService flowService;

    public IncomingRequestFilter(FlowService flowService) {
        this.flowService = flowService;
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        if (servletRequest instanceof HttpServletRequest) {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            // getHeader returns null when the header is absent, so guard against that
            String flowIdValue = request.getHeader("x-request-id");
            if (flowIdValue != null && !flowIdValue.isBlank()) {
                this.flowService.setFlowId(flowIdValue);
            }
        }
        try {
            chain.doFilter(servletRequest, response);
        } finally {
            // The thread returns to the pool, so always clear the stored Flow ID
            this.flowService.clearFlowId();
        }
    }
}
Here in the filter, we take the "x-request-id" header from the incoming request and, if it is not null or blank, store it inside FlowService. Please note that right after request completion (i.e. after "chain.doFilter", in a finally block) we clear the flowId from the FlowService. This is because, once request processing completes, the thread returns to the thread pool, and we need to clean it up beforehand.
Now we need to register that filter in the filter pipeline, and we want to put it as close to the top of the chain as possible in order to make the Flow ID available to subsequent filters:
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;

@Configuration
@ConditionalOnWebApplication
public class IncomingRequestFilterConfiguration {
    @Bean
    public FilterRegistrationBean<IncomingRequestFilter> injectInContextFilter(FlowService flowService) {
        FilterRegistrationBean<IncomingRequestFilter> registrationBean = new FilterRegistrationBean<>();
        registrationBean.setFilter(new IncomingRequestFilter(flowService));
        // Run this filter first so the Flow ID is available to everything downstream
        registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return registrationBean;
    }
}
At this point, for every incoming request we extract the "x-request-id" header and, if its value is present, store it inside FlowService for later. Now we will handle bullet #3: on every outgoing request, get the stored flowId value and add it to the request headers. Since outgoing REST requests are (by default) done using RestTemplate, we will implement a ClientHttpRequestInterceptor which adds the "x-request-id" header to the outgoing request:
import java.io.IOException;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.stereotype.Component;

@Component
public class OutgoingInterceptor implements ClientHttpRequestInterceptor {
    private final FlowService flowService;

    public OutgoingInterceptor(FlowService flowService) {
        this.flowService = flowService;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        // getFlowId returns null when no Flow ID was stored for the current thread
        String flowId = this.flowService.getFlowId();
        if (flowId != null && !flowId.isBlank()) {
            request.getHeaders().add("x-request-id", flowId);
        }
        return execution.execute(request, body);
    }
}
Here, assuming all outgoing requests are issued from the same thread that handles the incoming request, we take the flowId value associated with that thread and inject it into every outgoing request. The next step is to register our interceptor in the RestTemplate's interceptor collection:
import java.util.ArrayList;
import java.util.List;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfiguration {
    @Bean
    @Primary
    @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
    public RestTemplate restTemplate(OutgoingInterceptor outgoingInterceptor) {
        RestTemplate restTemplate = new RestTemplate();
        List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
        if (CollectionUtils.isEmpty(interceptors)) {
            interceptors = new ArrayList<>();
        }
        // Register our interceptor so every outgoing request carries "x-request-id"
        interceptors.add(outgoingInterceptor);
        restTemplate.setInterceptors(interceptors);
        return restTemplate;
    }
}
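With this configuration in place, services do not need any tracing-specific code when calling each other. A hypothetical client using the injected RestTemplate (the service name and endpoint below are made up for illustration) could look like this:

import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class InventoryClient {
    private final RestTemplate restTemplate;

    public InventoryClient(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    public String getStock(String itemId) {
        // The OutgoingInterceptor adds "x-request-id" transparently
        return this.restTemplate.getForObject("http://inventory-service/stock/" + itemId, String.class);
    }
}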
As a last step, we have to configure our Logback logger. I found it very convenient to have our logs written in the Logstash format from the start. The Logstash format is a special JSON log representation, which is stored natively in Elasticsearch as a JSON object, allowing us to later query on any of the log properties. Very powerful stuff, as we will see shortly.
In order to write our logs in the Logstash format, I recommend using the great "logstash-logback-encoder" library. After adding it to the dependencies, we need to modify the logger configuration to use the Logstash encoder (usually in logback.xml):
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="INFO">
        <appender-ref ref="Console"/>
    </root>
</configuration>
In this configuration, we set the Console appender to use LogstashEncoder, so all logs are written in JSON format to stdout. As with any Docker container, all stdout content is written into files at a special location, which is "/var/lib/docker/containers" (if you are using MicroK8s, it would be "/var/snap/microk8s/common/var/lib/containerd"). These files are then picked up by Fluentd (which should be configured appropriately via the "dockercontainerlogdirectory" entry in its YAML). Fluentd will recognize the Logstash format (by default the "logstash_format" option is enabled in the fluentd.conf), send the log record to Elasticsearch, which in turn will create a Logstash index if needed and index the log record.
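For illustration, a single indexed log record produced by LogstashEncoder and enriched with Fluentd's Kubernetes metadata might look roughly like this (field names and values are illustrative; the exact set depends on your encoder and Fluentd configuration):

{
  "@timestamp": "2022-03-18T10:15:42.123Z",
  "level": "INFO",
  "logger_name": "com.example.OrderService",
  "thread_name": "http-nio-8080-exec-1",
  "message": "Processing order 42",
  "flow-id": "a3f1c2d4-5e6b-7a8c-9d0e-1f2a3b4c5d6e",
  "kubernetes": {
    "namespace_name": "default",
    "pod_name": "orders-service-7d9f8b6c4-xk2lp",
    "container_name": "orders-service"
  }
}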
Querying Indexed Logs
The first thing to do after logging into Kibana is a one-time Index Patterns configuration, which tells Kibana which indexes to query. We do that in the Management > Index patterns view. Press "Create index pattern" and enter the pattern "logstash-*". This makes Kibana query all indexes whose names start with "logstash-". This is needed because Fluentd instructs Elasticsearch (via an index template) to automatically create a separate index per day, for example "logstash-2022.03.18". Press "Next step", choose the @timestamp field in the time filter dropdown and then press "Create index pattern".
Now you should be able to query logs by providing a specific Flow ID. For example, go to the "Discover" view, make sure the "logstash-*" index pattern is selected and then put the following text in the query input:
flow-id : *
In the time period input, put something that ensures you will get some results, like "Last 1 year". Hit the "Update / Refresh" button — if any logs with a "flow-id" were indexed during the last year, you will get them in the results. Now expand one of the records. You should see a list of name/value fields, such as "kubernetes.container_name", "kubernetes.pod_name", "message" and of course "flow-id". You can build powerful queries over any combination of these fields, as shown below.
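For example, a query like the following (KQL syntax; the flow-id value is hypothetical) returns all logs of a single flow that were produced by a specific container:

flow-id : "a3f1c2d4-5e6b-7a8c-9d0e-1f2a3b4c5d6e" and kubernetes.container_name : "orders-service"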
One last tip: in production, I found it extremely useful to include the "flow-id" in the error message presented to the user when an error occurs. This way, based on the "flow-id" value, support can immediately query the logs of the specific flow that led to the error, see the whole story of where exactly the flow passed and what happened along the way, and easily identify the root cause of the issue.