[ https://issues.apache.org/jira/browse/KAFKA-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13183540#comment-13183540 ]
Stefano Santoro edited comment on KAFKA-244 at 1/13/12 1:44 AM:
----------------------------------------------------------------
see attached patch file
was (Author: vtkstef):
Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1229701)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy)
@@ -66,9 +66,11 @@
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c -
%m%n")
props.put("log4j.appender.KAFKA.Host", "localhost")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.appender.KAFKA.encoder",
"kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.appender.KAFKA.SerializerClass",
"kafka.log4j.AppenderStringEncoder")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// port missing
@@ -82,8 +84,10 @@
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c -
%m%n")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.appender.KAFKA.Encoder",
"kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.appender.KAFKA.SerializerClass",
"kafka.log4j.AppenderStringEncoder")
props.put("log4j.appender.KAFKA.Port", "9092")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -98,9 +102,11 @@
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c -
%m%n")
props.put("log4j.appender.KAFKA.Host", "localhost")
props.put("log4j.appender.KAFKA.Port", "9092")
- props.put("log4j.appender.KAFKA.Encoder",
"kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.appender.KAFKA.SerializerClass",
"kafka.log4j.AppenderStringEncoder")
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
// topic missing
@@ -114,6 +120,8 @@
props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c -
%m%n")
props.put("log4j.appender.KAFKA.Host", "localhost")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.appender.KAFKA.Port", "9092")
@@ -123,7 +131,7 @@
try {
PropertyConfigurator.configure(props)
}catch {
- case e: MissingConfigException => fail("should default to
kafka.producer.DefaultStringEncoder")
+ case e: MissingConfigException => fail("should default to
kafka.serializer.StringEncoder")
}
}
@@ -153,10 +161,11 @@
var props = new Properties()
props.put("log4j.rootLogger", "INFO")
props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
- props.put("log4j.appender.KAFKA.Port", "9092")
- props.put("log4j.appender.KAFKA.Host", "localhost")
+ props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+ props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c -
%m%n")
+ props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:9092")
props.put("log4j.appender.KAFKA.Topic", "test-topic")
- props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+ props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
props
}
Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
===================================================================
--- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1229701)
+++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy)
@@ -18,66 +18,86 @@
package kafka.producer
import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
import org.apache.log4j.{Logger, AppenderSkeleton}
import kafka.utils.{Utils, Logging}
import kafka.serializer.Encoder
import java.util.{Properties, Date}
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var port:Int = 0
var host:String = null
var topic:String = null
- var encoderClass:String = null
+ var serializerClass:String = null
+ var zkConnect:String = null
+ var brokerList:String = null
- private var producer:SyncProducer = null
- private var encoder: Encoder[AnyRef] = null
-
+ private var producer: Producer[String, String] = null
+
def getPort:Int = port
- def setPort(port: Int) = { this.port = port }
+ def setPort(port: Int) { this.port = port }
def getHost:String = host
- def setHost(host: String) = { this.host = host }
+ def setHost(host: String) { this.host = host }
def getTopic:String = topic
- def setTopic(topic: String) = { this.topic = topic }
+ def setTopic(topic: String) { this.topic = topic }
- def getEncoder:String = encoderClass
- def setEncoder(encoder: String) = { this.encoderClass = encoder }
+ def getZkConnect:String = zkConnect
+ def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
- override def activateOptions = {
+ def getBrokerList:String = brokerList
+ def setBrokerList(brokerList: String) { this.brokerList = brokerList }
+
+ def getSerializerClass:String = serializerClass
+ def setSerializerClass(serializerClass:String) { this.serializerClass =
serializerClass }
+
+ override def activateOptions() {
+ val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
// check for config parameter validity
- if(host == null)
- throw new MissingConfigException("Broker Host must be specified by the
Kafka log4j appender")
- if(port == 0)
- throw new MissingConfigException("Broker Port must be specified by the
Kafka log4j appender")
+ val props = new Properties()
+ if( zkConnect == null) connectDiagnostic += "zkConnect"
+ else props.put("zk.connect", zkConnect);
+ if( brokerList == null) connectDiagnostic += "brokerList"
+ else if( props.isEmpty) props.put("broker.list", brokerList)
+ if( host == null || port == 0) connectDiagnostic += "host|port"
+ else if( props.isEmpty) {
+ props.put("broker.list", "0:"+host+":"+port.toString)
+ }
+ if(props.isEmpty )
+ throw new MissingConfigException(
+ connectDiagnostic mkString ("One of these connection properties must
be specified: ", ", ", ".")
+ )
if(topic == null)
throw new MissingConfigException("topic must be specified by the Kafka
log4j appender")
- if(encoderClass == null) {
- info("Using default encoder - kafka.producer.DefaultStringEncoder")
- encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
- }else // instantiate the encoder, if present
- encoder = Utils.getObject(encoderClass)
- val props = new Properties()
- props.put("host", host)
- props.put("port", port.toString)
- producer = new SyncProducer(new SyncProducerConfig(props))
- info("Kafka producer connected to " + host + "," + port)
+ if(serializerClass == null) {
+ serializerClass = "kafka.serializer.StringEncoder"
+ info("Using default encoder - kafka.serializer.StringEncoder")
+ }
+ props.put("serializer.class", serializerClass)
+ val config : ProducerConfig = new ProducerConfig(props)
+ producer = new Producer[String, String](config)
+ info("Kafka producer connected to " + (if(config.zkConnect == null)
config.brokerList else config.zkConnect))
info("Logging for topic: " + topic)
}
- override def append(event: LoggingEvent) = {
- debug("[" + new Date(event.getTimeStamp).toString + "]" +
event.getRenderedMessage +
- " for " + host + "," + port)
- val message = encoder.toMessage(event)
- producer.send(topic, new ByteBufferMessageSet(compressionCodec =
NoCompressionCodec, messages = message))
+ override def append(event: LoggingEvent) {
+ val message : String = if( this.layout == null) {
+ event.getRenderedMessage
+ }
+ else this.layout.format(event)
+ debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
+ val messageData : ProducerData[String, String] =
+ new ProducerData[String, String](topic, message)
+ producer.send(messageData);
}
- override def close = {
+ override def close() {
if(!this.closed) {
this.closed = true
- producer.close
+ producer.close()
}
}
> Improve log4j appender to use kafka.producer.Producer, and support zk.connect|broker.list options
> ---------------------------------------------------------------------------------------------------
>
> Key: KAFKA-244
> URL: https://issues.apache.org/jira/browse/KAFKA-244
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 0.7.1
> Reporter: Stefano Santoro
> Labels: log4j, newbie
> Fix For: 0.7.1
>
> Attachments: Log4jAppender.patch, Log4jAppenderWithWorkingZkConnect.patch
>
>
> Taken from #kafka IRC session with Neha Narkhede:
> The log4j appender is quite obsolete, there are a few things to change there.
> Make it use the kafka.producer.Producer instead of SyncProducer. That allows
> you to use either the broker.list or the zk.connect option
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira