Exception in thread "main" java.lang.ExceptionInInitializerError
Hi all, Basically I used a lot of codes from this project https://github.com/stealthly/scala-kafka , my idea is to sent a key/value pair to Kafka, so that I can design a partition function in the further. I checked the document and seems I should create a ProducerRecord, then I can specify a partition or key. Follows the codes from stealehly's project, I created a test main function as following: import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer } import org.apache.kafka.clients.producer.ProducerConfig import java.util.Properties object Test extends App { val testMessage = UUID.randomUUID().toString val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9" val groupId_1 = UUID.randomUUID().toString val brokerList: String = "localhost:9092" val acks: Int = -1 val metadataFetchTimeout: Long = 3000L val blockOnBufferFull: Boolean = true val bufferSize: Long = 1024L * 1024L val retries: Int = 0 val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") val producer = new NewKafkaProducer(producerProps) val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes) producer.send(record) val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181") def exec(binaryObject: Array[Byte]) = { val message = new String(binaryObject) print("testMessage = " + testMessage + " and consumed message = " + message) consumer.close() } print("KafkaSpec is waiting some seconds") consumer.read(exec) print("KafkaSpec consumed") } The KafkaConsumer class is exactly as the same as stealehly's project: https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala But when I tried to run the test program, I got the following exception: [2014-11-23 21:15:36,461] INFO ProducerConfig values: block.on.buffer.full = true retry.backoff.ms = 100 buffer.memory = 33554432 batch.size = 16384 metrics.sample.window.ms = 3 metadata.max.age.ms = 30 receive.buffer.bytes = 32768 timeout.ms = 3 max.in.flight.requests.per.connection = 5 metric.reporters = [] bootstrap.servers = [localhost:9092] client.id = compression.type = none retries = 0 max.request.size = 1048576 send.buffer.bytes = 131072 acks = 1 reconnect.backoff.ms = 10 linger.ms = 0 metrics.num.samples = 2 metadata.fetch.timeout.ms = 6 (org.apache.kafka.clients.producer.ProducerConfig) Exception in thread "main" java.lang.ExceptionInInitializerError at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:64) at Test$delayedInit$body.apply(Test.scala:47) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at Test$.main(Test.scala:21) at Test.main(Test.scala) Caused by: java.lang.NullPointerException at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:98) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:94) at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63) at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257) at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133) at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97) at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689) at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647) at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568) at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442) at org.apa
Re: Kafka restart takes a long time
Thanks every one. I"ll try to clean up the disk space and try again. On Sun, Nov 23, 2014 at 8:47 AM, Jason Rosenberg wrote: > Rajiv, > > So, any time a broker's disk fills up, it will shut itself down immediately > (it will do this in response to any IO error on writing to disk). > Unfortunately, this means that the node will not be able to do any > housecleaning before shutdown, which is an 'unclean' shutdown. This means > that when it restarts, it needs to reset the data to the last known > checkpoint. If the partition is replicated, and it can restore it from > another broker, it will try to do that (but it doesn't sound like it can do > that in your case, since all the other nodes are down too). > > There is a fix coming in 0.8.2 that will allow a broker to restore multiple > partitions in parallel (but the current behavior in 0.8.1.1 and prior is to > restore partitions 1 by 1). See: > https://issues.apache.org/jira/browse/KAFKA-1414. This fix should speed > things up greatly when you have a large number of partitions. > > If a disk is full, the broker will refuse to even start up (or will fail > immediately on the first write attempt and shut itself down). So, > generally, in this event, you need to clear some disk space before trying > to restart the server. > > The bottom line is that you don't want any of your brokers to run out of > disk space (thus you need to have good monitoring/alerting for advance > warning on this). Kafka doesn't attempt to detect if it's about to run out > of space and die, so you have to manage that and guard against it outside > of kafka. > > Jason > > On Sat, Nov 22, 2014 at 5:27 PM, Harsha wrote: > > > It might logs check your kafka logs dir (server logs) . Kafka can > > produce lot of logs in a quick time make sure thats whats in play here. > > -Harsha > > > > On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote: > > > Actually see a bunch of errors. One of the brokers is out of space and > > > this > > > might be causing everything to spin out of control. > > > > > > Some logs: > > > > > > On *broker 1* (the one that has run out of space): > > > > > > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13 ] > > > [kafka.server.ReplicaFetcherThread ]: [ReplicaFetcherThread-1-13], > Disk > > > error while replicating data. > > > > > > kafka.common.KafkaStorageException: I/O exception in append to log > > > 'mytopic-633' > > > > > > at kafka.log.Log.append(Log.scala:283) > > > > > > at > > > > > > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) > > > > > > at > > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) > > > > > > at > > > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > > > > > > at > > > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) > > > > > > at kafka.utils.Utils$.inLock(Utils.scala:538) > > > > > > at > > > > > > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) > > > > > > at > > > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) > > > > > > at > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > > > Caused by: java.io.IOException: No space left on device > > > > > > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > > > > > > at > > > sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60) > > > > > > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > > > > > > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > > > > > > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205) > > > > > > at > > > > > > kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132) > > > > > > at kafka.log.FileMessageSet.append(FileMessageSet.scala:210) > > > > > > at kafka.log.LogSegment.append(LogSegment.scala:80) > > > > > > at kafka.log.Log.append(Log.scala:269) > > > > > > ... 13 more > > > > > > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13 ] > > > [kafka.server.ReplicaFetc
Re: Kafka restart takes a long time
Rajiv, So, any time a broker's disk fills up, it will shut itself down immediately (it will do this in response to any IO error on writing to disk). Unfortunately, this means that the node will not be able to do any housecleaning before shutdown, which is an 'unclean' shutdown. This means that when it restarts, it needs to reset the data to the last known checkpoint. If the partition is replicated, and it can restore it from another broker, it will try to do that (but it doesn't sound like it can do that in your case, since all the other nodes are down too). There is a fix coming in 0.8.2 that will allow a broker to restore multiple partitions in parallel (but the current behavior in 0.8.1.1 and prior is to restore partitions 1 by 1). See: https://issues.apache.org/jira/browse/KAFKA-1414. This fix should speed things up greatly when you have a large number of partitions. If a disk is full, the broker will refuse to even start up (or will fail immediately on the first write attempt and shut itself down). So, generally, in this event, you need to clear some disk space before trying to restart the server. The bottom line is that you don't want any of your brokers to run out of disk space (thus you need to have good monitoring/alerting for advance warning on this). Kafka doesn't attempt to detect if it's about to run out of space and die, so you have to manage that and guard against it outside of kafka. Jason On Sat, Nov 22, 2014 at 5:27 PM, Harsha wrote: > It might logs check your kafka logs dir (server logs) . Kafka can > produce lot of logs in a quick time make sure thats whats in play here. > -Harsha > > On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote: > > Actually see a bunch of errors. One of the brokers is out of space and > > this > > might be causing everything to spin out of control. > > > > Some logs: > > > > On *broker 1* (the one that has run out of space): > > > > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13 ] > > [kafka.server.ReplicaFetcherThread ]: [ReplicaFetcherThread-1-13], Disk > > error while replicating data. > > > > kafka.common.KafkaStorageException: I/O exception in append to log > > 'mytopic-633' > > > > at kafka.log.Log.append(Log.scala:283) > > > > at > > > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52) > > > > at > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) > > > > at > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) > > > > at > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125) > > > > at > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > > > > at > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344) > > > > at > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) > > > > at > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) > > > > at > > > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) > > > > at kafka.utils.Utils$.inLock(Utils.scala:538) > > > > at > > > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) > > > > at > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) > > > > at > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) > > > > Caused by: java.io.IOException: No space left on device > > > > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > > > > at > > sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60) > > > > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > > > > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > > > > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205) > > > > at > > > kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132) > > > > at kafka.log.FileMessageSet.append(FileMessageSet.scala:210) > > > > at kafka.log.LogSegment.append(LogSegment.scala:80) > > > > at kafka.log.Log.append(Log.scala:269) > > > > ... 13 more > > > > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13 ] > > [kafka.server.ReplicaFetcherThread ]: [ReplicaFetcherThread-2-13], > > Error > > getting offset for partition [myTopic,0] to broker 13 > > > > java.io.IOException: No space left on device > > > > at java.io.FileOutputStream.writeBytes(Native Method) > > > > at java.io.FileOutputStream.write(FileOutputStream.java:345) > > > > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > > >
issues using the new 0.8.2 producer
Hi, Started to dig into that new producer and have a few questions: 1. what part (if any) of the old producer config still apply to the new producer or is it just what is specified on "New Producer Configs"? 2. how do you specify a partitioner to the new producer? if no such option, what usage is made with the given key? is it simply hashed with Java's String API? 3. the javadoc example ( ProducerRecord record = new ProducerRecord("the-topic", "key, "value"); ) is incorrect and shows as if creating a producer record takes 3 strings whereas it takes byte arrays for the last two arguments. will the final API be the one documented or rather the one implemented? I am really missing a working example for the new producer so if anyone has one I will be happy to get inspired... Shlomi