[ 
https://issues.apache.org/jira/browse/KAFKA-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13183540#comment-13183540
 ] 

Stefano Santoro edited comment on KAFKA-244 at 1/13/12 1:44 AM:
----------------------------------------------------------------

see attached patch file
                
      was (Author: vtkstef):
    Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala   (revision 1229701)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala   (working copy)
@@ -66,9 +66,11 @@
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Host", "localhost")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // port missing
@@ -82,8 +84,10 @@
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.Port", "9092")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
@@ -98,9 +102,11 @@
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Host", "localhost")
     props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // topic missing
@@ -114,6 +120,8 @@
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Host", "localhost")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.Port", "9092")
@@ -123,7 +131,7 @@
     try {
       PropertyConfigurator.configure(props)
     }catch {
-      case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder")
+      case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }
 
@@ -153,10 +161,11 @@
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:9092")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props
   }
 
Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
===================================================================
--- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1229701)
+++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy)
@@ -18,66 +18,86 @@
 package kafka.producer
 
 import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
 import org.apache.log4j.{Logger, AppenderSkeleton}
 import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var port:Int = 0
   var host:String = null
   var topic:String = null
-  var encoderClass:String = null
+  var serializerClass:String = null
+  var zkConnect:String = null
+  var brokerList:String = null
   
-  private var producer:SyncProducer = null
-  private var encoder: Encoder[AnyRef] = null
-  
+  private var producer: Producer[String, String] = null
+
   def getPort:Int = port
-  def setPort(port: Int) = { this.port = port }
+  def setPort(port: Int) { this.port = port }
 
   def getHost:String = host
-  def setHost(host: String) = { this.host = host }
+  def setHost(host: String) { this.host = host }
 
   def getTopic:String = topic
-  def setTopic(topic: String) = { this.topic = topic }
+  def setTopic(topic: String) { this.topic = topic }
 
-  def getEncoder:String = encoderClass
-  def setEncoder(encoder: String) = { this.encoderClass = encoder }
+  def getZkConnect:String = zkConnect
+  def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
   
-  override def activateOptions = {
+  def getBrokerList:String = brokerList
+  def setBrokerList(brokerList: String) { this.brokerList = brokerList }
+  
+  def getSerializerClass:String = serializerClass
+  def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
+
+  override def activateOptions() {
+    val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
     // check for config parameter validity
-    if(host == null)
-      throw new MissingConfigException("Broker Host must be specified by the Kafka log4j appender")
-    if(port == 0)
-      throw new MissingConfigException("Broker Port must be specified by the Kafka log4j appender") 
+    val props = new Properties()
+    if( zkConnect == null) connectDiagnostic += "zkConnect"
+    else props.put("zk.connect", zkConnect);
+    if( brokerList == null) connectDiagnostic += "brokerList"
+    else if( props.isEmpty) props.put("broker.list", brokerList)
+    if( host == null || port == 0) connectDiagnostic += "host|port"
+    else if( props.isEmpty) {
+      props.put("broker.list", "0:"+host+":"+port.toString)
+    }
+    if(props.isEmpty )
+      throw new MissingConfigException(
+        connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".")
+      )
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(encoderClass == null) {
-      info("Using default encoder - kafka.producer.DefaultStringEncoder")
-      encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
-    }else // instantiate the encoder, if present
-      encoder = Utils.getObject(encoderClass)
-    val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
-    producer = new SyncProducer(new SyncProducerConfig(props))
-    info("Kafka producer connected to " + host + "," + port)
+    if(serializerClass == null) {
+      serializerClass = "kafka.serializer.StringEncoder"
+      info("Using default encoder - kafka.serializer.StringEncoder")
+    }
+    props.put("serializer.class", serializerClass)
+    val config : ProducerConfig = new ProducerConfig(props)
+    producer = new Producer[String, String](config)
+    info("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList else config.zkConnect))
     info("Logging for topic: " + topic)
   }
   
-  override def append(event: LoggingEvent) = {
-    debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
-            " for " + host + "," + port)
-    val message = encoder.toMessage(event)
-    producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
+  override def append(event: LoggingEvent)  {
+    val message : String = if( this.layout == null) {
+      event.getRenderedMessage
+    }
+    else this.layout.format(event)
+    debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
+    val messageData : ProducerData[String, String] =
+      new ProducerData[String, String](topic, message)
+    producer.send(messageData);
   }
 
-  override def close = {
+  override def close() {
     if(!this.closed) {
       this.closed = true
-      producer.close
+      producer.close()
     }
   }
 

                  
> Improve log4j appender to use kafka.producer.Producer, and support 
> zk.connect|broker.list options  
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-244
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.7.1
>            Reporter: Stefano Santoro
>              Labels: log4j, newbie
>             Fix For: 0.7.1
>
>         Attachments: Log4jAppender.patch, 
> Log4jAppenderWithWorkingZkConnect.patch
>
>
> Taken from #kafka IRC session with Neha Narkhede:
> The log4j appender is quite obsolete, there are a few things to change there. 
> Make it use the kafka.producer.Producer instead of SyncProducer. That allows 
> you to use either the broker.list or the zk.connect option

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to