Hi all,

I based most of my code on this project:
https://github.com/stealthly/scala-kafka . My goal is to send a key/value pair
to Kafka, so that I can design a partitioning function later on.
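
What I have in mind for that partitioning function is roughly the sketch below. This is only an illustration of the idea, not working code: partitionFor and numPartitions are made-up names here, and in practice the partition count would have to come from the topic configuration. The partition is derived from the key and then passed explicitly to the ProducerRecord constructor I use further down:

// Sketch only: choose a partition from the record key.
// numPartitions is hypothetical; it should match the topic's real partition count.
def partitionFor(key: Array[Byte], numPartitions: Int): Int = {
  val h = java.util.Arrays.hashCode(key)
  ((h % numPartitions) + numPartitions) % numPartitions  // keep the result non-negative
}

// Usage idea, with the same ProducerRecord constructor as in my test below:
// new ProducerRecord(topic, partitionFor(keyBytes, numPartitions), keyBytes, valueBytes)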

I checked the documentation, and it seems I should create a ProducerRecord,
which lets me specify a partition or a key. Following the code from stealthly's
project, I created a test main function like this:
import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord }
import java.util.Properties
import java.util.UUID

object Test extends App {
    val testMessage = UUID.randomUUID().toString
    val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
    val groupId_1 = UUID.randomUUID().toString

    val brokerList: String = "localhost:9092"
    val acks: Int = -1
    val metadataFetchTimeout: Long = 3000L
    val blockOnBufferFull: Boolean = true
    val bufferSize: Long = 1024L * 1024L
    val retries: Int = 0

    val producerProps = new Properties()
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")

    // New (org.apache.kafka.clients) producer built from the properties above.
    val producer = new NewKafkaProducer(producerProps)

    // Send one keyed message to partition 0 of the test topic.
    val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes)
    producer.send(record)

    // KafkaConsumer is the wrapper class copied from stealthly's scala-kafka project.
    val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")

    // Callback invoked for each consumed message: print it and close the consumer.
    def exec(binaryObject: Array[Byte]) = {
      val message = new String(binaryObject)
      print("testMessage = " + testMessage + " and consumed message = " + message)
      consumer.close()
    }

    print("KafkaSpec is waiting some seconds")
    consumer.read(exec)
    print("KafkaSpec consumed")
}

The KafkaConsumer class is exactly the same as the one in stealthly's project:
https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala

But when I tried to run the test program, I got the following exception:
[2014-11-23 21:15:36,461] INFO ProducerConfig values: 
    block.on.buffer.full = true
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    receive.buffer.bytes = 32768
    timeout.ms = 30000
    max.in.flight.requests.per.connection = 5
    metric.reporters = []
    bootstrap.servers = [localhost:9092]
    client.id = 
    compression.type = none
    retries = 0
    max.request.size = 1048576
    send.buffer.bytes = 131072
    acks = 1
    reconnect.backoff.ms = 10
    linger.ms = 0
    metrics.num.samples = 2
    metadata.fetch.timeout.ms = 60000
 (org.apache.kafka.clients.producer.ProducerConfig)
Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
    at org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
    at Test$delayedInit$body.apply(Test.scala:47)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(Test.scala:21)
    at Test.main(Test.scala)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
    at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
    at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
    ... 14 more

I think the error comes from ProducerConfig. I have spent some time looking for a
solution, but no luck. Could anyone give me a hint or some help, please?

Regards,
Haoming