Adding Error stack trace and filter options in Log4j Kafka Appender
In the previous article, we discussed how to direct spark logs to kafka but one issue with org.apache.kafka.log4jappender.KafkaLog4jAppender class is that error stack trace is not pushed into kafka. Error stack trace is vital and forms a crucial part of log analysis.
In this article, we will discuss creation of our own kafka appender class(myKafkaAppender) by extending org.apache.kafka.log4jappender.KafkaLog4jAppender and customizing this class to cater to our needs. Also, this class needs to be specified in all log4j properties files.
Adding error stack trace:-
To add error stack trace, we will have to create a subAppend method (we cannot override subAppend method of base class as it is private) and store the error stack trace in a string. We will append this string with the error log message. Code snippet of subAppend method is given below:-
private String subAppend(LoggingEvent event) {
StringBuilder str=new StringBuilder();
if(layout.ignoresThrowable()) {
String[] s = event.getThrowableStrRep();
if (s != null) {
int len = s.length;
for(int i = 0; i < len; i++) {
str=str.append(s[i]);
str=str.append(Layout.LINE_SEP);
}
}
}
return (this.layout == null) ? event.getRenderedMessage()+str : this.layout.format(event)+str;
}
subAppend method is called by the append method and the returned string by subAppend method is then published into the kafka topic.
Adding filter method to filter logs:-
We can create a filter method in our child class and use this to filter logs before publishing into kafka.
Below are some properties we added to filter our logs:-
1. filter_log_level:- To filter logs on the basis of logging level(INFO,WARN,ERROR)
2. stringToMatch:- To filter logs on the basis of string matching
3. acceptOnMatch:- To either accept(publish to kafka) or reject the filtered logs.
These three properties can be combined to filter logs on various levels.
For ex:- To reject all INFO logs or to accept just INFO logs having a certain substring.
The updated log4j.properties file is given below:-
# Root logger option
log4j.rootLogger=INFO, stdout,kafka
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd'T'HH:mm:ss.SSS}{GMT}] %p %m (%c)%n
log4j.appender.kafka=myKafkaAppender
log4j.appender.kafka.brokerList=localhost:9095
log4j.appender.kafka.topic=Kafkalogstemp5
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.filter_log_level=INFO
log4j.appender.kafka.StringToMatch=INFO Deleting directory,INFO Shutdown hook
log4j.appender.kafka.AcceptOnMatch=false
log4j.appender.kafka.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] %p %m (%c)%n
stringToMatch will have a string containing comma separated substrings as input. Any one of these substrings will be eligible for triggering the filter condition.
The complete code can be found on the link below:-
https://github.com/parkar-12/spark-examples/tree/master/myKafkaAppender
Comments
Post a Comment