Exception in thread "main" java.lang.ExceptionInInitializerError

2014-11-23 Thread Haoming Zhang



Hi all,

Basically I reused a lot of code from this project 
https://github.com/stealthly/scala-kafka ; my idea is to send a key/value pair 
to Kafka, so that I can design a partition function later on.

I checked the documentation and it seems I should create a ProducerRecord, which 
lets me specify a partition or a key. Following the code from stealthly's project, I 
created a test main function as follows:
import org.apache.kafka.clients.producer.{ KafkaProducer => NewKafkaProducer }
import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord }
import java.util.Properties
import java.util.UUID

object Test extends App {
  val testMessage = UUID.randomUUID().toString
  val testTopic = "0e7fa3c2-1b75-407b-a03c-f40679ea3ce9"
  val groupId_1 = UUID.randomUUID().toString

  val brokerList: String = "localhost:9092"
  val acks: Int = -1
  val metadataFetchTimeout: Long = 3000L
  val blockOnBufferFull: Boolean = true
  val bufferSize: Long = 1024L * 1024L
  val retries: Int = 0

  val producerProps = new Properties()
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
  producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
  producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
  producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
  producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
  producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
  producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
  producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")

  val producer = new NewKafkaProducer(producerProps)

  // Topic, explicit partition 0, then key and value as byte arrays.
  val record = new ProducerRecord(testTopic, 0, "key".getBytes, testMessage.getBytes)
  producer.send(record)

  val consumer = new KafkaConsumer(testTopic, groupId_1, "localhost:2181")

  def exec(binaryObject: Array[Byte]) = {
    val message = new String(binaryObject)
    print("testMessage = " + testMessage + " and consumed message = " + message)
    consumer.close()
  }

  print("KafkaSpec is waiting some seconds")
  consumer.read(exec)
  print("KafkaSpec consumed")
}

The KafkaConsumer class is exactly the same as in stealthly's project: 
https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaConsumer.scala

But when I tried to run the test program, I got the following exception:
[2014-11-23 21:15:36,461] INFO ProducerConfig values: 
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
metrics.sample.window.ms = 3
metadata.max.age.ms = 30
receive.buffer.bytes = 32768
timeout.ms = 3
max.in.flight.requests.per.connection = 5
metric.reporters = []
bootstrap.servers = [localhost:9092]
client.id = 
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = 1
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 6
 (org.apache.kafka.clients.producer.ProducerConfig)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:243)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:255)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:64)
at Test$delayedInit$body.apply(Test.scala:47)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(Test.scala:21)
at Test.main(Test.scala)
Caused by: java.lang.NullPointerException
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:98)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)
at kafka.producer.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.scala:63)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:257)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:133)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:97)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:689)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:647)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:568)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:442)
at org.apa
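Judging from the "Caused by" frames, the NullPointerException is raised while log4j itself is being configured: a log4j.properties on the test classpath apparently declares kafka.producer.KafkaLog4jAppender, and that appender constructs its own KafkaProducer during logger initialization, which then fails (most likely because the appender's broker list was never set). A sketch of a console-only log4j.properties that avoids the self-referential appender (the layout pattern is an arbitrary choice):

```properties
# Sketch: log to the console only, and do NOT configure
# kafka.producer.KafkaLog4jAppender here -- that appender builds its own
# KafkaProducer while log4j is still initializing, which matches the
# "KafkaLog4jAppender.activateOptions -> KafkaProducer.<init> -> NPE"
# frames in the trace.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```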

Re: Kafka restart takes a long time

2014-11-23 Thread Rajiv Kurian
Thanks everyone. I'll try to clean up the disk space and try again.


Re: Kafka restart takes a long time

2014-11-23 Thread Jason Rosenberg
Rajiv,

So, any time a broker's disk fills up, it will shut itself down immediately
(it will do this in response to any IO error on writing to disk).
Unfortunately, this means that the node will not be able to do any
housecleaning before shutdown, which is an 'unclean' shutdown.  This means
that when it restarts, it needs to reset the data to the last known
checkpoint.  If the partition is replicated, and it can restore it from
another broker, it will try to do that (but it doesn't sound like it can do
that in your case, since all the other nodes are down too).

There is a fix coming in 0.8.2 that will allow a broker to restore multiple
partitions in parallel (but the current behavior in 0.8.1.1 and prior is to
restore partitions 1 by 1). See:
https://issues.apache.org/jira/browse/KAFKA-1414.  This fix should speed
things up greatly when you have a large number of partitions.

If a disk is full, the broker will refuse to even start up (or will fail
immediately on the first write attempt and shut itself down).  So,
generally, in this event, you need to clear some disk space before trying
to restart the server.

The bottom line is that you don't want any of your brokers to run out of
disk space (thus you need to have good monitoring/alerting for advance
warning on this).  Kafka doesn't attempt to detect if it's about to run out
of space and die, so you have to manage that and guard against it outside
of kafka.
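The advance-warning check described above can be sketched in plain Java with File.getTotalSpace/getUsableSpace; the log directory path and the 80% threshold below are assumptions for illustration, not anything Kafka ships with (the real directory comes from log.dirs in server.properties):

```java
import java.io.File;

public class DiskGuard {
    // Fraction of the filesystem holding `path` that is already used (0.0-1.0).
    // A missing path reports as fully used, which errs on the side of alerting.
    static double usedFraction(String path) {
        File f = new File(path);
        long total = f.getTotalSpace();
        if (total == 0) {
            return 1.0;
        }
        return 1.0 - (double) f.getUsableSpace() / total;
    }

    public static void main(String[] args) {
        // Hypothetical Kafka log directory and an arbitrary 80% alert threshold.
        String logDir = args.length > 0 ? args[0] : "/tmp/kafka-logs";
        double used = usedFraction(logDir);
        long pct = Math.round(used * 100);
        System.out.println((used > 0.80 ? "ALERT: " : "OK: ")
                + logDir + " is " + pct + "% full");
    }
}
```

Run periodically (cron or similar) and wire the ALERT line into whatever alerting you already have.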

Jason

On Sat, Nov 22, 2014 at 5:27 PM, Harsha  wrote:

> It might be the logs; check your kafka logs dir (server logs). Kafka can
> produce a lot of logs in a quick time; make sure that's what's in play here.
> -Harsha
>
> On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> > Actually see a bunch of errors. One of the brokers is out of space and
> > this
> > might be causing everything to spin out of control.
> >
> > Some logs:
> >
> > On *broker 1* (the one that has run out of space):
> >
> > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13  ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
> > error while replicating data.
> >
> > kafka.common.KafkaStorageException: I/O exception in append to log
> > 'mytopic-633'
> >
> > at kafka.log.Log.append(Log.scala:283)
> >
> > at
> >
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> >
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >
> > at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> > at kafka.utils.Utils$.inLock(Utils.scala:538)
> >
> > at
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >
> > at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >
> > at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Caused by: java.io.IOException: No space left on device
> >
> > at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> >
> > at
> > sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> >
> > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> >
> > at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> >
> > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> >
> > at
> >
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> >
> > at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> >
> > at kafka.log.LogSegment.append(LogSegment.scala:80)
> >
> > at kafka.log.Log.append(Log.scala:269)
> >
> > ... 13 more
> >
> > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13  ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
> > Error
> > getting offset for partition [myTopic,0] to broker 13
> >
> > java.io.IOException: No space left on device
> >
> > at java.io.FileOutputStream.writeBytes(Native Method)
> >
> > at java.io.FileOutputStream.write(FileOutputStream.java:345)
> >
> > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >
> 

issues using the new 0.8.2 producer

2014-11-23 Thread Shlomi Hazan
Hi,
Started to dig into the new producer and have a few questions:
1. What part (if any) of the old producer config still applies to the new
producer, or is it just what is specified under "New Producer Configs"?
2. How do you specify a partitioner for the new producer? If there is no such
option, what use is made of the given key? Is it simply hashed with Java's
String API?
3. The javadoc example (

ProducerRecord record = new ProducerRecord("the-topic", "key", "value");

) is incorrect: it shows a ProducerRecord being created from three strings,
whereas the constructor takes byte arrays for the last two arguments. Will the
final API be the one documented, or the one implemented?
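On question 2: as far as I can tell, the 0.8.2 producer has no pluggable-partitioner setting; when no partition is supplied, the default partitioner hashes the raw key bytes (with murmur2) modulo the partition count, rather than using String.hashCode. A rough, self-contained illustration of that hash-then-mod scheme (Arrays.hashCode stands in for the real murmur2 here):

```java
import java.util.Arrays;

public class PartitionSketch {
    // Illustrative stand-in for the new producer's default partitioning:
    // an explicit partition wins; otherwise hash the key bytes and take the
    // result modulo the partition count. Kafka itself hashes with murmur2;
    // Arrays.hashCode below is only a placeholder for that hash.
    static int partitionFor(Integer explicitPartition, byte[] key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        int hash = Arrays.hashCode(key);
        // Mask off the sign bit so the modulo result is non-negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "key".getBytes();
        // The same key always maps to the same partition.
        System.out.println(partitionFor(null, key, 4) == partitionFor(null, key, 4)); // prints true
        System.out.println(partitionFor(2, key, 4)); // explicit partition wins: prints 2
    }
}
```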

I am really missing a working example for the new producer, so if anyone has
one, I will be happy to get inspired...
Shlomi