Re: can't produce message in kafka production

2014-12-18 Thread Neha Narkhede
The producer is complaining that its socket channel is already closed,
which makes me think it was closed due to some error that is not present
in your logs. I'd enable DEBUG and see if that shows the cause.
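
For reference, one way to get DEBUG output from the console tools is through
the log4j configuration they load; a minimal sketch (the file location and
appender name are assumptions, adjust to your install — e.g. point
KAFKA_LOG4J_OPTS at it):

```properties
# Illustrative log4j.properties for the console producer; the appender
# name and how the tool finds this file vary by installation.
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```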

On Thu, Dec 18, 2014 at 4:13 PM, Gwen Shapira  wrote:
>
> Perhaps you have the broker logs? They may show other errors that
> can help us troubleshoot.
>
> On Thu, Dec 18, 2014 at 4:11 PM, Sa Li  wrote:
> > Thanks, Gwen. I tried telnet:
> > root@precise64:/etc/kafka# telnet 10.100.98.100 9092
> > Trying 10.100.98.100...
> > Connected to 10.100.98.100.
> > Escape character is '^]'.
> >
> > It seems to connect, and I checked with the system operations people;
> > netstat shows port 9092 is listening. I had been assuming this was a
> > connection issue, since the same command works fine against my dev
> > cluster (10.100.70.128:9092).
> >
> > Just in case, could this be caused by some other type of issue?
> >
> > thanks
> >
> > Alec
> >
> > On Thu, Dec 18, 2014 at 2:33 PM, Gwen Shapira  wrote:
> >>
> >> Looks like you can't connect to: 10.100.98.100:9092
> >>
> >> I'd validate that this is the issue using telnet and then check the
> >> firewall / ipfilters settings.
> >>
> >> On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
> >> > Dear all
> >> >
> >> > We just built a Kafka production cluster. I can create topics in the
> >> > production cluster from another host, but when I send a very simple
> >> > message as a producer, it generates errors like these:
> >> >
> >> > root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
> >> > 10.100.98.100:9092 --topic my-replicated-topic-production
> >> > my test message 1
> >> > [2014-12-18 21:44:25,830] WARN Failed to send producer request with
> >> > correlation id 2 to broker 101 with data for partitions
> >> > [my-replicated-topic-production,1]
> >> > (kafka.producer.async.DefaultEventHandler)
> >> > java.nio.channels.ClosedChannelException
> >> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >> > [...]

Re: can't produce message in kafka production

2014-12-18 Thread Gwen Shapira
Perhaps you have the broker logs? They may show other errors that
can help us troubleshoot.

On Thu, Dec 18, 2014 at 4:11 PM, Sa Li  wrote:
> Thanks, Gwen. I tried telnet:
> root@precise64:/etc/kafka# telnet 10.100.98.100 9092
> Trying 10.100.98.100...
> Connected to 10.100.98.100.
> Escape character is '^]'.
>
> It seems to connect, and I checked with the system operations people;
> netstat shows port 9092 is listening. I had been assuming this was a
> connection issue, since the same command works fine against my dev
> cluster (10.100.70.128:9092).
>
> Just in case, could this be caused by some other type of issue?
>
> thanks
>
> Alec
>
> On Thu, Dec 18, 2014 at 2:33 PM, Gwen Shapira  wrote:
>>
>> Looks like you can't connect to: 10.100.98.100:9092
>>
>> I'd validate that this is the issue using telnet and then check the
>> firewall / ipfilters settings.
>>
>> On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
>> > Dear all
>> >
>> > We just built a Kafka production cluster. I can create topics in the
>> > production cluster from another host, but when I send a very simple
>> > message as a producer, it generates errors like these:
>> >
>> > root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
>> > 10.100.98.100:9092 --topic my-replicated-topic-production
>> > my test message 1
>> > [2014-12-18 21:44:25,830] WARN Failed to send producer request with
>> > correlation id 2 to broker 101 with data for partitions
>> > [my-replicated-topic-production,1]
>> > (kafka.producer.async.DefaultEventHandler)
>> > java.nio.channels.ClosedChannelException
>> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>> > [...]

Re: can't produce message in kafka production

2014-12-18 Thread Sa Li
Thanks, Gwen. I tried telnet:
root@precise64:/etc/kafka# telnet 10.100.98.100 9092
Trying 10.100.98.100...
Connected to 10.100.98.100.
Escape character is '^]'.

It seems to connect, and I checked with the system operations people;
netstat shows port 9092 is listening. I had been assuming this was a
connection issue, since the same command works fine against my dev
cluster (10.100.70.128:9092).

Just in case, could this be caused by some other type of issue?

thanks

Alec

On Thu, Dec 18, 2014 at 2:33 PM, Gwen Shapira  wrote:
>
> Looks like you can't connect to: 10.100.98.100:9092
>
> I'd validate that this is the issue using telnet and then check the
> firewall / ipfilters settings.
>
> On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
> > Dear all
> >
> > We just built a Kafka production cluster. I can create topics in the
> > production cluster from another host, but when I send a very simple
> > message as a producer, it generates errors like these:
> >
> > root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
> > 10.100.98.100:9092 --topic my-replicated-topic-production
> > my test message 1
> > [2014-12-18 21:44:25,830] WARN Failed to send producer request with
> > correlation id 2 to broker 101 with data for partitions
> > [my-replicated-topic-production,1]
> > (kafka.producer.async.DefaultEventHandler)
> > java.nio.channels.ClosedChannelException
> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > [...]
> > [2014-12-18 21:44:25,948] WARN Failed to send producer request with

Re: Better exception handling in kafka.producer.async.DefaultEventHandler

2014-12-18 Thread Joe Stein
I would suggest using the new Java client producer in 0.8.2-beta:
http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
It handles the case you brought up (among lots of other goodies).

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Thu, Dec 18, 2014 at 4:27 PM, Xiaoyu Wang  wrote:
>
> Hello,
>
> I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler
> file. Below is the dispatchSerializedData function. It looks like we
> catch the exception outside the loop, merely log an error message, and
> then return failedProduceRequests.
>
> If one broker is having a problem, messages destined for the brokers
> iterated after the problematic one will NOT be included in
> failedProduceRequests and will be dropped quietly. Is this correct?
> Should we change the code to catch the exception when sending to each
> broker individually?
>
> Thanks
>
> private def dispatchSerializedData(messages:
>     Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
>   val partitionedDataOpt = partitionAndCollate(messages)
>   partitionedDataOpt match {
>     case Some(partitionedData) =>
>       val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
>       try {
>         for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
>           if (logger.isTraceEnabled)
>             messagesPerBrokerMap.foreach(partitionAndEvent =>
>               trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
>                 .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
>           val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
>
>           val failedTopicPartitions = send(brokerid, messageSetPerBroker)
>           failedTopicPartitions.foreach(topicPartition => {
>             messagesPerBrokerMap.get(topicPartition) match {
>               case Some(data) => failedProduceRequests.appendAll(data)
>               case None => // nothing
>             }
>           })
>         }
>       } catch {
>         case t: Throwable => error("Failed to send messages", t)
>       }
>       failedProduceRequests
>     case None => // all produce requests failed
>       messages
>   }
> }
>
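
To illustrate the suggestion (this is not the actual Kafka code): moving the
try/catch inside the per-broker loop means a broker that throws marks only its
own messages as failed, while later brokers are still attempted. A toy Java
sketch, with plain strings standing in for KeyedMessage and a function standing
in for send():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class PerBrokerDispatch {
    // 'send' returns the messages that failed for a broker, or throws if
    // the broker is unreachable. Catching inside the loop means a throwing
    // broker contributes its whole batch to 'failed' instead of aborting
    // the iteration and silently dropping the remaining brokers' batches.
    static List<String> dispatch(Map<Integer, List<String>> byBroker,
                                 BiFunction<Integer, List<String>, List<String>> send) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : byBroker.entrySet()) {
            try {
                failed.addAll(send.apply(e.getKey(), e.getValue()));
            } catch (RuntimeException t) {
                failed.addAll(e.getValue()); // this broker's batch failed wholesale
            }
        }
        return failed;
    }
}
```

With the 0.8.1.1 shape above (catch outside the loop), a throw from the first
broker would abandon the loop: later brokers' messages would be neither sent
nor returned as failed.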


Re: can't produce message in kafka production

2014-12-18 Thread Gwen Shapira
Looks like you can't connect to: 10.100.98.100:9092

I'd validate that this is the issue using telnet and then check the
firewall / ipfilters settings.
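
If telnet isn't available on the host, the same reachability check can be done
from the JVM; a minimal sketch (host and port below are placeholders):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, canConnect("10.100.98.100", 9092, 3000) should mirror what telnet
reports. If it returns true but the producer still fails, the problem is likely
above the TCP layer (for instance the broker closing the connection, or an
advertised host/port mismatch in the broker config).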

On Thu, Dec 18, 2014 at 2:21 PM, Sa Li  wrote:
> Dear all
>
> We just built a Kafka production cluster. I can create topics in the
> production cluster from another host, but when I send a very simple
> message as a producer, it generates errors like these:
>
> root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
> 10.100.98.100:9092 --topic my-replicated-topic-production
> my test message 1
> [2014-12-18 21:44:25,830] WARN Failed to send producer request with
> correlation id 2 to broker 101 with data for partitions
> [my-replicated-topic-production,1]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> [...]
> [2014-12-18 21:44:25,948] WARN Failed to send producer request with
> correlation id 5 to broker 101 with data for partitions
> [my-replicated-topic-production,1]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
> [...]

can't produce message in kafka production

2014-12-18 Thread Sa Li
Dear all

We just built a Kafka production cluster. I can create topics in the
production cluster from another host, but when I send a very simple
message as a producer, it generates errors like these:

root@precise64:/etc/kafka# bin/kafka-console-producer.sh --broker-list
10.100.98.100:9092 --topic my-replicated-topic-production
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
my test message 1
[2014-12-18 21:44:25,830] WARN Failed to send producer request with
correlation id 2 to broker 101 with data for partitions
[my-replicated-topic-production,1]
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:256)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:99)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2014-12-18 21:44:25,948] WARN Failed to send producer request with
correlation id 5 to broker 101 with data for partitions
[my-replicated-topic-production,1]
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at k

RE: In Flight Requests

2014-12-18 Thread Orelowitz, David
Ray,

Thank you for the detailed explanation. I had mistakenly thought this
determined the number of messages that could be outstanding, not the number of
requests (batches of messages).

This makes sense.

Thank you.

-Original Message-
From: Jay Kreps [mailto:j...@confluent.io] 
Sent: Thursday, December 18, 2014 5:04 PM
To: users@kafka.apache.org
Subject: Re: In Flight Requests

Hi David,

Each request sent to Kafka gets acknowledged. The protocol allows multiple 
requests to be sent on a connection without waiting for a response. The number 
of requests currently awaiting acknowledgement is the in-flight request count. 
By default, once there are five unacknowledged requests on a connection, the 
client will wait for one of them to be acknowledged before sending the next 
request to that node.

In general this is a fairly low-level performance tuning parameter and not one 
you likely need to monkey with unless you are trying to eke out the last bit of 
performance.

The impact is the following: sending more than one request at a time is good as 
you can have one request being sent while the previous one is being processed. 
So allowing more than one in-flight request makes sense. However allowing an 
unbounded number of in-flight requests is actually not a good thing--the reason 
is that the client will batch together messages that come in at the same time 
for the same topic/partition and these big batches are much more efficient to 
process than single message requests. This is one of the primary performance 
optimizations in the producer (see 
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines).
So once we have a few requests in flight, it is best to stop sending more and 
more small requests (which are just going to queue up in the network stack 
anyway) and let the messages batch together into big, efficient batch requests.

The impact of this is that the more message acknowledgement falls behind, the 
more efficient the producer gets at creating big batch requests, and hence the 
better throughput gets. This lets the producer dynamically trade latency for 
batching under load, without any manually configured backoff.

In my testing this setting actually isn't very sensitive as long as there is 
some reasonable cap that forces the batching to kick in as requests back up.

We'll get the docs for this in with the final 0.8.2 release.

-Jay

On Thu, Dec 18, 2014 at 1:38 PM, Orelowitz, David 
wrote:
>
> I notice there is a parameter, max.in.flight.requests.per.connection, 
> in the ProducerConfig.java code that can be set. It defaults to 5.
>
> This is not documented in the 0.8.2 documentation.
>
> http://kafka.apache.org/082/documentation.html#newproducerconfigs
>
> Is it something we should play with? Does anyone have any experience 
> with the parameter?
>
> Thanks,
> David
>
>
> --
> This message, and any attachments, is for the intended recipient(s) 
> only, may contain information that is privileged, confidential and/or 
> proprietary and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>



Re: In Flight Requests

2014-12-18 Thread Jay Kreps
Hi David,

Each request sent to Kafka gets acknowledged. The protocol allows multiple
requests to be sent on a connection without waiting for a response. The
number of requests currently awaiting acknowledgement is the in-flight
request count. By default, once there are five unacknowledged requests on a
connection, the client will wait for one of them to be acknowledged before
sending the next request to that node.

In general this is a fairly low-level performance tuning parameter and not
one you likely need to monkey with unless you are trying to eke out the last
bit of performance.

The impact is the following: sending more than one request at a time is
good as you can have one request being sent while the previous one is being
processed. So allowing more than one in-flight request makes sense. However
allowing an unbounded number of in-flight requests is actually not a good
thing--the reason is that the client will batch together messages that come
in at the same time for the same topic/partition and these big batches are
much more efficient to process than single message requests. This is one of
the primary performance optimizations in the producer (see
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines).
So once we have a few requests in flight, it is best to stop sending more
and more small requests (which are just going to queue up in the network
stack anyway) and let the messages batch together into big, efficient batch
requests.

The impact of this is that the more message acknowledgement falls behind,
the more efficient the producer gets at creating big batch requests, and
hence the better throughput gets. This lets the producer dynamically trade
latency for batching under load, without any manually configured backoff.

In my testing this setting actually isn't very sensitive as long as there
is some reasonable cap that forces the batching to kick in as requests back
up.
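For anyone who does want to pin the cap explicitly, here is a minimal sketch of the relevant new-producer config entry. The key name is the one David asks about below; the broker address is a placeholder:

```java
import java.util.Properties;

public class InFlightConfigSketch {
    // Build the config map you would hand to the new KafkaProducer.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        // Cap on unacknowledged requests per connection (defaults to 5).
        // A small cap lets batching kick in as acknowledgements fall behind.
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            producerProps().getProperty("max.in.flight.requests.per.connection")); // prints 5
    }
}
```

Handing these Properties to the producer is otherwise unchanged; the only addition is the explicit cap.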

We'll get the docs for this in with the final 0.8.2 release.

-Jay

On Thu, Dec 18, 2014 at 1:38 PM, Orelowitz, David 
wrote:
>
> I notice that there is parameter max.in.flight.requests.per.connection in
> the ProducerConfig.java code that can be set. It is defaulted to 5.
>
> This is not documented in the 0.8.2 documentation.
>
> http://kafka.apache.org/082/documentation.html#newproducerconfigs
>
> Is it something we should play with? Does anyone have any experience with
> the parameter?
>
> Thanks,
> David
>
>
> --
> This message, and any attachments, is for the intended recipient(s) only,
> may contain information that is privileged, confidential and/or proprietary
> and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>


In Flight Requests

2014-12-18 Thread Orelowitz, David
I notice that there is a parameter, max.in.flight.requests.per.connection, in
the ProducerConfig.java code that can be set. It defaults to 5.

This is not documented in the 0.8.2 documentation.

http://kafka.apache.org/082/documentation.html#newproducerconfigs

Is it something we should play with? Does anyone have any experience with the 
parameter?

Thanks,
David




Better exception handling in kafka.producer.async.DefaultEventHandler

2014-12-18 Thread Xiaoyu Wang
Hello,

I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler file.
Below is the dispatchSerializedData function. It looks like we catch the
exception outside the loop, merely log an error message, and then return
failedProduceRequests.

If one broker is having a problem, messages destined for the brokers after
the problematic one will NOT be included in failedTopicAndPartitions and
will be dropped silently. Is this correct? Should we change the code to
catch the exception separately for each broker we send to?

Thanks

private def dispatchSerializedData(messages: Seq[KeyedMessage[K, Message]]): Seq[KeyedMessage[K, Message]] = {
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
                .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
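A self-contained sketch of the fix being proposed, catching per broker instead of around the whole loop, so one bad broker no longer silently drops the batches queued for the brokers after it. Everything here (the send stub, the message type, the broker ids) is illustrative, not Kafka's actual code:

```java
import java.util.*;

public class PerBrokerDispatchSketch {
    // Hypothetical send: throws for one "bad" broker to simulate a failure;
    // on success it returns the (empty) list of failed partitions.
    static List<String> send(int brokerId, List<String> batch) {
        if (brokerId == 2) throw new RuntimeException("broker 2 down");
        return Collections.emptyList();
    }

    // try/catch sits INSIDE the per-broker loop: a failure on one broker
    // requeues only that broker's messages, and dispatch continues to the
    // remaining brokers instead of aborting the loop.
    static List<String> dispatch(Map<Integer, List<String>> perBroker) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : perBroker.entrySet()) {
            try {
                failed.addAll(send(e.getKey(), e.getValue()));
            } catch (Throwable t) {
                failed.addAll(e.getValue()); // requeue this broker's messages only
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> data = new TreeMap<>();
        data.put(1, Arrays.asList("m1"));
        data.put(2, Arrays.asList("m2"));
        data.put(3, Arrays.asList("m3"));
        System.out.println(dispatch(data)); // prints [m2]
    }
}
```

Note that broker 3's messages are still sent even though broker 2 failed before it, which is exactly the behavior the outer try/catch above loses.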



Re: metrics about how behind a replica is?

2014-12-18 Thread Xiaoyu Wang
@Jun, we can increase the number of resends, but the produce request may
still fail.

For the async producer, at the time it fails, we have:

   - messages that are in the queue but have not been sent. From javaapi, we
     don't know which messages are still in the queue.
      - Is it possible to expose the blocking queue size so we know what
        remains in the queue?

   - messages that failed retrying. For the last batch, some may have
     succeeded, but some failed retrying. From javaapi, we don't know which
     messages failed.
      - Is it possible to dump the failed messages to a file so that the
        next run can pick them up?

Does this make sense? Is there another way you would recommend to keep track
of messages that have been sent with the async producer?

Thanks
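For reference, a minimal sketch of the 0.8 producer settings this thread keeps circling back to: required acks and the resend count. The broker address is a placeholder and the values are just examples:

```java
import java.util.Properties;

public class AckRetryConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // placeholder broker
        // -1: the broker acknowledges only once the message is committed to
        // every replica in ISR (slower, as measured in this thread).
        props.put("request.required.acks", "-1");
        // The number of resends Jun mentions configuring.
        props.put("message.send.max.retries", "3");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("request.required.acks")); // prints -1
    }
}
```

Even with retries, a request can still fail once they are exhausted, which is what motivates the queue/dump questions above.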



On Wed, Dec 17, 2014 at 10:58 AM, Jun Rao  wrote:
>
> You can configure the number of resends on the producer.
>
> Thanks,
>
> Jun
>
> On Wed, Dec 17, 2014 at 10:34 AM, Xiaoyu Wang 
> wrote:
> >
> > I have tested using "async" producer with "required.ack=-1" and got
> really
> > good performance.
> >
> > We have not used the async producer much previously; is there any
> > potential data loss when a broker goes down? For example, does the
> > producer resend all the messages in a batch?
> >
> >
> > On Wed, Dec 17, 2014 at 1:16 PM, Xiaoyu Wang 
> wrote:
> > >
> > > Thanks Jun.
> > >
> > > We have tested our producer with the different required.ack config.
> Even
> > > with the required.ack=1, the producer is > 10 times slower than with
> > > required.ack=0. Does this match your testing?
> > >
> > > I saw the presentation of LinkedIn Kafka SRE. Wondering what
> > configuration
> > > you guys have at LinkedIn to guarantee zero data loss.
> > >
> > > Thanks again and really appreciate your help!
> > >
> > > On Tue, Dec 16, 2014 at 9:50 PM, Jun Rao  wrote:
> > >>
> > >> replica.lag.max.messages only controls when a replica should be
> dropped
> > >> out
> > >> of the in-sync replica set (ISR). For a message to be considered
> > >> committed,
> > >> it has to be added to every replica in ISR. When the producer uses
> > ack=-1,
> > >> the broker waits until the produced message is committed before
> > >> acknowledging the client. So in the case of a clean leader election
> > (i.e.,
> > >> there is at least one remaining replica in ISR), no committed messages
> > are
> > >> lost. In the case of an unclean leader election, the number of
> messages
> > >> that can be lost depends on the state of the replicas and it's
> possible
> > to
> > >> lose more than replica.lag.max.messages messages.
> > >>
> > >> We do have the lag jmx metric per replica (see
> > >> http://kafka.apache.org/documentation.html#monitoring).
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Sun, Dec 14, 2014 at 7:20 AM, Xiaoyu Wang 
> > >> wrote:
> > >> >
> > >> > Hello,
> > >> >
> > >> > If I understand it correctly, when the number of messages a replica
> is
> > >> > behind from the leader is < replica.lag.max.messages, the replica is
> > >> > considered in sync with the master and are eligible for leader
> > election.
> > >> >
> > >> > This means we can lose at most replica.lag.max.messages messages
> > during
> > >> > leader election, is it? We can set the replica.lag.max.messages to
> be
> > >> very
> > >> > low, but then we may result in unclean leader election, so still we
> > can
> > >> > lose data.
> > >> >
> > >> > Can you recommend some way to prevent data loss? We have tried
> setting
> > >> > require ack from all replicas, but that slows down producer
> > >> significantly.
> > >> >
> > >> > In addition, do we have metrics about how far each replica is
> behind?
> > If
> > >> > not, can we add them.
> > >> >
> > >> >
> > >> > Thanks,
> > >> >
> > >>
> > >
> >
>


Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-18 Thread Jay Kreps
Hey Paul,

Here are the constraints:
1. We wanted the storage of messages to be in their compact binary form so
we could bound memory usage. This implies partitioning prior to enqueue.
And as you note partitioning requires having metadata (even stale metadata)
about topics.
2. We wanted to avoid prefetching metadata for all topics since there may
be quite a lot of topics.
3. We wanted to make metadata fetching lazy so that it would be possible to
create a client without having an active network connection. This tends to
be important when services are brought up in development or test
environments where it is annoying to have to control the dependency graph
when starting things.

This blocking isn't too bad as it only occurs on the first request for each
topic. Our feeling was that many things tend to get setup on a first
request (DB connections are established, caches populated, etc) so this was
not unreasonable.

If you want to pre-initialize the metadata to avoid blocking on the first
request you can do so by fetching the metadata using the
producer.partitionsFor(topic) api at start-up.
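A sketch of that warm-up, where fetch stands in for the producer.partitionsFor(topic) call; the retry count and back-off are arbitrary choices for illustration, not anything the client prescribes:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class MetadataWarmupSketch {
    // Retry a metadata fetch at startup so the first real send() doesn't
    // block on metadata. The fetch argument is a stand-in for
    // producer.partitionsFor(topic).
    static <T> T warmUp(Callable<T> fetch, int attempts) throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return fetch.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(50); // back off briefly before retrying
            }
        }
        throw last; // give up: surface the last failure to the caller
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical fetch that fails twice (broker not reachable yet),
        // then reports 3 partitions.
        Integer partitions = warmUp(() -> {
            if (calls.incrementAndGet() < 3) throw new RuntimeException("no metadata yet");
            return 3;
        }, 5);
        System.out.println(partitions); // prints 3
    }
}
```

If Kafka is down at startup, the warm-up fails fast at a place of your choosing instead of the first send() blocking for metadata.fetch.timeout.ms.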

-Jay

On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy  wrote:
>
> Hello,
>
>   Playing around with the 0.8.2-beta producer client. One of my test cases
> is to ensure producers can deal with Kafka being down when the producer is
> created. My tests failed miserably because of the default blocking in the
> producer with regard to metadata.fetch.timeout.ms. The first line of new
> producer is waitOnMetadata which is blocking.
>
> I can handle this case by loading topic meta on init and setting the
> timeout value to very low metadata.fetch.timeout.ms and either throwing
> away messages or creating my own internal queue to buffer.
>
> I’m surprised the metasync isn’t done async. If it fails, return that in
> the future/callback. This way the API could actually be considered safely
> async and the producer buffer could try to hold on to things until
> block.on.buffer.full kicks in to either drop messages or block. You’d
> probably need a partition callback since numPartitions wouldn’t be
> available.
>
> The implication is that people's apps will work fine if first messages are
> sent while kafka server is up, however, if kafka is down and they restart
> their app, the new producer will block all sends and blow things up if you
> haven't written your app to be aware of this edge case.
>
>
> Thanks,
>
> Paul
>


Kafka 0.8.2 new producer blocking on metadata

2014-12-18 Thread Paul Pearcy
Hello,

  Playing around with the 0.8.2-beta producer client. One of my test cases
is to ensure producers can deal with Kafka being down when the producer is
created. My tests failed miserably because of the default blocking in the
producer with regard to metadata.fetch.timeout.ms. The first line of new
producer is waitOnMetadata which is blocking.

I can handle this case by loading topic metadata on init, setting
metadata.fetch.timeout.ms very low, and either throwing away messages or
creating my own internal queue to buffer.

I’m surprised the metadata sync isn’t done asynchronously. If it fails, return
that in the future/callback. This way the API could actually be considered
safely async, and the producer buffer could try to hold on to things until
block.on.buffer.full kicks in to either drop messages or block. You’d probably
need a partition callback since numPartitions wouldn’t be available.

The implication is that people's apps will work fine if first messages are
sent while kafka server is up, however, if kafka is down and they restart
their app, the new producer will block all sends and blow things up if you
haven't written your app to be aware of this edge case.


Thanks,

Paul


Re: Increase in Kafka replication fetcher thread not reducing log replication

2014-12-18 Thread nitin sharma
Hi All -- has anyone faced the problem I posted in my previous email?

Regards,
Nitin Kumar Sharma.


On Wed, Dec 17, 2014 at 11:08 AM, nitin sharma 
wrote:
>
> Hi All,
>
> I am trying to figure out the best configuration for my Kafka brokers so
> that, after a restart, the new node catches up with the leader quickly.
>
> My test environment has 2 kafka brokers and 1 Topic with one Partition.
>
> I first ran a test (Test#1) with the default settings, i.e.
> num.replica.fetchers = 1 and replica.fetch.max.bytes = 1048576 bytes (1 MB).
> It took 11 min 40 sec to copy 37.9 GB, i.e. about 55.5 MB/sec (37.9*1024/700).
>
> Later I increased num.replica.fetchers to 5 (keeping replica.fetch.max.bytes
> = 1 MB) and ran another test (Test#2). I got replication @ 89 MB/sec, which
> is good, but I was expecting about 4*55 = 220 MB/sec.
>
> I ran two more tests and the results barely improved:
> Test#3: replica threads = 5 and replica.fetch.max.bytes = 5 MB.
>   replication rate = 92.7 MB/sec
>
> Test#4: replica threads = 20 and replica.fetch.max.bytes = 5 MB.
>   replication rate = 93.54 MB/sec
>
> Any reason why increasing the replica fetcher threads or the fetch max
> bytes does not increase my replication rate linearly?
>
> note: in all the tests, CPU utilization was never more than 45%
>
>
> Regards,
> Nitin Kumar Sharma.
>
>
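The rates quoted above are simple arithmetic over the copied bytes; a quick sketch of the Test#1 figure (37.9 GB in 11 min 40 sec, i.e. 700 seconds):

```java
public class ReplicationRateSketch {
    // Convert GB copied and elapsed seconds into MB/sec.
    static double mbPerSec(double gb, int seconds) {
        return gb * 1024 / seconds; // GB -> MB, divided by elapsed time
    }

    public static void main(String[] args) {
        // Test#1 from the email: 37.9 GB in 700 s
        System.out.printf("%.1f%n", mbPerSec(37.9, 700)); // prints 55.4
    }
}
```

The same arithmetic gives ~89 MB/sec for Test#2, so the jump from 1 to 5 fetchers yields roughly 1.6x, not the hoped-for linear scaling (the disk or network, not CPU, is the likely bottleneck given the 45% CPU figure).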


RE: Issues With Parallelism In Kafka Spout

2014-12-18 Thread Nilesh Chhapru
Hi All,

Please share some inputs, as this has been pending for a while and we need to
meet our deadlines.

Regards,
Nilesh Chhapru.

From: Nilesh Chhapru [mailto:nilesh.chha...@ugamsolutions.com]
Sent: 18 December 2014 01:24 PM
To: u...@storm.apache.org; users@kafka.apache.org
Subject: Issues With Parallelism In Kafka Spout

Hi All,

I have implemented a high-level Kafka consumer in Storm, but the parallelism
isn't being achieved: I have 3 partitions and 2 tasks for the spout, yet only
one of them is emitting data.
PFB the screen grab of the number of spout tasks and the data emitted by only
one of them.

Please assist on how to achieve parallelism using the high-level Kafka spout.

[inline screen grab not preserved in the archive]

Regards,
Nilesh Chhapru.



---Disclaimer--

Opinions expressed in this e-mail are those of the author and do not 
necessarily represent those of Ugam. Ugam does not accept any responsibility or 
liability for it. This e-mail message may contain proprietary, confidential or 
legally privileged information for the sole use of the person or entity to whom 
this message was originally addressed. Any review, re-transmission, 
dissemination or other use of or taking of any action in reliance upon this 
information by persons or entities other than the intended recipient is 
prohibited. If you have received this e-mail in error, please delete it and all 
attachments from any servers, hard drives or any other media.

Warning: Sufficient measures have been taken to scan any presence of viruses 
however the recipient should check this email and any attachments for the 
presence of viruses. Ugam accepts no liability for any damage caused by any 
virus transmitted by this email. 






Issues With Parallelism In Kafka Spout

2014-12-18 Thread Nilesh Chhapru
Hi All,

I have implemented a high-level Kafka consumer in Storm, but the parallelism
isn't being achieved: I have 3 partitions and 2 tasks for the spout, yet only
one of them is emitting data.
PFA the screen grab of the number of spout tasks and the data emitted by only
one of them.

Please assist on how to achieve parallelism using the high-level Kafka spout.

Regards,
Nilesh Chhapru.
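On the assignment side, partitions are typically divided among spout tasks round-robin; the modulo rule below is an assumption used for illustration, not storm-kafka's exact code. With 3 partitions and 2 tasks, both tasks own at least one partition, so both should emit; if only one emits, the consumer grouping or offsets are a more likely suspect than the assignment itself:

```java
import java.util.*;

public class SpoutPartitionAssignmentSketch {
    // Hypothetical round-robin assignment: partition p goes to task p % numTasks.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numTasks) {
        Map<Integer, List<Integer>> byTask = new TreeMap<>();
        for (int t = 0; t < numTasks; t++) byTask.put(t, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) byTask.get(p % numTasks).add(p);
        return byTask;
    }

    public static void main(String[] args) {
        // 3 partitions over 2 spout tasks: task 0 owns partitions 0 and 2,
        // task 1 owns partition 1, so neither task should be idle.
        System.out.println(assign(3, 2)); // prints {0=[0, 2], 1=[1]}
    }
}
```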


