Hi all,
Basically I used a lot of codes from this project
https://github.com/stealthly/scala-kafka , my idea is to sent a key/value pair
to Kafka, so that I can design a partition function in the further.
I checked the document and seems I should create a ProducerRecord, then I can
specify a partition or key. Follows the codes from stealehly's project, I
created a test main function as following:
import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties
object Test extends App {
val testMessage = UUID.randomUUID().toString
val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
val groupId_1 = UUID.randomUUID().toString
val brokerList: String = "localhost:9092"
val acks: Int = -1
val metadataFetchTimeout: Long = 3000L
val blockOnBufferFull: Boolean = true
val bufferSize: Long = 1024L * 1024L
val retries: Int = 0
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,
metadataFetchTimeout.toString)
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG,
blockOnBufferFull.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
val producer = new NewKafkaProducer(producerProps)
val record = new ProducerRecord(testTopic, 0, "key".getBytes,
testMessage.getBytes)
producer.send(record)
val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")
def exec(binaryObject: Array[Byte]) = {
val message = new String(binaryObject)
print("testMessage = " + testMessage + " and consumed message = " +
message)
consumer.close()
}
print("KafkaSpec is waiting some seconds")
consumer.read(exec)
print("KafkaSpec consumed")
}
The KafkaConsumer class is exactly as the same as stealehly's project:
https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala
But when I tried to run the test program, I got the following exception:
[2014-11-23 21:15:36,461] INFO ProducerConfig values:
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 5
metric.reporters = []
bootstrap.servers = [localhost:9092]
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = 1
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000
(org.apache.kafka.clients.producer.ProducerConfig)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
at
org.apache.kafka.clients.producer.KafkaProducer.<clinit>(KafkaProducer.java:64)
at Test$delayedInit$body.apply(Test.scala:47)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(Test.scala:21)
at Test.main(Test.scala)
Caused by: java.lang.NullPointerException
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
at
kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
at
org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
at
org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
at
org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
at
org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
at
org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
at
org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
at
org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:476)
at
org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:471)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:125)
... 14 more
I think it should be an error came from ProducerConfig, I spent time on find a
solution, but no luck. Does anyone could give me a hit/help please?
Regards,
Haoming