[ https://issues.apache.org/jira/browse/KAFKA-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13183540#comment-13183540 ]
Stefano Santoro edited comment on KAFKA-244 at 1/13/12 1:44 AM: ---------------------------------------------------------------- see attached patch file was (Author: vtkstef): Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1229701) +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy) @@ -66,9 +66,11 @@ var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") + props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Host", "localhost") props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder") + props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") // port missing @@ -82,8 +84,10 @@ props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") + props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder") + props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") props.put("log4j.appender.KAFKA.Port", "9092") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") @@ -98,9 +102,11 @@ props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") + props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Host", "localhost") props.put("log4j.appender.KAFKA.Port", "9092") - props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder") + props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") // topic missing @@ -114,6 +120,8 @@ props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") + props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Host", "localhost") props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.appender.KAFKA.Port", "9092") @@ -123,7 +131,7 @@ try { PropertyConfigurator.configure(props) }catch { - case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder") + case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder") } } @@ -153,10 +161,11 @@ var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.Port", "9092") - props.put("log4j.appender.KAFKA.Host", "localhost") + props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:9092") props.put("log4j.appender.KAFKA.Topic", "test-topic") - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") + props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") props } Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala =================================================================== --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1229701) +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy) @@ -18,66 +18,86 @@ package kafka.producer import async.MissingConfigException -import org.apache.log4j.spi.LoggingEvent +import org.apache.log4j.spi.{LoggingEvent, ErrorCode} import org.apache.log4j.{Logger, AppenderSkeleton} import kafka.utils.{Utils, Logging} import kafka.serializer.Encoder import java.util.{Properties, Date} import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} +import scala.collection._ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var port:Int = 0 var host:String = null var topic:String = null - var encoderClass:String = null + var serializerClass:String = null + var zkConnect:String = null + var brokerList:String = null - private var producer:SyncProducer = null - private var encoder: Encoder[AnyRef] = null - + private var producer: Producer[String, String] = null + def getPort:Int = port - def setPort(port: Int) = { this.port = port } + def setPort(port: Int) { this.port = port } def getHost:String = host - def setHost(host: String) = { this.host = host } + def setHost(host: String) { this.host = host } def getTopic:String = topic - def setTopic(topic: String) = { this.topic = topic } + def setTopic(topic: String) { this.topic = topic } - def getEncoder:String = encoderClass - def setEncoder(encoder: String) = { this.encoderClass = encoder } + def getZkConnect:String = zkConnect + def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect } - override def activateOptions = { + def getBrokerList:String = brokerList + def setBrokerList(brokerList: String) { this.brokerList = brokerList } + + def getSerializerClass:String = serializerClass + def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } + + override def activateOptions() { + val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer(); // check for config parameter validity - if(host == null) - throw new MissingConfigException("Broker Host must be specified by the Kafka log4j appender") - if(port == 0) - throw new MissingConfigException("Broker Port must be specified by the Kafka log4j appender") + val props = new Properties() + if( zkConnect == null) connectDiagnostic += "zkConnect" + else props.put("zk.connect", zkConnect); + if( brokerList == null) connectDiagnostic += "brokerList" + else if( props.isEmpty) props.put("broker.list", brokerList) + if( host == null || port == 0) connectDiagnostic += "host|port" + else if( props.isEmpty) { + props.put("broker.list", "0:"+host+":"+port.toString) + } + if(props.isEmpty ) + throw new MissingConfigException( + connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".") + ) if(topic == null) throw new MissingConfigException("topic must be specified by the Kafka log4j appender") - if(encoderClass == null) { - info("Using default encoder - kafka.producer.DefaultStringEncoder") - encoder = Utils.getObject("kafka.producer.DefaultStringEncoder") - }else // instantiate the encoder, if present - encoder = Utils.getObject(encoderClass) - val props = new Properties() - props.put("host", host) - props.put("port", port.toString) - producer = new SyncProducer(new SyncProducerConfig(props)) - info("Kafka producer connected to " + host + "," + port) + if(serializerClass == null) { + serializerClass = "kafka.serializer.StringEncoder" + info("Using default encoder - kafka.serializer.StringEncoder") + } + props.put("serializer.class", serializerClass) + val config : ProducerConfig = new ProducerConfig(props) + producer = new Producer[String, String](config) + info("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect)) info("Logging for topic: " + topic) } - override def append(event: LoggingEvent) = { - debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage + - " for " + host + "," + port) - val message = encoder.toMessage(event) - producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message)) + override def append(event: LoggingEvent) { + val message : String = if( this.layout == null) { + event.getRenderedMessage + } + else this.layout.format(event) + debug("[" + new Date(event.getTimeStamp).toString + "]" + message) + val messageData : ProducerData[String, String] = + new ProducerData[String, String](topic, message) + producer.send(messageData); } - override def close = { + override def close() { if(!this.closed) { this.closed = true - producer.close + producer.close() } } > Improve log4j appender to use kafka.producer.Producer, and support > zk.connect|broker.list options > --------------------------------------------------------------------------------------------------- > > Key: KAFKA-244 > URL: https://issues.apache.org/jira/browse/KAFKA-244 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 0.7.1 > Reporter: Stefano Santoro > Labels: log4j, newbie > Fix For: 0.7.1 > > Attachments: Log4jAppender.patch, > Log4jAppenderWithWorkingZkConnect.patch > > > Taken from #kafka IRC session with Neha Narkhede: > The log4j appender is quite obsolete, there are a few things to change there. > Make it use the kafka.producer.Producer instead of SyncProducer. That allows > you to use either the broker.list or the zk.connect option -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira