It seems you hit an exception when instantiating the slf4j logger inside the producer.
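The trace below shows log4j's PropertyConfigurator activating kafka.producer.KafkaLog4jAppender while the logger is being created, so it may be worth checking which log4j.properties is picked up from the classpath and whether it configures that appender. A minimal sketch of such a check follows; the object name Log4jConfigCheck and the helper kafkaAppenderLines are hypothetical names made up for illustration, not part of Kafka or of the original post, and the sketch assumes the configuration file is named log4j.properties.

```scala
// Hypothetical diagnostic helper; not part of Kafka or of the original post.
object Log4jConfigCheck {

  // Returns the non-comment lines of a log4j.properties text that
  // mention KafkaLog4jAppender.
  def kafkaAppenderLines(config: String): Seq[String] =
    config.split("\n").toSeq
      .map(_.trim)
      .filter(line => !line.startsWith("#") && line.contains("KafkaLog4jAppender"))

  def main(args: Array[String]): Unit = {
    // Assumption: log4j is configured from a file named log4j.properties
    // found on the classpath.
    val url = getClass.getClassLoader.getResource("log4j.properties")
    if (url == null) {
      println("No log4j.properties found on the classpath")
    } else {
      println("log4j.properties loaded from: " + url)
      val source = scala.io.Source.fromURL(url)
      try kafkaAppenderLines(source.mkString).foreach(println)
      finally source.close()
    }
  }
}
```

If this prints an appender line, pointing the test at a log4j.properties that only uses a console appender might be a quick way to confirm whether the appender is involved.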

Thanks,

Jun

On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang <haoming.zh...@outlook.com> wrote:

> Hi all,
>
> Basically I used a lot of code from this project
> https://github.com/stealthly/scala-kafka . My idea is to send a key/value
> pair to Kafka, so that I can design a partition function in the future.
>
> I checked the documentation and it seems I should create a ProducerRecord,
> so that I can specify a partition or key. Following the code from
> stealthly's project, I created a test main function as follows:
>
> import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.kafka.clients.producer.ProducerRecord
> import java.util.Properties
> import java.util.UUID
>
> object Test extends App {
>   val testMessage = UUID.randomUUID().toString
>   val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
>   val groupId_1 = UUID.randomUUID().toString
>
>   val brokerList: String = "localhost:9092"
>   val acks: Int = -1
>   val metadataFetchTimeout: Long = 3000L
>   val blockOnBufferFull: Boolean = true
>   val bufferSize: Long = 1024L * 1024L
>   val retries: Int = 0
>
>   val producerProps = new Properties()
>   producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
>   producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
>   producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,
>     metadataFetchTimeout.toString)
>   producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
>     blockOnBufferFull.toString)
>   producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
>     bufferSize.toString)
>   producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
>   producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
>   producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
>
>   val producer = new NewKafkaProducer(producerProps)
>
>   val record = new ProducerRecord(testTopic, 0, "key".getBytes,
>     testMessage.getBytes)
>   producer.send(record)
>
>   val consumer = new KafkaConsumer(testTopic, groupId_1,
>     "localhost:2181")
>
>   def exec(binaryObject: Array[Byte]) = {
>     val message = new String(binaryObject)
>     print("testMessage = " + testMessage + " and consumed message = " +
>       message)
>     consumer.close()
>   }
>
>   print("KafkaSpec is waiting some seconds")
>   consumer.read(exec)
>   print("KafkaSpec consumed")
> }
>
> The KafkaConsumer class is exactly the same as in stealthly's project:
> https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
>
> But when I tried to run the test program, I got the following exception:
>
> [2014-11-23 21:15:36,461] INFO ProducerConfig values:
>     block.on.buffer.full = true
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     batch.size = 16384
>     metrics.sample.window.ms = 30000
>     metadata.max.age.ms = 300000
>     receive.buffer.bytes = 32768
>     timeout.ms = 30000
>     max.in.flight.requests.per.connection = 5
>     metric.reporters = []
>     bootstrap.servers = [localhost:9092]
>     client.id =
>     compression.type = none
>     retries = 0
>     max.request.size = 1048576
>     send.buffer.bytes = 131072
>     acks = 1
>     reconnect.backoff.ms = 10
>     linger.ms = 0
>     metrics.num.samples = 2
>     metadata.fetch.timeout.ms = 60000
>  (org.apache.kafka.clients.producer.ProducerConfig)
> Exception in thread "main" java.lang.ExceptionInInitializerError
>     at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
>     at org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
>     at Test$delayedInit$body.apply(Test.scala:47)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(Test.scala:21)
>     at Test.main(Test.scala)
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
>     at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
>     at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
>     at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
>     at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
>     at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
>     at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
>     at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
>     at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
>     at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
>     at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
>     at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
>     ... 14 more
>
> I think the error comes from ProducerConfig. I spent time looking for a
> solution, but had no luck. Could anyone give me a hint or some help, please?
>
> Regards,
> Haoming