Re: Consumer thread dies

2014-11-10 Thread Jun Rao
Don't you have the same problem using SimpleConsumer? How does another
process know a SimpleConsumer hangs?

Thanks,

Jun

On Mon, Nov 10, 2014 at 9:47 AM, Srinivas Reddy Kancharla <
getre...@gmail.com> wrote:

> Thanks Jun for your response.
> Here is my scenario:
>
> topicCountMap.put(topic, new Integer(2));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> So in the above scenario (only 1 partition), there will be 2 threads, C1 and
> C2, and one of the threads will be holding that single partition. In my
> scenario, C1 hangs after consuming a message from the stream, while it is
> processing that message. In such a scenario, it looks like C2 will not get
> hold of the partition stream unless C1 actually dies.
>
> Considering such scenarios, I am planning to use SimpleConsumer, where I
> will have more control over consuming the partition content. But then I have
> to maintain the offset state in ZooKeeper myself. Let me know if this
> approach is correct for such hung scenarios.
>
> Thanks and regards,
> Srini
>
>
> On Sun, Nov 9, 2014 at 9:12 PM, Jun Rao  wrote:
>
> > If C1 dies, C2 will be owning that partition. However, C1 has to really
> > die, which typically means that either you close the consumer connector
> or
> > the jvm of C1 is gone.
> >
> > In your case, it seems that C1 didn't die, it just hung. Do you know why
> C1
> > hung?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Nov 7, 2014 at 3:34 PM, Srinivas Reddy Kancharla <
> > getre...@gmail.com
> > > wrote:
> >
> > > Hi,
> > >
> > > I have a scenario where I have 1 partition and 1 consumer group having
> 2
> > > consumer threads running say C1 and C2. Since there is only one
> partition
> > > for a given topic, say C1 is holding that partition. Now due to some
> > reason
> > > if C1 dies, can C2 get hold of that partition?
> > >
> > > i.e. C1 was busy with KafkaStream instance, for any reason if C1 dies
> or
> > in
> > > hung state, Can we make C2 talking to KafkaStream (for Partition 0).
> > > I am facing this issue where I have 10 messages in partition 0 and C1
> was
> > > consuming it. At message 4, C1 went into hung state. Now I would like
> to
> > > make C2 consume the other messages which are not consumed by C1.
> > >
> > > Thanks and regards,
> > > Srini
> > >
> >
>
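
A minimal sketch of the high-level consumer setup being discussed: two streams
for one topic, each drained by its own thread. The topic name, ZooKeeper
address and group id below are made up for illustration. With a single
partition only one stream receives data, and a handler that hangs inside the
loop keeps ownership of that partition; the other thread does not take over.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TwoThreadConsumer {
    public static void main(String[] args) {
        final String topic = "my_topic";                    // hypothetical topic
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // hypothetical address
        props.put("group.id", "my-group");                  // hypothetical group
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 2);                        // two streams: C1 and C2
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
            pool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        byte[] payload = it.next().message();
                        // If this processing step can hang, bound it (e.g. run it
                        // with a timeout) or hand the payload off to a queue, so
                        // the stream keeps draining.
                        System.out.println("received " + payload.length + " bytes");
                    }
                }
            });
        }
    }
}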


Re: Programmatic Kafka version detection/extraction?

2014-11-10 Thread Jun Rao
Otis,

We don't have an api for that now. We can probably expose this as a JMX as
part of kafka-1481.

Thanks,

Jun

On Mon, Nov 10, 2014 at 7:17 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> Is there a way to detect which version of Kafka one is running?
> Is there an API for that, or a constant with this value, or maybe an MBean
> or some other way to get to this info?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
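
Until such a JMX bean exists, one client-side workaround is to ask the JVM for
the Implementation-Version recorded in the kafka jar's manifest. This is only a
sketch under the assumption that the jar on the classpath actually populates
that manifest attribute; if it does not, the call simply returns null.

// Sketch of a manifest-based check; returns null if the kafka jar's manifest
// does not carry an Implementation-Version attribute (an assumption, not a
// guarantee for every Kafka build).
public class KafkaVersionCheck {
    public static void main(String[] args) {
        Package kafkaPackage = kafka.Kafka.class.getPackage();
        String version = (kafkaPackage == null) ? null
                : kafkaPackage.getImplementationVersion();
        System.out.println("Kafka version from jar manifest: " + version);
    }
}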


Programmatic Kafka version detection/extraction?

2014-11-10 Thread Otis Gospodnetic
Hi,

Is there a way to detect which version of Kafka one is running?
Is there an API for that, or a constant with this value, or maybe an MBean
or some other way to get to this info?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Neha Narkhede
How can I auto refresh keyed producers to use new partitions as these
partitions are added?

Try using the new producer under org.apache.kafka.clients.producer.

Thanks,
Neha

On Mon, Nov 10, 2014 at 8:52 AM, Bhavesh Mistry 
wrote:

> I had different experience with expanding partition for new producer and
> its impact.  I only tried for non-key message.I would always advice to
> keep batch size relatively low or plan for expansion with new java producer
> in advance or since inception otherwise running producer code is impacted.
>
> Here is mail chain:
>
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201411.mbox/%3ccaoejijit4cgry97dgzfjkfvaqfduv-o1x1kafefbshgirkm...@mail.gmail.com%3E
>
> Thanks,
>
> Bhavesh
>
> On Mon, Nov 10, 2014 at 5:20 AM, Shlomi Hazan  wrote:
>
> > Hmmm..
> > The Java producer example seems to ignore added partitions too...
> > How can I auto refresh keyed producers to use new partitions as these
> > partitions are added?
> >
> >
> > On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan  wrote:
> >
> > > One more thing:
> > > I saw that the Python client is also unaffected by addition of
> partitions
> > > to a topic and that it continues to send requests only to the old
> > > partitions.
> > > is this also handled appropriately by the Java producer? Will he see
> the
> > > change and produce to the new partitions as well?
> > > Shlomi
> > >
> > > On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan 
> wrote:
> > >
> > >> No I don't see anything like that, the question was aimed at learning
> if
> > >> it is worthwhile to make the effort of reimplementing the Python
> > producer
> > >> in Java, I so I will not make all the effort just to be disappointed
> > >> afterwards.
> > >> understand I have nothing to worry about, so I will try to simulate
> this
> > >> situation in small scale...
> > >> maybe 3 brokers, one topic with one partition and then add partitions.
> > >> we'll see.
> > >> thanks for clarifying.
> > >> Oh, Good luck with Confluent!!
> > >> :)
> > >>
> > >> On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede <
> neha.narkh...@gmail.com
> > >
> > >> wrote:
> > >>
> > >>> The producer might get an error code if the leader of the partitions
> > >>> being
> > >>> reassigned also changes. However it should retry and succeed. Do you
> > see
> > >>> a
> > >>> behavior that suggests otherwise?
> > >>>
> > >>> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan 
> > wrote:
> > >>>
> > >>> > Hi All,
> > >>> > I recently had an issue producing from python where expanding a
> > cluster
> > >>> > from 3 to 5 nodes and reassigning partitions forced me to restart
> the
> > >>> > producer b/c of KeyError thrown.
> > >>> > Is this situation handled by the Java producer automatically or
> need
> > I
> > >>> do
> > >>> > something to have the java producer refresh itself to see the
> > >>> reassigned
> > >>> > partition layout and produce away ?
> > >>> > Shlomi
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>
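
For reference, a minimal sketch of the new producer Neha mentions, written
against the 0.8.2 API; the exact config keys (especially the serializer
settings) changed between the 0.8.2 beta and the final release, so treat the
property names as assumptions. Broker addresses and the topic name are made up.
The relevant point for this thread is that the new producer refreshes cluster
metadata on its own (metadata.max.age.ms), so newly added partitions are picked
up without restarting the producer.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");   // hypothetical
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Metadata is refreshed at least this often even without errors, so
        // partitions added to the topic are seen on the next refresh.
        props.put("metadata.max.age.ms", "60000");

        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        producer.close();
    }
}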


change retention for a topic on the fly does not work

2014-11-10 Thread Chen Wang
Hey guys,
i am using kafka_2.9.2-0.8.1.1

 bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic my_topic
 --config log.retention.hours.per.topic=48

It says:
Error while executing topic command requirement failed: Unknown
configuration "log.retention.hours.per.topic".
java.lang.IllegalArgumentException: requirement failed: Unknown
configuration "log.retention.hours.per.topic".
at scala.Predef$.require(Predef.scala:214)
at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:138)
at kafka.log.LogConfig$$anonfun$validateNames$1.apply(LogConfig.scala:137)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at
scala.collection.JavaConversions$JEnumerationWrapper.foreach(JavaConversions.scala:578)
at kafka.log.LogConfig$.validateNames(LogConfig.scala:137)
at kafka.log.LogConfig$.validate(LogConfig.scala:145)

Any idea?
Chen
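
For what it's worth, log.retention.hours.per.topic is not one of the per-topic
properties that LogConfig knows about; in 0.8.1 the per-topic override is
retention.ms (retention.bytes also exists). Something along these lines should
pass the validation, with 48 hours expressed in milliseconds:

 bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic my_topic \
   --config retention.ms=172800000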


Re: JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am not running locally.  The Spark master is:

"spark://:7077"



On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das 
wrote:

> What is the Spark master that you are using. Use local[4], not local
> if you are running locally.
>
> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>  wrote:
> > I am embarrassed to admit but I can't get a basic 'word count' to work
> under
> > Kafka/Spark streaming.  My code looks like this.  I  don't see any word
> > counts in console output.  Also, don't see any output in UI.  Needless to
> > say, I am newbie in both 'Spark' as well as 'Kafka'.
> >
> > Please help.  Thanks.
> >
> > Here's the code:
> >
> > public static void main(String[] args) {
> > if (args.length < 4) {
> > System.err.println("Usage: JavaKafkaWordCount 
> 
> >  ");
> > System.exit(1);
> > }
> >
> > //StreamingExamples.setStreamingLogLevels();
> > //SparkConf sparkConf = new
> > SparkConf().setAppName("JavaKafkaWordCount");
> >
> > // Location of the Spark directory
> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
> >
> > // URL of the Spark cluster
> > String sparkUrl = "spark://mymachine:7077";
> >
> > // Location of the required JAR files
> > String jarFiles =
> >
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
> >
> > SparkConf sparkConf = new SparkConf();
> > sparkConf.setAppName("JavaKafkaWordCount");
> > sparkConf.setJars(new String[]{jarFiles});
> > sparkConf.setMaster(sparkUrl);
> > sparkConf.set("spark.ui.port", "2348");
> > sparkConf.setSparkHome(sparkHome);
> >
> > Map kafkaParams = new HashMap();
> > kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> > kafkaParams.put("group.id", "1");
> > kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> > kafkaParams.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> > kafkaParams.put("request.required.acks", "1");
> >
> > // Create the context with a 1 second batch size
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> new
> > Duration(2000));
> >
> > int numThreads = Integer.parseInt(args[3]);
> > Map topicMap = new HashMap();
> > String[] topics = args[2].split(",");
> > for (String topic: topics) {
> > topicMap.put(topic, numThreads);
> > }
> >
> > //JavaPairReceiverInputDStream messages =
> > //KafkaUtils.createStream(jssc, args[0], args[1],
> topicMap);
> > JavaPairDStream messages =
> > KafkaUtils.createStream(jssc,
> > String.class,
> > String.class,
> > StringDecoder.class,
> > StringDecoder.class,
> > kafkaParams,
> > topicMap,
> > StorageLevel.MEMORY_ONLY_SER());
> >
> >
> > JavaDStream lines = messages.map(new
> Function > String>, String>() {
> > @Override
> > public String call(Tuple2 tuple2) {
> > return tuple2._2();
> > }
> > });
> >
> > JavaDStream words = lines.flatMap(new
> > FlatMapFunction() {
> > @Override
> > public Iterable call(String x) {
> > return Lists.newArrayList(SPACE.split(x));
> > }
> > });
> >
> > JavaPairDStream wordCounts = words.mapToPair(
> > new PairFunction() {
> > @Override
> > public Tuple2 call(String s) {
> > return new Tuple2(s, 1);
> > }
> > }).reduceByKey(new Function2 Integer>() {
> > @Override
> > public Integer call(Integer i1, Integer i2) {
> > return i1 + i2;
> > }
> > });
> >
> > wordCounts.print();
> > jssc.start();
> > jssc.awaitTermination();
> >
>


Re: JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Tathagata Das
What is the Spark master that you are using? Use local[4], not local, if
you are running locally.
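
A small illustration of the two master settings, assuming the receiver-based
Kafka stream used in the code below: with plain "local" Spark gets a single
task slot, the Kafka receiver occupies it, and no batches are ever processed;
"local[n]" with n >= 2 (the 4 here is just an example) leaves cores free for
the processing.

import org.apache.spark.SparkConf;

public class MasterSettingExample {
    static SparkConf localTestingConf() {
        // local[4]: one slot for the Kafka receiver, the rest for batch processing.
        return new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]");
    }

    static SparkConf clusterConf() {
        // Against a standalone cluster, keep the spark:// URL as in the original code.
        return new SparkConf().setAppName("JavaKafkaWordCount")
                              .setMaster("spark://mymachine:7077");
    }
}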

On Mon, Nov 10, 2014 at 3:01 PM, Something Something
 wrote:
> I am embarrassed to admit but I can't get a basic 'word count' to work under
> Kafka/Spark streaming.  My code looks like this.  I  don't see any word
> counts in console output.  Also, don't see any output in UI.  Needless to
> say, I am newbie in both 'Spark' as well as 'Kafka'.
>
> Please help.  Thanks.
>
> Here's the code:
>
> public static void main(String[] args) {
> if (args.length < 4) {
> System.err.println("Usage: JavaKafkaWordCount  
>  ");
> System.exit(1);
> }
>
> //StreamingExamples.setStreamingLogLevels();
> //SparkConf sparkConf = new
> SparkConf().setAppName("JavaKafkaWordCount");
>
> // Location of the Spark directory
> String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>
> // URL of the Spark cluster
> String sparkUrl = "spark://mymachine:7077";
>
> // Location of the required JAR files
> String jarFiles =
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>
> SparkConf sparkConf = new SparkConf();
> sparkConf.setAppName("JavaKafkaWordCount");
> sparkConf.setJars(new String[]{jarFiles});
> sparkConf.setMaster(sparkUrl);
> sparkConf.set("spark.ui.port", "2348");
> sparkConf.setSparkHome(sparkHome);
>
> Map kafkaParams = new HashMap();
> kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> kafkaParams.put("group.id", "1");
> kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> kafkaParams.put("serializer.class",
> "kafka.serializer.StringEncoder");
> kafkaParams.put("request.required.acks", "1");
>
> // Create the context with a 1 second batch size
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(2000));
>
> int numThreads = Integer.parseInt(args[3]);
> Map topicMap = new HashMap();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
> topicMap.put(topic, numThreads);
> }
>
> //JavaPairReceiverInputDStream messages =
> //KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> JavaPairDStream messages =
> KafkaUtils.createStream(jssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicMap,
> StorageLevel.MEMORY_ONLY_SER());
>
>
> JavaDStream lines = messages.map(new Function String>, String>() {
> @Override
> public String call(Tuple2 tuple2) {
> return tuple2._2();
> }
> });
>
> JavaDStream words = lines.flatMap(new
> FlatMapFunction() {
> @Override
> public Iterable call(String x) {
> return Lists.newArrayList(SPACE.split(x));
> }
> });
>
> JavaPairDStream wordCounts = words.mapToPair(
> new PairFunction() {
> @Override
> public Tuple2 call(String s) {
> return new Tuple2(s, 1);
> }
> }).reduceByKey(new Function2() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.print();
> jssc.start();
> jssc.awaitTermination();
>


JavaKafkaWordCount not working under Spark Streaming

2014-11-10 Thread Something Something
I am embarrassed to admit it, but I can't get a basic 'word count' to work
under Kafka/Spark streaming.  My code looks like this.  I don't see any word
counts in the console output.  Also, I don't see any output in the UI.
Needless to say, I am a newbie in both 'Spark' and 'Kafka'.

Please help.  Thanks.

Here's the code:

public static void main(String[] args) {
    if (args.length < 4) {
        System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    //StreamingExamples.setStreamingLogLevels();
    //SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");

    // Location of the Spark directory
    String sparkHome = "/opt/mapr/spark/spark-1.0.2/";

    // URL of the Spark cluster
    String sparkUrl = "spark://mymachine:7077";

    // Location of the required JAR files
    String jarFiles =
        "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("JavaKafkaWordCount");
    sparkConf.setJars(new String[]{jarFiles});
    sparkConf.setMaster(sparkUrl);
    sparkConf.set("spark.ui.port", "2348");
    sparkConf.setSparkHome(sparkHome);

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "myedgenode:2181");
    kafkaParams.put("group.id", "1");
    kafkaParams.put("metadata.broker.list", "myedgenode:9092");
    kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
    kafkaParams.put("request.required.acks", "1");

    // Create the context with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
        topicMap.put(topic, numThreads);
    }

    //JavaPairReceiverInputDStream<String, String> messages =
    //    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicMap,
            StorageLevel.MEMORY_ONLY_SER());

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();


Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-10 Thread Solon Gordon
Thanks, Neha. I tried the same test with 0.8.2-beta and am happy to report
I've been unable to reproduce the bad behavior. I'll follow up if this
changes.

On Sun, Nov 9, 2014 at 9:30 PM, Neha Narkhede 
wrote:

> We fixed a couple issues related to automatic leader balancing and
> controlled shutdown. Would you mind trying out 0.8.2-beta?
>
> On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon  wrote:
>
> > We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
> >
> > On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang 
> wrote:
> >
> > > Solon,
> > >
> > > Which version of Kafka are you running and are you enabling auto leader
> > > rebalance at the same time?
> > >
> > > Guozhang
> > >
> > > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > My team has observed that if a broker process is killed in the middle
> > of
> > > > the controlled shutdown procedure, the remaining brokers start
> spewing
> > > > errors and do not properly rebalance leadership. The cluster cannot
> > > recover
> > > > without major manual intervention.
> > > >
> > > > Here is how to reproduce the problem:
> > > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call
> them
> > A,
> > > > B, and C.) Set controlled.shutdown.enable=true.
> > > > 2. Create a topic with replication_factor = 3 and a large number of
> > > > partitions (say 100).
> > > > 3. Send a TERM signal to broker A. This initiates controlled
> shutdown.
> > > > 4. Before controlled shutdown completes, quickly send a KILL signal
> to
> > > > broker A.
> > > >
> > > > Result:
> > > > - Brokers B and C start logging ReplicaFetcherThread connection
> errors
> > > > every few milliseconds. (See below for an example.)
> > > > - Broker A is still listed as a leader and ISR for any partitions
> which
> > > > were not transferred during controlled shutdown. This causes
> connection
> > > > errors when clients try to produce to or consume from these
> partitions.
> > > >
> > > > This scenario is difficult to recover from. The only ways we have
> found
> > > are
> > > > to restart broker A multiple times (if it still exists) or to kill
> > both B
> > > > and C and then start them one by one. Without this kind of
> > intervention,
> > > > the above issues persist indefinitely.
> > > >
> > > > This may sound like a contrived scenario, but it's exactly what we
> have
> > > > seen when a Kafka EC2 instance gets terminated by AWS. So this seems
> > > like a
> > > > real liability.
> > > >
> > > > Are there any existing JIRA tickets which cover this behavior? And do
> > you
> > > > have any recommendations for avoiding it, other than forsaking
> > controlled
> > > > shutdowns entirely?
> > > >
> > > > Thanks,
> > > > Solon
> > > >
> > > > Error example:
> > > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> > > Error
> > > > in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500;
> ClientId:
> > > > ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait:
> 500
> > > ms;
> > > > MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > > > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > > > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > > > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > > > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > > > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > > > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > > > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > > > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > > > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > > > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > > > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > > > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > > > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > > > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > > > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > > > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > > > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > > > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > > > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > > > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > > > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > > > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > > > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > > > PartitionFetchInfo(448,10485760),[console,0] ->
> > > > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect0(Native Method)
> > > > at sun.nio.ch.Net.connect(Net.java:465)
> > > > at sun.nio.ch.Net.connect(Net.java:457)
> > > > at
> > > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > > at
> > > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at
> > kafka.consumer.Si

Re: Consumer thread dies

2014-11-10 Thread Srinivas Reddy Kancharla
Thanks Jun for your response.
Here is my scenario:

topicCountMap.put(topic, new Integer(2));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

So in the above scenario (only 1 partition), there will be 2 threads, C1 and
C2, and one of the threads will be holding that single partition. In my
scenario, C1 hangs after consuming a message from the stream, while it is
processing that message. In such a scenario, it looks like C2 will not get
hold of the partition stream unless C1 actually dies.

Considering such scenarios, I am planning to use SimpleConsumer, where I
will have more control over consuming the partition content. But then I have
to maintain the offset state in ZooKeeper myself. Let me know if this
approach is correct for such hung scenarios.

Thanks and regards,
Srini


On Sun, Nov 9, 2014 at 9:12 PM, Jun Rao  wrote:

> If C1 dies, C2 will be owning that partition. However, C1 has to really
> die, which typically means that either you close the consumer connector or
> the jvm of C1 is gone.
>
> In your case, it seems that C1 didn't die, it just hung. Do you know why C1
> hung?
>
> Thanks,
>
> Jun
>
> On Fri, Nov 7, 2014 at 3:34 PM, Srinivas Reddy Kancharla <
> getre...@gmail.com
> > wrote:
>
> > Hi,
> >
> > I have a scenario where I have 1 partition and 1 consumer group having 2
> > consumer threads running say C1 and C2. Since there is only one partition
> > for a given topic, say C1 is holding that partition. Now due to some
> reason
> > if C1 dies, can C2 get hold of that partition?
> >
> > i.e. C1 was busy with KafkaStream instance, for any reason if C1 dies or
> in
> > hung state, Can we make C2 talking to KafkaStream (for Partition 0).
> > I am facing this issue where I have 10 messages in partition 0 and C1 was
> > consuming it. At message 4, C1 went into hung state. Now I would like to
> > make C2 consume the other messages which are not consumed by C1.
> >
> > Thanks and regards,
> > Srini
> >
>


Re: Error in fetch Name. How to recover broken node?

2014-11-10 Thread Marco
Thanks. That worked just fine!


On Monday, 10 November 2014 at 17:53, Guozhang Wang  wrote:
 


You do not need to delete the data folder; I think the "file handles" here
are mostly due to socket leaks, i.e. network socket handles, not disk file
handles. Just restarting the broker should do the trick.

Guozhang


On Mon, Nov 10, 2014 at 7:47 AM, Marco  wrote:

We're using kafka 0.8.1.1.
>
>About network partition, it is an option.
>now i'm just wondering if deleting the data folder on the second node will at 
>least have it come up again.
>
>i think another guy tried a kafka-reassign-partitions just before it all blew 
>up.
>
>
>On Monday, 10 November 2014 at 16:36, Guozhang Wang  wrote:
>
>Hi Marco,
>
>The fetch error comes from "UnresolvedAddressException", could you try to
>check if you have a network partition issue during that time?
>
>As for the "Too many file handlers", I think this is due to not properly
>handling such exceptions that it does not close the socket in time, which
>version of Kafka are you using?
>
>Guozhang
>
>
>
>
>On Mon, Nov 10, 2014 at 6:08 AM, Marco  wrote:
>
>> Hi,
>> i've got a 2-machine kafka cluster. For some reasons after a restart the
>> second node won't start.
>> i get tons of "Error in fetch Name" until I get a final "Too many open
>> files".
>>
>> How do i start dealing with this?
>>
>> thanks
>>
>> this is the error
>>
>> [2014-11-10 14:48:01,169] INFO [Kafka Server 2], started
>> (kafka.server.KafkaServer)
>> [2014-11-10 14:48:01,378] INFO [ReplicaFetcherManager on broker 2] Removed
>> fetcher for partitions
>> [news,3],[test,0],[test,2],[news,1],[test3,1],[test3,3]
>> (kafka.server.ReplicaFetcherManager)
>> [2014-11-10 14:48:01,459] INFO Truncating log news-3 to offset 249.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,462] INFO Truncating log test-0 to offset 0.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,462] INFO Truncating log test-2 to offset 0.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,463] INFO Truncating log news-1 to offset 268.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,464] INFO Truncating log test3-1 to offset 0.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,464] INFO Truncating log test3-3 to offset 0.
>> (kafka.log.Log)
>> [2014-11-10 14:48:01,530] INFO [ReplicaFetcherThread-0-1], Starting
>> (kafka.server.ReplicaFetcherThread)
>> [2014-11-10 14:48:01,535] INFO [ReplicaFetcherManager on broker 2] Added
>> fetcher for partitions ArrayBuffer([[news,3], initOffset 249 to broker
>> id:1,host:machine1,port:9092] , [[news,1], initOffset 268 to broker
>> id:1,host:machine1,port:9092] ) (kafka.server.ReplicaFetcherManager)
>> [2014-11-10 14:48:01,551] ERROR [ReplicaFetcherThread-0-1], Error in fetch
>> Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId:
>> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
>> RequestInfo: [news,3] -> PartitionFetchInfo(249,1048576),[news,1] ->
>> PartitionFetchInfo(268,1048576) (kafka.server.ReplicaFetcherThread)
>> java.nio.channels.UnresolvedAddressException
>> at sun.nio.ch.Net.checkAddress(Net.java:127)
>> ...
>>
>
>
>
>--
>-- Guozhang
>


-- 

-- Guozhang

Re: corrupt recovery checkpoint file issue....

2014-11-10 Thread Guozhang Wang
You are right. The swap will be skipped in that case. It seems this
mechanism does not protect against scenarios where the storage system
crashes hard.

An orthogonal note: I originally thought renameTo in Linux is atomic, but
after reading some JavaDocs I think maybe we should use
java.nio.file.Files.move to be safer?

https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#move%28java.nio.file.Path,%20java.nio.file.Path,%20java.nio.file.CopyOption...%29
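
For illustration only (this is not the actual OffsetCheckpoint code): a sketch
of the tmp-file-then-atomic-move idiom using java.nio.file.Files.move, which
throws AtomicMoveNotSupportedException instead of silently falling back when
the file system cannot perform the move atomically.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class CheckpointSwap {
    // Write the new checkpoint to a tmp file, then atomically move it over the
    // live file. On POSIX file systems the atomic move replaces the target,
    // like rename(2); elsewhere behavior may differ.
    static void writeCheckpoint(Path tmp, Path target, byte[] contents) throws IOException {
        Files.write(tmp, contents,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                    StandardOpenOption.WRITE, StandardOpenOption.SYNC);
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}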

Guozhang

On Sun, Nov 9, 2014 at 6:10 PM, Jun Rao  wrote:

> Guozhang,
>
> In OffsetCheckpoint.write(), we don't catch any exceptions. There is only a
> finally clause to close the writer. So, it there is any exception during
> write or sync, the exception will be propagated back to the caller and
> swapping will be skipped.
>
> Thanks,
>
> Jun
>
> On Fri, Nov 7, 2014 at 9:47 AM, Guozhang Wang  wrote:
>
> > Jun,
> >
> > Checking the OffsetCheckpoint.write function, if
> > "fileOutputStream.getFD.sync" throws exception it will just be caught and
> > forgotten, and the swap will still happen, may be we need to catch the
> > SyncFailedException and re-throw it as a FATAL error to skip the swap.
> >
> > Guozhang
> >
> >
> > On Thu, Nov 6, 2014 at 8:50 PM, Jason Rosenberg 
> wrote:
> >
> > > I'm still not sure what caused the reboot of the system (but yes it
> > appears
> > > to have crashed hard).  The file system is xfs, on CentOs linux.  I'm
> not
> > > yet sure, but I think also before the crash, the system might have
> become
> > > wedged.
> > >
> > > It appears the corrupt recovery files actually contained all zero
> bytes,
> > > after looking at it with odb.
> > >
> > > I'll file a Jira.
> > >
> > > On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao  wrote:
> > >
> > > > I am also wondering how the corruption happened. The way that we
> update
> > > the
> > > > OffsetCheckpoint file is to first write to a tmp file and flush the
> > data.
> > > > We then rename the tmp file to the final file. This is done to
> prevent
> > > > corruption caused by a crash in the middle of the writes. In your
> case,
> > > was
> > > > the host crashed? What kind of storage system are you using? Is there
> > any
> > > > non-volatile cache on the storage system?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg 
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We recently had a kafka node go down suddenly. When it came back
> up,
> > it
> > > > > apparently had a corrupt recovery file, and refused to startup:
> > > > >
> > > > > 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> > > > > starting up KafkaServer
> > > > > java.lang.NumberFormatException: For input string:
> > > > >
> > > > >
> > > >
> > >
> >
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> > > > >
> > > > >
> > > >
> > >
> >
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> > > > > at
> > > > >
> > > >
> > >
> >
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > > > > at java.lang.Integer.parseInt(Integer.java:481)
> > > > > at java.lang.Integer.parseInt(Integer.java:527)
> > > > > at
> > > > >
> > scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> > > > > at
> > > scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > > > > at
> > > kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> > > > > at
> > > > >
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> > > > > at
> > > > >
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> > > > > at
> > > > >
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > > > > at
> > > > >
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> > > > > at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> > > > > at kafka.log.LogManager.(LogManager.scala:57)
> > > > > at
> > > > kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> > > > > at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> > > > >
> > > > > And since the app is under a monitor (so it was repeatedly
> restarting
> > > and
> > > > > failing with this error for several minutes before we got to it)…
> > > > >
> > > > > We moved the ‘recovery-point-offset-checkpoint’ file out of the
> way,
> > > and
> > > > it
> > > > > then restarted cleanly (but of course re-synced all it’s data from
> > > > > replicas, so we had no data loss).
> > > > >
> > > > > Anyway, I’m wondering if that’s the expected behavior? Or should it
> > not
>

Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Bhavesh Mistry
I had a different experience with expanding partitions for the new producer
and its impact. I only tried it for non-keyed messages. I would always advise
keeping the batch size relatively low, or planning for expansion with the new
Java producer in advance (or from inception); otherwise running producer code
is impacted.

Here is mail chain:
http://mail-archives.apache.org/mod_mbox/kafka-dev/201411.mbox/%3ccaoejijit4cgry97dgzfjkfvaqfduv-o1x1kafefbshgirkm...@mail.gmail.com%3E

Thanks,

Bhavesh

On Mon, Nov 10, 2014 at 5:20 AM, Shlomi Hazan  wrote:

> Hmmm..
> The Java producer example seems to ignore added partitions too...
> How can I auto refresh keyed producers to use new partitions as these
> partitions are added?
>
>
> On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan  wrote:
>
> > One more thing:
> > I saw that the Python client is also unaffected by addition of partitions
> > to a topic and that it continues to send requests only to the old
> > partitions.
> > is this also handled appropriately by the Java producer? Will he see the
> > change and produce to the new partitions as well?
> > Shlomi
> >
> > On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan  wrote:
> >
> >> No I don't see anything like that, the question was aimed at learning if
> >> it is worthwhile to make the effort of reimplementing the Python
> producer
> >> in Java, I so I will not make all the effort just to be disappointed
> >> afterwards.
> >> understand I have nothing to worry about, so I will try to simulate this
> >> situation in small scale...
> >> maybe 3 brokers, one topic with one partition and then add partitions.
> >> we'll see.
> >> thanks for clarifying.
> >> Oh, Good luck with Confluent!!
> >> :)
> >>
> >> On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede  >
> >> wrote:
> >>
> >>> The producer might get an error code if the leader of the partitions
> >>> being
> >>> reassigned also changes. However it should retry and succeed. Do you
> see
> >>> a
> >>> behavior that suggests otherwise?
> >>>
> >>> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan 
> wrote:
> >>>
> >>> > Hi All,
> >>> > I recently had an issue producing from python where expanding a
> cluster
> >>> > from 3 to 5 nodes and reassigning partitions forced me to restart the
> >>> > producer b/c of KeyError thrown.
> >>> > Is this situation handled by the Java producer automatically or need
> I
> >>> do
> >>> > something to have the java producer refresh itself to see the
> >>> reassigned
> >>> > partition layout and produce away ?
> >>> > Shlomi
> >>> >
> >>>
> >>
> >>
> >
>


Re: Error in fetch Name. How to recover broken node?

2014-11-10 Thread Guozhang Wang
You do not need to delete the data folder; I think the "file handles" here
are mostly due to socket leaks, i.e. network socket handles, not disk file
handles. Just restarting the broker should do the trick.

Guozhang

On Mon, Nov 10, 2014 at 7:47 AM, Marco  wrote:

> We're using kafka 0.8.1.1.
>
> About network partition, it is an option.
> now i'm just wondering if deleting the data folder on the second node will
> at least have it come up again.
>
> i think another guy tried a kafka-reassign-partitions just before it all
> blew up.
>
>
> On Monday, 10 November 2014 at 16:36, Guozhang Wang  wrote:
> Hi Marco,
>
> The fetch error comes from "UnresolvedAddressException", could you try to
> check if you have a network partition issue during that time?
>
> As for the "Too many file handlers", I think this is due to not properly
> handling such exceptions that it does not close the socket in time, which
> version of Kafka are you using?
>
> Guozhang
>
>
>
>
> On Mon, Nov 10, 2014 at 6:08 AM, Marco  wrote:
>
> > Hi,
> > i've got a 2-machine kafka cluster. For some reasons after a restart the
> > second node won't start.
> > i get tons of "Error in fetch Name" until I get a final "Too many open
> > files".
> >
> > How do i start dealing with this?
> >
> > thanks
> >
> > this is the error
> >
> > [2014-11-10 14:48:01,169] INFO [Kafka Server 2], started
> > (kafka.server.KafkaServer)
> > [2014-11-10 14:48:01,378] INFO [ReplicaFetcherManager on broker 2]
> Removed
> > fetcher for partitions
> > [news,3],[test,0],[test,2],[news,1],[test3,1],[test3,3]
> > (kafka.server.ReplicaFetcherManager)
> > [2014-11-10 14:48:01,459] INFO Truncating log news-3 to offset 249.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,462] INFO Truncating log test-0 to offset 0.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,462] INFO Truncating log test-2 to offset 0.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,463] INFO Truncating log news-1 to offset 268.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,464] INFO Truncating log test3-1 to offset 0.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,464] INFO Truncating log test3-3 to offset 0.
> > (kafka.log.Log)
> > [2014-11-10 14:48:01,530] INFO [ReplicaFetcherThread-0-1], Starting
> > (kafka.server.ReplicaFetcherThread)
> > [2014-11-10 14:48:01,535] INFO [ReplicaFetcherManager on broker 2] Added
> > fetcher for partitions ArrayBuffer([[news,3], initOffset 249 to broker
> > id:1,host:machine1,port:9092] , [[news,1], initOffset 268 to broker
> > id:1,host:machine1,port:9092] ) (kafka.server.ReplicaFetcherManager)
> > [2014-11-10 14:48:01,551] ERROR [ReplicaFetcherThread-0-1], Error in
> fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1
> bytes;
> > RequestInfo: [news,3] -> PartitionFetchInfo(249,1048576),[news,1] ->
> > PartitionFetchInfo(268,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> > at sun.nio.ch.Net.checkAddress(Net.java:127)
> > ...
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: powered by kafka

2014-11-10 Thread Joe Stein
Cool, updated, thanks!!!

On Mon, Nov 10, 2014 at 7:57 AM, Andrew Otto  wrote:

> Oo, add us too!
>
> The Wikimedia Foundation (http://wikimediafoundation.org/wiki/Our_projects)
> uses Kafka as a log transport for analytics data from production webservers
> and applications.  This data is consumed into Hadoop using Camus and to
> other processors of analytics data.
>
>
>
>
>
> > On Nov 10, 2014, at 9:56 AM, Yann Schwartz 
> wrote:
> >
> > Hello,
> >
> > I'd just chime in then. Criteo ( http://www.criteo.com ) has been using
> kafka for over a year for stream processing and log transfer (over 2M
> messages/s and growing)
> >
> > Yann.
> > 
> > From: Joe Stein 
> > Sent: Monday, November 10, 2014 2:03 PM
> > To: users@kafka.apache.org
> > Subject: Re: powered by kafka
> >
> > I added both Cityzen Data  and Exoscale to the Wiki. Please feel free to
> > edit and expand on how more. If you need edit permission let me know.
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop 
> > /
> >
>
>


Re: powered by kafka

2014-11-10 Thread Andrew Otto
Oo, add us too!

The Wikimedia Foundation (http://wikimediafoundation.org/wiki/Our_projects) 
uses Kafka as a log transport for analytics data from production webservers and 
applications.  This data is consumed into Hadoop using Camus and to other 
processors of analytics data.





> On Nov 10, 2014, at 9:56 AM, Yann Schwartz  wrote:
> 
> Hello,
> 
> I'd just chime in then. Criteo ( http://www.criteo.com ) has been using kafka 
> for over a year for stream processing and log transfer (over 2M messages/s 
> and growing)  
> 
> Yann.
> 
> From: Joe Stein 
> Sent: Monday, November 10, 2014 2:03 PM
> To: users@kafka.apache.org
> Subject: Re: powered by kafka
> 
> I added both Cityzen Data  and Exoscale to the Wiki. Please feel free to
> edit and expand on how more. If you need edit permission let me know.
> 
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop 
> /
> 



Re: Error in fetch Name. How to recover broken node?

2014-11-10 Thread Marco
We're using kafka 0.8.1.1.

About network partition, it is an option.
now i'm just wondering if deleting the data folder on the second node will at 
least have it come up again.

i think another guy tried a kafka-reassign-partitions just before it all blew 
up.


On Monday, 10 November 2014 at 16:36, Guozhang Wang  wrote:
Hi Marco,

The fetch error comes from "UnresolvedAddressException"; could you check
whether you had a network partition issue during that time?

As for the "Too many file handlers" issue, I think this is because such
exceptions are not handled properly, so the socket is not closed in time.
Which version of Kafka are you using?

Guozhang




On Mon, Nov 10, 2014 at 6:08 AM, Marco  wrote:

> Hi,
> i've got a 2-machine kafka cluster. For some reasons after a restart the
> second node won't start.
> i get tons of "Error in fetch Name" until I get a final "Too many open
> files".
>
> How do i start dealing with this?
>
> thanks
>
> this is the error
>
> [2014-11-10 14:48:01,169] INFO [Kafka Server 2], started
> (kafka.server.KafkaServer)
> [2014-11-10 14:48:01,378] INFO [ReplicaFetcherManager on broker 2] Removed
> fetcher for partitions
> [news,3],[test,0],[test,2],[news,1],[test3,1],[test3,3]
> (kafka.server.ReplicaFetcherManager)
> [2014-11-10 14:48:01,459] INFO Truncating log news-3 to offset 249.
> (kafka.log.Log)
> [2014-11-10 14:48:01,462] INFO Truncating log test-0 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,462] INFO Truncating log test-2 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,463] INFO Truncating log news-1 to offset 268.
> (kafka.log.Log)
> [2014-11-10 14:48:01,464] INFO Truncating log test3-1 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,464] INFO Truncating log test3-3 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,530] INFO [ReplicaFetcherThread-0-1], Starting
> (kafka.server.ReplicaFetcherThread)
> [2014-11-10 14:48:01,535] INFO [ReplicaFetcherManager on broker 2] Added
> fetcher for partitions ArrayBuffer([[news,3], initOffset 249 to broker
> id:1,host:machine1,port:9092] , [[news,1], initOffset 268 to broker
> id:1,host:machine1,port:9092] ) (kafka.server.ReplicaFetcherManager)
> [2014-11-10 14:48:01,551] ERROR [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [news,3] -> PartitionFetchInfo(249,1048576),[news,1] ->
> PartitionFetchInfo(268,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:127)
> ...
>



-- 
-- Guozhang


Re: One question about "New Producer Configs"

2014-11-10 Thread Guozhang Wang
Just some additions to Chia-Chun's response: each topic can have multiple
partitions, and each partition can be replicated as multiple replicas on
different machines. acks = n means that the data sent to a particular
partition has been replicated to at least n replicas.

Guozhang
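
To make the distinction concrete, here is a sketch against the 0.8.1 producer
API (the broker address and topic name are made up): request.required.acks
counts replica acknowledgments for the partition a message lands in, not
partitions of the topic.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");               // hypothetical
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 0 = fire and forget, 1 = leader only, -1 = all in-sync replicas;
        // a value like 2 waits for that many replicas of the target partition.
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my_topic", "key", "value"));
        producer.close();
    }
}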

On Sun, Nov 9, 2014 at 11:27 PM, Chia-Chun Shih 
wrote:

> Dear Genlong,
>
> "New Producer Configs" is for upcoming versions. If you are using 0.8.1,
> please refer to "Producer Configs".
>
> acks=n, in which n means number of in-sync replicas, not number of
> partitions.
>
> regards,
> Chia-Chun
>
> 2014-11-10 10:51 GMT+08:00 hi <995174...@qq.com>:
>
> > Dear sir or madam,
> >
> > There is one question when I'm using Kafka:
> > From the documentation ("New Producer Configs"), I gather that acks=n means
> > the number of acknowledgments the producer requires the leader to have
> > received before considering a request complete. But why can the producer
> > send messages successfully when I use the Producer API and acks is set to
> > 2, given that there are 3 partitions but only one partition is working?
> >
> > Thanks a lot and look forward to hearing from you soon.
> >
> > Best Regards,
> >
> > Genlong Wang
>



-- 
-- Guozhang


Re: Error in fetch Name. How to recover broken node?

2014-11-10 Thread Guozhang Wang
Hi Marco,

The fetch error comes from "UnresolvedAddressException"; could you check
whether you had a network partition issue during that time?

As for the "Too many file handlers" issue, I think this is because such
exceptions are not handled properly, so the socket is not closed in time.
Which version of Kafka are you using?

Guozhang

On Mon, Nov 10, 2014 at 6:08 AM, Marco  wrote:

> Hi,
> i've got a 2-machine kafka cluster. For some reasons after a restart the
> second node won't start.
> i get tons of "Error in fetch Name" until I get a final "Too many open
> files".
>
> How do i start dealing with this?
>
> thanks
>
> this is the error
>
> [2014-11-10 14:48:01,169] INFO [Kafka Server 2], started
> (kafka.server.KafkaServer)
> [2014-11-10 14:48:01,378] INFO [ReplicaFetcherManager on broker 2] Removed
> fetcher for partitions
> [news,3],[test,0],[test,2],[news,1],[test3,1],[test3,3]
> (kafka.server.ReplicaFetcherManager)
> [2014-11-10 14:48:01,459] INFO Truncating log news-3 to offset 249.
> (kafka.log.Log)
> [2014-11-10 14:48:01,462] INFO Truncating log test-0 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,462] INFO Truncating log test-2 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,463] INFO Truncating log news-1 to offset 268.
> (kafka.log.Log)
> [2014-11-10 14:48:01,464] INFO Truncating log test3-1 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,464] INFO Truncating log test3-3 to offset 0.
> (kafka.log.Log)
> [2014-11-10 14:48:01,530] INFO [ReplicaFetcherThread-0-1], Starting
> (kafka.server.ReplicaFetcherThread)
> [2014-11-10 14:48:01,535] INFO [ReplicaFetcherManager on broker 2] Added
> fetcher for partitions ArrayBuffer([[news,3], initOffset 249 to broker
> id:1,host:machine1,port:9092] , [[news,1], initOffset 268 to broker
> id:1,host:machine1,port:9092] ) (kafka.server.ReplicaFetcherManager)
> [2014-11-10 14:48:01,551] ERROR [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [news,3] -> PartitionFetchInfo(249,1048576),[news,1] ->
> PartitionFetchInfo(268,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:127)
> ...
>



-- 
-- Guozhang


Re: powered by kafka

2014-11-10 Thread Joe Stein
Awesome! updated, thanks!

On Mon, Nov 10, 2014 at 6:56 AM, Yann Schwartz 
wrote:

> Hello,
>
> I'd just chime in then. Criteo ( http://www.criteo.com ) has been using
> kafka for over a year for stream processing and log transfer (over 2M
> messages/s and growing)
>
> Yann.
> 
> From: Joe Stein 
> Sent: Monday, November 10, 2014 2:03 PM
> To: users@kafka.apache.org
> Subject: Re: powered by kafka
>
> I added both Cityzen Data  and Exoscale to the Wiki. Please feel free to
> edit and expand on how more. If you need edit permission let me know.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
>


RE: powered by kafka

2014-11-10 Thread Yann Schwartz
Hello,

I'd just chime in then. Criteo ( http://www.criteo.com ) has been using kafka 
for over a year for stream processing and log transfer (over 2M messages/s and 
growing)  

Yann.

From: Joe Stein 
Sent: Monday, November 10, 2014 2:03 PM
To: users@kafka.apache.org
Subject: Re: powered by kafka

I added both Cityzen Data  and Exoscale to the Wiki. Please feel free to
edit and expand on how more. If you need edit permission let me know.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/



Error in fetch Name. How to recover broken node?

2014-11-10 Thread Marco
Hi,
i've got a 2-machine kafka cluster. For some reasons after a restart the second 
node won't start.
i get tons of "Error in fetch Name" until I get a final "Too many open files".

How do i start dealing with this?

thanks

this is the error

[2014-11-10 14:48:01,169] INFO [Kafka Server 2], started 
(kafka.server.KafkaServer)
[2014-11-10 14:48:01,378] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions [news,3],[test,0],[test,2],[news,1],[test3,1],[test3,3] 
(kafka.server.ReplicaFetcherManager)
[2014-11-10 14:48:01,459] INFO Truncating log news-3 to offset 249. 
(kafka.log.Log)
[2014-11-10 14:48:01,462] INFO Truncating log test-0 to offset 0. 
(kafka.log.Log)
[2014-11-10 14:48:01,462] INFO Truncating log test-2 to offset 0. 
(kafka.log.Log)
[2014-11-10 14:48:01,463] INFO Truncating log news-1 to offset 268. 
(kafka.log.Log)
[2014-11-10 14:48:01,464] INFO Truncating log test3-1 to offset 0. 
(kafka.log.Log)
[2014-11-10 14:48:01,464] INFO Truncating log test3-3 to offset 0. 
(kafka.log.Log)
[2014-11-10 14:48:01,530] INFO [ReplicaFetcherThread-0-1], Starting  
(kafka.server.ReplicaFetcherThread)
[2014-11-10 14:48:01,535] INFO [ReplicaFetcherManager on broker 2] Added 
fetcher for partitions ArrayBuffer([[news,3], initOffset 249 to broker 
id:1,host:machine1,port:9092] , [[news,1], initOffset 268 to broker 
id:1,host:machine1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-11-10 14:48:01,551] ERROR [ReplicaFetcherThread-0-1], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: 
ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [news,3] -> PartitionFetchInfo(249,1048576),[news,1] -> 
PartitionFetchInfo(268,1048576) (kafka.server.ReplicaFetcherThread)
java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:127)
...


Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Shlomi Hazan
Hmmm..
The Java producer example seems to ignore added partitions too...
How can I auto refresh keyed producers to use new partitions as these
partitions are added?


On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan  wrote:

> One more thing:
> I saw that the Python client is also unaffected by addition of partitions
> to a topic and that it continues to send requests only to the old
> partitions.
> is this also handled appropriately by the Java producer? Will he see the
> change and produce to the new partitions as well?
> Shlomi
>
> On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan  wrote:
>
>> No I don't see anything like that, the question was aimed at learning if
>> it is worthwhile to make the effort of reimplementing the Python producer
>> in Java, I so I will not make all the effort just to be disappointed
>> afterwards.
>> understand I have nothing to worry about, so I will try to simulate this
>> situation in small scale...
>> maybe 3 brokers, one topic with one partition and then add partitions.
>> we'll see.
>> thanks for clarifying.
>> Oh, Good luck with Confluent!!
>> :)
>>
>> On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede 
>> wrote:
>>
>>> The producer might get an error code if the leader of the partitions
>>> being
>>> reassigned also changes. However it should retry and succeed. Do you see
>>> a
>>> behavior that suggests otherwise?
>>>
>>> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan  wrote:
>>>
>>> > Hi All,
>>> > I recently had an issue producing from python where expanding a cluster
>>> > from 3 to 5 nodes and reassigning partitions forced me to restart the
>>> > producer b/c of KeyError thrown.
>>> > Is this situation handled by the Java producer automatically or need I
>>> do
>>> > something to have the java producer refresh itself to see the
>>> reassigned
>>> > partition layout and produce away ?
>>> > Shlomi
>>> >
>>>
>>
>>
>


Re: powered by kafka

2014-11-10 Thread Joe Stein
I added both Cityzen Data  and Exoscale to the Wiki. Please feel free to
edit and expand on how more. If you need edit permission let me know.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Mon, Nov 10, 2014 at 3:02 AM, Mathias Herberts <
mathias.herbe...@gmail.com> wrote:

> Hi there,
>
> my company Cityzen Data uses Kafka as well; we provide a platform for
> collecting, storing and analyzing machine data.
>
> http://www.cityzendata.com/
> @CityzenData
>
> Mathias.
>
> On Mon, Nov 10, 2014 at 11:57 AM, Pierre-Yves Ritschard
>  wrote:
> > I guess I should mention that exoscale (https://exoscale.ch) is powered
> by
> > kafka as well.
> >
> > Cheers,
> >- pyr
> >
> > On Sun, Nov 9, 2014 at 7:36 PM, Gwen Shapira 
> wrote:
> >
> >> I'm not Jay, but fixed it anyways ;)
> >>
> >> Gwen
> >>
> >> On Sun, Nov 9, 2014 at 10:34 AM, vipul jhawar 
> >> wrote:
> >>
> >> > Hi Jay
> >> >
> >> > Thanks for posting the update.
> >> >
> >> > However, i checked the page history and the hyperlink is pointing to
> the
> >> > wrong domain.
> >> > Exponential refers to www.exponential.com. I sent the twitter handle,
> >> > should have sent the domain.
> >> > Please correct.
> >> >
> >> > Thanks
> >> >
> >> > On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar 
> >> > wrote:
> >> >
> >> > > Exponential @exponentialinc is using kafka in production to power
> the
> >> > > events ingestion pipeline for real time analytics and log feed
> >> > consumption.
> >> > >
> >> > > Please post on powered by kafka wiki -
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> >> > >
> >> > > Thanks
> >> > > Vipul
> >> > > http://in.linkedin.com/in/vjhawar/
> >> > >
> >> >
> >>
>


Re: powered by kafka

2014-11-10 Thread Mathias Herberts
Hi there,

my company Cityzen Data uses Kafka as well; we provide a platform for
collecting, storing and analyzing machine data.

http://www.cityzendata.com/
@CityzenData

Mathias.

On Mon, Nov 10, 2014 at 11:57 AM, Pierre-Yves Ritschard
 wrote:
> I guess I should mention that exoscale (https://exoscale.ch) is powered by
> kafka as well.
>
> Cheers,
>- pyr
>
> On Sun, Nov 9, 2014 at 7:36 PM, Gwen Shapira  wrote:
>
>> I'm not Jay, but fixed it anyways ;)
>>
>> Gwen
>>
>> On Sun, Nov 9, 2014 at 10:34 AM, vipul jhawar 
>> wrote:
>>
>> > Hi Jay
>> >
>> > Thanks for posting the update.
>> >
>> > However, i checked the page history and the hyperlink is pointing to the
>> > wrong domain.
>> > Exponential refers to www.exponential.com. I sent the twitter handle,
>> > should have sent the domain.
>> > Please correct.
>> >
>> > Thanks
>> >
>> > On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar 
>> > wrote:
>> >
>> > > Exponential @exponentialinc is using kafka in production to power the
>> > > events ingestion pipeline for real time analytics and log feed
>> > consumption.
>> > >
>> > > Please post on powered by kafka wiki -
>> > > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
>> > >
>> > > Thanks
>> > > Vipul
>> > > http://in.linkedin.com/in/vjhawar/
>> > >
>> >
>>



Re: powered by kafka

2014-11-10 Thread Pierre-Yves Ritschard
I guess I should mention that exoscale (https://exoscale.ch) is powered by
kafka as well.

Cheers,
   - pyr

On Sun, Nov 9, 2014 at 7:36 PM, Gwen Shapira  wrote:

> I'm not Jay, but fixed it anyways ;)
>
> Gwen
>
> On Sun, Nov 9, 2014 at 10:34 AM, vipul jhawar 
> wrote:
>
> > Hi Jay
> >
> > Thanks for posting the update.
> >
> > However, i checked the page history and the hyperlink is pointing to the
> > wrong domain.
> > Exponential refers to www.exponential.com. I sent the twitter handle,
> > should have sent the domain.
> > Please correct.
> >
> > Thanks
> >
> > On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar 
> > wrote:
> >
> > > Exponential @exponentialinc is using kafka in production to power the
> > > events ingestion pipeline for real time analytics and log feed
> > consumption.
> > >
> > > Please post on powered by kafka wiki -
> > > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> > >
> > > Thanks
> > > Vipul
> > > http://in.linkedin.com/in/vjhawar/
> > >
> >
>


Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Shlomi Hazan
One more thing:
I saw that the Python client is also unaffected by addition of partitions
to a topic and that it continues to send requests only to the old
partitions.
Is this also handled appropriately by the Java producer? Will it see the
change and produce to the new partitions as well?
Shlomi

On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan  wrote:

> No I don't see anything like that, the question was aimed at learning if
> it is worthwhile to make the effort of reimplementing the Python producer
> in Java, I so I will not make all the effort just to be disappointed
> afterwards.
> understand I have nothing to worry about, so I will try to simulate this
> situation in small scale...
> maybe 3 brokers, one topic with one partition and then add partitions.
> we'll see.
> thanks for clarifying.
> Oh, Good luck with Confluent!!
> :)
>
> On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede 
> wrote:
>
>> The producer might get an error code if the leader of the partitions being
>> reassigned also changes. However it should retry and succeed. Do you see a
>> behavior that suggests otherwise?
>>
>> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan  wrote:
>>
>> > Hi All,
>> > I recently had an issue producing from python where expanding a cluster
>> > from 3 to 5 nodes and reassigning partitions forced me to restart the
>> > producer b/c of KeyError thrown.
>> > Is this situation handled by the Java producer automatically or need I
>> do
>> > something to have the java producer refresh itself to see the reassigned
>> > partition layout and produce away ?
>> > Shlomi
>> >
>>
>
>