It seems you hit an exception when instantiating the sfl4j logger inside
the producer.

Thanks,

Jun

On Sun, Nov 23, 2014 at 9:29 PM, Haoming Zhang <haoming.zh...@outlook.com>
wrote:

>
>
>
> Hi all,
>
> Basically I used a lot of codes from this project
> https://github.com/stealthly/scala-kafka , my idea is to sent a key/value
> pair to Kafka, so that I can design a partition function in the further.
>
> I checked the document and seems I should create a ProducerRecord, then I
> can specify a partition or key. Follows the codes from stealehly's project,
> I created a test main function as following:
> import org.apache.kafka.clients.producer.{ KafkaProducer =>
> NewKafkaProducer }
> import org.apache.kafka.clients.producer.ProducerConfig
> import java.util.Properties
>
> object Test extends App {
>     val testMessage = UUID.randomUUID().toString
>     val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
>     val groupId_1 = UUID.randomUUID().toString
>
>     val brokerList: String = "localhost:9092"
>     val acks: Int = -1
>     val metadataFetchTimeout: Long = 3000L
>     val blockOnBufferFull: Boolean = true
>     val bufferSize: Long = 1024L * 1024L
>     val retries: Int = 0
>
>     val producerProps = new Properties()
>     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
>     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
>     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,
> metadataFetchTimeout.toString)
>     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
> blockOnBufferFull.toString)
>     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> bufferSize.toString)
>     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
>     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
>     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
>
>     val producer = new NewKafkaProducer(producerProps)
>
>     val record = new ProducerRecord(testTopic, 0, "key".getBytes,
> testMessage.getBytes)
>     producer.send(record)
>
>     val consumer = new KafkaConsumer(testTopic, groupId_1,
> "localhost:2181")
>
>     def exec(binaryObject: Array[Byte]) = {
>       val message = new String(binaryObject)
>       print("testMessage = " + testMessage + " and consumed message = " +
> message)
>       consumer.close()
>     }
>
>     print("KafkaSpec is waiting some seconds")
>     consumer.read(exec)
>     print("KafkaSpec consumed")
> }
>
> The KafkaConsumer class is exactly as the same as stealehly's project:
> https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
>
> But when I tried to run the test program, I got the following exception:
> [2014-11-23 21:15:36,461] INFO ProducerConfig values:
>     block.on.buffer.full = true
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     batch.size = 16384
>     metrics.sample.window.ms = 30000
>     metadata.max.age.ms = 300000
>     receive.buffer.bytes = 32768
>     timeout.ms = 30000
>     max.in.flight.requests.per.connection = 5
>     metric.reporters = []
>     bootstrap.servers = [localhost:9092]
>     client.id =
>     compression.type = none
>     retries = 0
>     max.request.size = 1048576
>     send.buffer.bytes = 131072
>     acks = 1
>     reconnect.backoff.ms = 10
>     linger.ms = 0
>     metrics.num.samples = 2
>     metadata.fetch.timeout.ms = 60000
>  (org.apache.kafka.clients.producer.ProducerConfig)
> Exception in thread "main" java.lang.ExceptionInInitializerError
>     at
> org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
>     at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
>     at Test$delayedInit$body.apply(Test.scala:47)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.App$$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(Test.scala:21)
>     at Test.main(Test.scala)
> Caused by: java.lang.NullPointerException
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
>     at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
>     at
> kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
>     at
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
>     at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
>     at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
>     at
> org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
>     at
> org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
>     at
> org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
>     at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
>     at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
>     at
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
>     at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
>     ... 14 more
>
> I think it should be an error came from ProducerConfig, I spent time on
> find a solution, but no luck. Does anyone could give me a hit/help please?
>
> Regards,
> Haoming
>
>

Reply via email to