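I think the log messages that Kafka itself emits are being routed back into
the KafkaAppender, which creates an infinite loop. Your stack trace suggests
this: KafkaAppender.append calls Producer.send, which logs through
kafka.utils.Logging, which ends up in KafkaAppender.append again. Maybe you
could try setting the log level for the kafka package to OFF?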
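
For example, something along these lines in your log4j.properties might do it.
This is only a sketch, assuming log4j 1.2 and that the Kafka client classes log
under the "kafka" logger hierarchy (as the kafka.utils.Logging frames in your
trace suggest); the second option reuses the "stdout" appender name from your
config:

```
# Option 1: silence Kafka's own logging entirely, so nothing it logs
# can be handed back to the KAFKA appender.
log4j.logger.kafka=OFF

# Option 2 (alternative, use instead of Option 1): keep Kafka's logs on the
# console only. With additivity off, they are not passed up to the root
# logger's appenders, so they never reach KAFKA.
#log4j.logger.kafka=INFO, stdout
#log4j.additivity.kafka=false
```

Either way, your own application loggers still inherit the KAFKA appender from
the root logger.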

Thanks,

Jun


On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <style9...@gmail.com> wrote:

> I made a simple log4j Kafka appender.
> I copied most of the code from the 0.8.0 Producer example at
> "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example"
> into the "append" function.
>
> I confirmed that the producer example code works in my environment.
> But when I use the same logic in a log4j appender, it doesn't work.
> It tries to fetch metadata repeatedly, and I get an endless series of
> "Utils$.swallowError" errors.
>
> I don't understand what swallowError is doing.
> It looks like it fails to fetch metadata from the broker and keeps trying
> again and again.
> The maximum retry count is just 3, so I don't know why this happens.
>
> Is there anything that needs to be done to produce log data into Kafka via
> a log4j appender?
>
>
> ---------------------------------------------------------------------------------------------------------------------------------
> INFO [main] (Logging.scala:67) - Verifying properties
>  INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to kafka01:9092
>  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
>  INFO [main] (Logging.scala:67) - Property request.required.acks is overridden to 1
>  INFO [main] (Logging.scala:67) - Property partitioner.class is overridden to com.samsung.rtdp.SimplePartitioner2
>  INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
>  INFO [main] (HelloWorld.java:14) - Entering application.
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s) Set(KafkaAppenderTest)
>  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s) Set(KafkaAppenderTest)
> .
> .
> .
> java.lang.StackOverflowError
> at java.lang.StringCoding.deref(StringCoding.java:64)
> at java.lang.StringCoding.encode(StringCoding.java:275)
> at java.lang.String.getBytes(String.java:954)
> at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> at java.io.File.exists(File.java:791)
> at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.error(Category.java:322)
> at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> at kafka.utils.Utils$.swallow(Utils.scala:189)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.info(Category.java:666)
> at kafka.utils.Logging$class.info(Logging.scala:67)
> at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:187)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> .
> .
> .
>
> ----------------------------------------------------------------------------------------------------------------------------------
>
>
> Here is my code.
> Since it came from the producer example code, it is quite straightforward.
>
> -------------------------------------------------------------------------------------------------------------------------
> package com.samsung.rtdp;
>
> import java.io.IOException;
> import java.util.Date;
> import java.util.Properties;
> import java.util.Random;
>
> import org.apache.log4j.AppenderSkeleton;
> import org.apache.log4j.spi.ErrorCode;
> import org.apache.log4j.spi.LoggingEvent;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.Partitioner;
> import kafka.producer.ProducerConfig;
> import kafka.utils.VerifiableProperties;
>
>
> public class KafkaAppender extends AppenderSkeleton {
>
>     private String brokerList;
>     private String serializer;
>     private String partitioner;
>     private String topic;
>     private String DEFAULT_REQUIRED_REQUEST_NUACKS = "1";
>
>     private Properties props;
>     private ProducerConfig config;
>     private Producer<String, String> producer;
>
>     public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
>     public String getBrokerList() { return this.brokerList; }
>
>     public void setSerializerClass(String serializer) { this.serializer = serializer; }
>     public String getSerializer() { return this.serializer; }
>
>     public void setPartitionerClass(String partitioner) { this.partitioner = partitioner; }
>     public String getPartitioner() { return this.partitioner; }
>
>     public void setTopic(String topic) { this.topic = topic; }
>     public String getTopic() { return this.topic; }
>
>     public void printParameters() {
>         System.out.println("BrokerList : " + brokerList);
>         System.out.println("Serializer Class : " + serializer);
>         System.out.println("Partitioner Class : " + partitioner);
>         System.out.println("Topic : " + topic);
>     }
>
>     public void activateOptions() {
>         // printParameters();
>
>         props = new Properties();
>
>         props.put("metadata.broker.list", brokerList);
>         props.put("serializer.class", serializer);
>         props.put("partitioner.class", partitioner);
>         props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
>         props.put("zk.connect", "kafka01:2181");
>
>         config = new ProducerConfig(props);
>         producer = new Producer<String, String>(config);
>     }
>
>     public void close() {
>         // TODO Auto-generated method stub
>         producer.close();
>     }
>
>     public boolean requiresLayout() {
>         // TODO Auto-generated method stub
>         return true;
>     }
>
>     @Override
>     protected void append(LoggingEvent event) {
>         // TODO Auto-generated method stub
>
>         // printParameters();
>
>         if (this.layout == null) {
>             errorHandler.error("No layout for appender " + name, null,
>                     ErrorCode.MISSING_LAYOUT);
>             return;
>         }
>
>         String msg = this.layout.format(event);
>
>         KeyedMessage<String, String> data =
>                 new KeyedMessage<String, String>("KafkaAppenderTest", msg, msg);
>
>         producer.send(data);
>     }
>
> }
>
>
> -------------------------------------------------------------------------------------------------------------------------------------
>
>
>
> And this is my log4j.properties
>
> -------------------------------------------------------------------------------------------------------------------------------------
> log4j.rootLogger=INFO, stdout, KAFKA
> # set the logger for your package to be the KAFKA appender
> #log4j.logger.com.samsung.rtdp=INFO, KAFKA
>
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
>
> # Pattern to output the caller's file name and line number.
> log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
>
> log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> log4j.appender.KAFKA.BrokerList=kafka01:9092
> log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> log4j.appender.KAFKA.Topic=test
> log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
>
> --------------------------------------------------------------------------------------------------------------------------------------
>
