Kafka Connect usage

2016-01-11 Thread Shiti Saxena
Hi,

I tried executing the following,

bin/connect-standalone.sh config/connect-standalone.properties
config/connect-file-source.properties config/connect-console-sink.properties

I created a file text.txt in the Kafka directory, but I get the error:

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out
while waiting for producer to flush outstanding messages, 1 left
({ProducerRecord(topic=connect-test, partition=null, key=[B@71bb594d,
value=[B@7f7ca90a=ProducerRecord(topic=connect-test, partition=null,
key=[B@71bb594d, value=[B@7f7ca90a})
(org.apache.kafka.connect.runtime.WorkerSourceTask:237)
[2016-01-12 11:43:51,948] ERROR Failed to commit offsets for
WorkerSourceTask{id=local-file-source-0}
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)

Is any other configuration required?

Thanks,
Shiti
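
[For reference, the stock config/connect-file-source.properties shipped with
0.9.0 looks roughly like the sketch below; note that the file it points at
must match the file actually created (the default is test.txt, so a file
named text.txt would not be picked up). The flush timeout with a message left
outstanding usually indicates the producer could not reach the broker, so
verifying that a broker is running at the bootstrap.servers address in
connect-standalone.properties is a reasonable first check.

  name=local-file-source
  connector.class=FileStreamSource
  tasks.max=1
  file=test.txt
  topic=connect-test
]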


Stalling behaviour with 0.9 console consumer

2016-01-11 Thread Suyog Rao
Hi, I started with a clean install of 0.9 Kafka broker and populated a test
topic with 1 million messages. I then used the console consumer to read
from the beginning offset. Using --new-consumer reads the messages, but it
stalls every x messages or so and then continues again. It
is very batchy in its behaviour. If I go back to the old consumer, I am
able to stream the messages continuously. Am I missing a timeout setting or
something?

I created my own consumer in Java and call poll(0) in a loop, but I still
get the same behaviour. This is on Mac OS X (yosemite) with java version
"1.8.0_65".

Any ideas?

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
apache_logs --from-beginning --new-consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
apache_logs --from-beginning --zookeeper localhost:2181
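
[For comparison, a minimal poll loop for the new consumer — a sketch assuming
a local 0.9 broker and the topic above. Note that poll(0) returns immediately
when no data is buffered, so a nonzero timeout such as poll(100) usually gives
smoother streaming than calling poll(0) in a tight loop:

  import java.util.Arrays;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class PollLoop {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092"); // the broker, not Zookeeper
          props.put("group.id", "console-test");
          props.put("auto.offset.reset", "earliest");       // read from the beginning
          props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(Arrays.asList("apache_logs"));
              while (true) {
                  // wait up to 100 ms for data instead of spinning with poll(0)
                  ConsumerRecords<String, String> records = consumer.poll(100);
                  for (ConsumerRecord<String, String> record : records)
                      System.out.println(record.offset() + ": " + record.value());
              }
          }
      }
  }
]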


Re: fallout from upgrading to the new Kafka producers

2016-01-11 Thread Guozhang Wang
Hi Rajiv,

This warning can be ignored, and indeed in 0.9.0 we downgraded its logging
level from WARN to DEBUG. So if you upgrade to the 0.9.0 Java producer you
should not see this warning.

A bit more context on the EOFException: a socket closure can result in this,
and the server may actively close a socket in some cases, for example 1) if
the connection has been idle for some time, the server may decide to close it
based on its idle-connection management config, or 2) if the producer uses
acks=0 and there is an error processing the request, the server just closes
the socket to "notify" the client, etc.

Guozhang




On Mon, Jan 11, 2016 at 1:08 PM, Rajiv Kurian  wrote:

> We have recently upgraded some of our applications to use the Kafka 0.8.2
> Java producers from the old Java wrappers over Scala producers.
>
> We've noticed these log messages on our application since the upgrade:
>
> 2016-01-11T20:56:43.023Z WARN  [producer-network-thread | producer-2]
> [s.o.a.kafka.common.network.Selector ] {}: Error in I/O with
> my_kafka_host/some_ip
>
> java.io.EOFException: null
>
> at
>
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> ~[kafka_2.10-0.8.2.2.jar:na]
>
> at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> ~[kafka_2.10-0.8.2.2.jar:na]
>
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> [kafka_2.10-0.8.2.2.jar:na]
>
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> [kafka_2.10-0.8.2.2.jar:na]
>
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> [kafka_2.10-0.8.2.2.jar:na]
>
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
>
> They don't occur too often and may be harmless but it is pretty alarming to
> see these. It happens with all the brokers we connect to so it doesn't seem
> like a problem with a single broker. Our producer config looks a bit like
> this:
>
> final Properties config = new Properties();
>
>  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> OUR_KAFKA_CONNECT_STRING);
>
>  config.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, false);  // non
> blocking
>
>  config.put(ProducerConfig.BUFFER_MEMORY_CONFIG,  10 * 1024 * 1024);  // 10
> MB
>
>  config.put(ProducerConfig.BATCH_SIZE_CONFIG,  16384);  // 16 KB
>
>  config.put(ProducerConfig.LINGER_MS_CONFIG, 50);  // 50 ms
>
>
> Thanks,
>
> Rajiv
>



-- 
-- Guozhang
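
[For what it's worth, the idle-connection closing Guozhang mentions in case 1)
is controlled on the broker by connections.max.idle.ms. A sketch of the
relevant server.properties line follows; the 0.9 default is 10 minutes, so
check the docs for your version before relying on this:

  # close client connections that have been idle longer than this (ms)
  connections.max.idle.ms=600000
]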


Re: error in KafkaConsumer javadocs?

2016-01-11 Thread Jason Gustafson
Hey Richard,

Yeah, I think you're right. This looks like the same issue as KAFKA-2478,
which appears to have been forgotten about. I'll see if we can get the
patch merged.

-Jason

On Mon, Jan 11, 2016 at 4:27 PM, Richard Lee  wrote:

> Apologies if this has been discussed already...
>
> The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some
> code that looks like this:
>
>  ConsumerRecords<String, String> records = consumer.poll(100);
>  for (ConsumerRecord<String, String> record : records) {
>      buffer.add(record);
>      if (buffer.size() >= commitInterval) {
>          insertIntoDb(buffer);
>          consumer.commitSync();
>          buffer.clear();
>      }
>  }
>
> From my reading of the docs on commitSync(), this seems like a fairly
> unsafe thing to do.  In particular, the consumer is calling commitSync() in
> the middle of processing a bunch of records.  If it dies immediately after
> that commit, it will pick up processing on restart *after* the remaining
> unprocessed records.
>
> It would seem that this example should be using
>
>  commitSync(java.util.Map<TopicPartition, OffsetAndMetadata> offsets)
>
> which, of course, would require a bit more bookkeeping of what was the
> last processed offset for each partition of each topic in the current set
> of records.
>
> Or, perhaps this example should be reworked to not be committing offsets
> until after it has processed all the records?
>
> ---
> Richard Lee
> Principal Engineer
> Office of the CTO
> TiVo Inc.
>
>


error in KafkaConsumer javadocs?

2016-01-11 Thread Richard Lee
Apologies if this has been discussed already...

The ‘Manual Offset Control’ section of the KafkaConsumer javadocs has some code 
that looks like this:

 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records) {
     buffer.add(record);
     if (buffer.size() >= commitInterval) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }

From my reading of the docs on commitSync(), this seems like a fairly unsafe 
thing to do.  In particular, the consumer is calling commitSync() in the middle 
of processing a bunch of records.  If it dies immediately after that commit, it 
will pick up processing on restart *after* the remaining unprocessed records.

It would seem that this example should be using 

 commitSync(java.util.Map<TopicPartition, OffsetAndMetadata> offsets)

which, of course, would require a bit more bookkeeping of what was the last 
processed offset for each partition of each topic in the current set of records.

Or, perhaps this example should be reworked to not be committing offsets until 
after it has processed all the records?

---
Richard Lee
Principal Engineer
Office of the CTO
TiVo Inc.
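
[For the record, a per-partition variant along the lines Richard suggests
could look like the sketch below. It is a fragment in the same style as the
javadoc example — buffer, commitInterval, and insertIntoDb are placeholders
from that example, and imports of TopicPartition, OffsetAndMetadata, and
HashMap are assumed:

  Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
          buffer.add(record);
          // remember the position *after* this record for its partition
          toCommit.put(new TopicPartition(record.topic(), record.partition()),
                       new OffsetAndMetadata(record.offset() + 1));
          if (buffer.size() >= commitInterval) {
              insertIntoDb(buffer);
              consumer.commitSync(toCommit); // commit only what was inserted
              buffer.clear();
              toCommit.clear();
          }
      }
  }
]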





Re: Java client 0.9 poll doesn't return

2016-01-11 Thread Jason Gustafson
Looks like you might have bootstrap.servers pointed at Zookeeper. It should
point to the Kafka brokers instead. The behavior of poll() currently is to
block until the group's coordinator is found, but sending the wrong kind of
request to Zookeeper probably results in a server-side disconnect. In that
case, there is never an error to propagate and the consumer keeps on
trying.

-Jason

On Mon, Jan 11, 2016 at 3:24 PM, Gary Struthers 
wrote:

> Calling the standalone client ("org.apache.kafka" % "kafka-clients" %
> "0.9.0.0") from Scala, consumer.poll never returns. I’ve tried both
> assign(TopicPartition) and subscribe, with various timeouts, and I’ve
> quintuple-checked the config properties. Here’s a Scala IDE worksheet:
>
>   val props = loadProperties(new StringBuilder("kafkaConsumer.properties"))
>   //> props : java.util.Properties = {key.deserializer=org.apache.kafka.common.
>   //| serialization.StringDeserializer, auto.commit.interval.ms=1000,
>   //| bootstrap.servers=127.0.0.1:2181, enable.auto.commit=true, group.id=test,
>   //| value.deserializer=org.apache.kafka.common.serialization.LongDeserializer,
>   //| session.timeout.ms=3}
>   val topic = "debug-topic"               //> topic : String = debug-topic
>   val topicList = List(topic).asJava      //> topicList : java.util.List[String] = [debug-topic]
>   val consumer = new KafkaConsumer[String, String](props)
>   //> (logback startup output elided: logback.xml found at multiple classpath
>   //| locations under /Users/garystruthers/git/dendrites; output exceeds
>   //| cutoff limit.)
>   val tp0 = new TopicPartition(topic, 0)  //> tp0 : org.apache.kafka.common.TopicPartition = debug-topic-0
>   val topicPartitions = List(tp0).asJava  //> topicPartitions : java.util.List[org.apache.kafka.common.TopicPartition] = [debug-topic-0]
>   consumer.assign(topicPartitions)
>   //consumer.subscribe(topicList)
>   val records = consumer.poll(1000)
>
> Gary
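
[As Jason's diagnosis suggests, the fix is to point bootstrap.servers at a
broker rather than at Zookeeper. A minimal sketch of the corrected property,
assuming a broker on the default port 9092:

  Properties props = new Properties();
  props.put("bootstrap.servers", "127.0.0.1:9092"); // the broker port, not Zookeeper's 2181
  // ... remaining properties as in the worksheet above ...
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
]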


Java client 0.9 poll doesn't return

2016-01-11 Thread Gary Struthers
Calling the standalone client ("org.apache.kafka" % "kafka-clients" %
"0.9.0.0") from Scala, consumer.poll never returns. I’ve tried both
assign(TopicPartition) and subscribe, with various timeouts, and I’ve
quintuple-checked the config properties. Here’s a Scala IDE worksheet:

  val props = loadProperties(new StringBuilder("kafkaConsumer.properties"))
  //> props : java.util.Properties = {key.deserializer=org.apache.kafka.common.
  //| serialization.StringDeserializer, auto.commit.interval.ms=1000,
  //| bootstrap.servers=127.0.0.1:2181, enable.auto.commit=true, group.id=test,
  //| value.deserializer=org.apache.kafka.common.serialization.LongDeserializer,
  //| session.timeout.ms=3}
  val topic = "debug-topic"               //> topic : String = debug-topic
  val topicList = List(topic).asJava      //> topicList : java.util.List[String] = [debug-topic]
  val consumer = new KafkaConsumer[String, String](props)
  //> (logback startup output elided: logback.xml found at multiple classpath
  //| locations under /Users/garystruthers/git/dendrites; output exceeds
  //| cutoff limit.)
  val tp0 = new TopicPartition(topic, 0)  //> tp0 : org.apache.kafka.common.TopicPartition = debug-topic-0
  val topicPartitions = List(tp0).asJava  //> topicPartitions : java.util.List[org.apache.kafka.common.TopicPartition] = [debug-topic-0]
  consumer.assign(topicPartitions)
  //consumer.subscribe(topicList)
  val records = consumer.poll(1000)

Gary

Re: Kafka 0.9 Consumer Group

2016-01-11 Thread Jason Gustafson
Ah, that actually makes sense. The consumer-groups.sh script only returns
offset data when the group is active. The offsets should still be there,
but it does seem unfortunate that there's no way to view them. We have
KAFKA-3059 which adds some additional capabilities for managing offsets
with this script, so maybe we can extend it to provide this feature as well.

-Jason

On Mon, Jan 11, 2016 at 11:33 AM, Wang, Howard 
wrote:

> Hi Jason,
>
> I used kafka-consumer-groups.sh to check my consumer group:
> ~/GitHub/kafka/bin/kafka-consumer-groups.sh --bootstrap-server <broker name> --group test.group --describe --new-consumer
>
> I ran this command several times after my app was shut down. I always get
> a "Consumer group `test.group` does not exist or is rebalancing." response.
>
>
>
> I did set the enable.auto.commit to false. Below is how I set my
> KafkaConsumer.
>
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> myAppConfig.getKafkaBroker());
> props.put(ConsumerConfig.GROUP_ID_CONFIG,
> myAppConfig.getKafkaConsumerGroup());
> props.put(ConsumerConfig.CLIENT_ID_CONFIG,
> myAppConfig.getRandomNodeId());
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
> myAppConfig.getKafkaConsumerSessionTimeout());
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer");
> consumer = new KafkaConsumer<>(props);
>
>
>
>
> Thanks.
>
> Howard
>
> On 1/11/16, 12:55 PM, "Jason Gustafson"  wrote:
>
> >Sorry, wrong property, I meant enable.auto.commit.
> >
> >-Jason
> >
> >On Mon, Jan 11, 2016 at 9:52 AM, Jason Gustafson 
> >wrote:
> >
> >> Hi Howard,
> >>
> >> The offsets are persisted in the __consumer_offsets topic indefinitely.
> >> Since you're using manual commit, have you ensured that
> >>auto.offset.reset
> >> is disabled? It might also help if you provide a little more detail on
> >>how
> >> you're verifying that offsets were lost.
> >>
> >> -Jason
> >>
> >> On Mon, Jan 11, 2016 at 7:42 AM, Wang, Howard  >
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I have a question regarding the Kafka 0.9 Consumer Group . I manually
> >>> commit offsets using the  Kafka 0.9 Consumer created with a consumer
> >>>group.
> >>>
> >>> However, after my app restarted totally from scratch, the consumer
> >>>group
> >>> seems to lose all the offsets. Is that true that the consumer offsets
> >>>are
> >>> transient and will be gone after the consumer group has no member and
> >>>gets
> >>> deleted?
> >>>
> >>> Thanks.
> >>>
> >>> Howard
> >>> --
> >>>  Howard Wang
> >>> Engineering - Big Data and Personalization
> >>> Washington Post Media
> >>>
> >>> 1150 15th St NW, Washington, DC 20071
> >>> p. 202-334-9195
> >>> Email: howard.w...@washpost.com
> >>>
> >>
> >>
>
>
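
[One way to double-check committed offsets while the group has no active
members is to read them back through a consumer. A sketch using
KafkaConsumer.committed() — the topic and partition names are illustrative,
imports are omitted, and props must use the same group.id the offsets were
committed under:

  TopicPartition tp = new TopicPartition("my-topic", 0);
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      OffsetAndMetadata committed = consumer.committed(tp);
      System.out.println(committed == null
              ? "no committed offset"
              : "committed offset = " + committed.offset());
  }
]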


fallout from upgrading to the new Kafka producers

2016-01-11 Thread Rajiv Kurian
We have recently upgraded some of our applications to use the Kafka 0.8.2
Java producers from the old Java wrappers over Scala producers.

We've noticed these log messages on our application since the upgrade:

2016-01-11T20:56:43.023Z WARN  [producer-network-thread | producer-2]
[s.o.a.kafka.common.network.Selector ] {}: Error in I/O with
my_kafka_host/some_ip

java.io.EOFException: null

at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
~[kafka_2.10-0.8.2.2.jar:na]

at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
~[kafka_2.10-0.8.2.2.jar:na]

at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
[kafka_2.10-0.8.2.2.jar:na]

at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
[kafka_2.10-0.8.2.2.jar:na]

at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
[kafka_2.10-0.8.2.2.jar:na]

at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]

They don't occur too often and may be harmless but it is pretty alarming to
see these. It happens with all the brokers we connect to so it doesn't seem
like a problem with a single broker. Our producer config looks a bit like
this:

final Properties config = new Properties();

 config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
OUR_KAFKA_CONNECT_STRING);

 config.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, false);  // non
blocking

 config.put(ProducerConfig.BUFFER_MEMORY_CONFIG,  10 * 1024 * 1024);  // 10
MB

 config.put(ProducerConfig.BATCH_SIZE_CONFIG,  16384);  // 16 KB

 config.put(ProducerConfig.LINGER_MS_CONFIG, 50);  // 50 ms


Thanks,

Rajiv


Re: kafka-producer-perf-test.sh - 0.8.2.1

2016-01-11 Thread Andrej Vladimirovich
Thanks! I also found an old thread that Ewen replied to, so I will just
re-post it here in case somebody else finds it helpful:

Ewen:
...
EndToEndLatency works with a single message at a time. It produces the
message then waits for the consumer to receive it. This approach guarantees
there is no delay due to queuing. The goal with this test is to evaluate
the *minimum* *latency*.

*ProducerPerformance* focuses on achieving maximum throughput. This means it
will enqueue lots of records so it will always have more data to send (and
can use batching to increase the throughput). Unlike EndToEndLatency, this
means records may just sit in a queue on the producer for awhile because
the maximum number of in flight requests has been reached and it needs to
wait for responses for those requests. Since EndToEndLatency only ever has
one record outstanding, it will never encounter this case.

...


Now it does make sense to me.

Thanks for your help!

Andrew

On Mon, Jan 11, 2016 at 1:31 PM, Jay Kreps  wrote:

> If you use the perf test without any bound on throughput it will
> always try to send data faster than it can go out and build up a queue
> of unsent data. So e.g. if your buffer is 1MB each send will be
> blocked on waiting for the full 1MB of queued data to clear out and
> get sent. This makes sense if you think about it.
>
> If you want to test latency under load you need to throttle the
> maximum throughput to something like what you think you would see in
> your application (there is an option for that in the command line
> options).
>
> -Jay
>
> On Mon, Jan 11, 2016 at 11:02 AM, Andrej Vladimirovich
>  wrote:
> > Ewen,
> >
> > One more question. I mentioned that *kafka-run-class.sh
> > org.apache.kafka.clients.tools.ProducerPerformance* latency is a lot
> higher
> > than *kafka-run-class.sh kafka.tools.TestEndToEndLatency.*
> >
> > Example:
> >
> > *ProducerPerformance:*
> >
> > 5000 records sent, 337463.891364 records/sec (32.18 MB/sec), *1548.51
> > ms avg latency*, 3186.00 ms max latency, 2478 ms 50th, 3071 ms 95th, 3118
> > ms 99th, 3179 ms 99.9th.
> >
> > *TestEndToEndLatency:*
> >
> > Percentiles: 50th = 8, 99th = 9, 99.9th = 20
> >
> > So 1548.51 ms vs 9 ms. Huge difference.
> >
> > I am using the same cluster, same server and same topic to run both
> > tests. It does not make sense to me why end-to-end latency would be so
> > low while producer-to-Kafka latency is so high.
> >
> > I did some research online and found other people having the same
> question
> > without any responses.
> >
> > Thanks a lot for your help!
> >
> > Andrew
> >
> > On Fri, Jan 8, 2016 at 5:44 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> >> It is single threaded in the sense that you can not request that
> multiple
> >> threads be used to call producer.send(). However, the producer has its
> own
> >> internal thread for doing network IO. When you have such a simple
> producer,
> >> depending on the size of messages you can saturate a 1Gbps link with a
> >> single thread, so usually using more threads isn't much help. If you
> still
> >> need more throughput, you can just use more processes.
> >>
> >> -Ewen
> >>
> >> On Fri, Jan 8, 2016 at 1:24 PM, Andrej Vladimirovich <
> >> udodizdu...@gmail.com>
> >> wrote:
> >>
> >> > Thanks Ewen. Do you know if kafka-run-class.sh
> >> > org.apache.kafka.clients.tools.ProducerPerformance
> >> > is single threaded? Or is there any way to specify number of threads?
> >> >
> >> > On Fri, Jan 8, 2016 at 1:24 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> >> >
> >> > wrote:
> >> >
> >> > > Ah, sorry, I missed the version number in your title. I think this
> tool
> >> > saw
> >> > > some rearrangement in 0.9.0 and I was looking at the latest version.
> >> > > Unfortunately it doesn't look like the old
> >> > kafka.tools.ProducerPerformance
> >> > > that is used in kafka-producer-perf-test.sh in 0.8.2.1 supports
> passing
> >> > in
> >> > > additional properties.
> >> > >
> >> > > -Ewen
> >> > >
> >> > > On Fri, Jan 8, 2016 at 9:10 AM, Andrej Vladimirovich <
> >> > > udodizdu...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Ewen,
> >> > > >
> >> > > > I tried that before like this:
> >> > > >
> >> > > > ./kafka-producer-perf-test.sh --broker-list test:9092 --topics
> >> test8-3
> >> > > > --messages 200 --new-producer --message-size 200
> >> > > > --show-detailed-stats max.request.size=1000
> >> > > >
> > > > > and it does not work. It completely ignores this option.
> >> > > >
> >> > > > And --producer-props is not a valid option for
> >> > > kafka-producer-perf-test.sh.
> >> > > > Maybe it is not the right syntax? But I tried a lot of different
> ways
> >> > and
> >> > > > have yet to find the right one.
> >> > > >
> >> > > > Thanks!
> >> > > >
> >> > > > Andrew
> >> > > >
> >> > > > On Fri, Jan 8, 2016 at 10:54 AM, Ewen Cheslack-Postava <
> >> > > e...@confluent.io>
> >> > > > wrote:
> >> > > >
> >> > > > > Andrew,
> >> > > > >
> > > > > > kafka-producer-perf-test.sh is just a wrapper around
> > > > > > org.apache.kafka.clients.tools.ProducerPerformance and all command
> > > > > > line options should be forwarded. Can you just pass a --producer-props
> > > > > > to set max.request.size to a larger value?

Re: Kafka 0.9 Consumer Group

2016-01-11 Thread Wang, Howard
Hi Jason,

I used kafka-consumer-groups.sh to check my consumer group:
~/GitHub/kafka/bin/kafka-consumer-groups.sh --bootstrap-server <broker name> --group test.group --describe --new-consumer

I ran this command several times after my app was shut down. I always get
a "Consumer group `test.group` does not exist or is rebalancing." response.



I did set the enable.auto.commit to false. Below is how I set my
KafkaConsumer. 

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
myAppConfig.getKafkaBroker());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
myAppConfig.getKafkaConsumerGroup());
props.put(ConsumerConfig.CLIENT_ID_CONFIG,
myAppConfig.getRandomNodeId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
myAppConfig.getKafkaConsumerSessionTimeout());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);




Thanks.

Howard

On 1/11/16, 12:55 PM, "Jason Gustafson"  wrote:

>Sorry, wrong property, I meant enable.auto.commit.
>
>-Jason
>
>On Mon, Jan 11, 2016 at 9:52 AM, Jason Gustafson 
>wrote:
>
>> Hi Howard,
>>
>> The offsets are persisted in the __consumer_offsets topic indefinitely.
>> Since you're using manual commit, have you ensured that
>>auto.offset.reset
>> is disabled? It might also help if you provide a little more detail on
>>how
>> you're verifying that offsets were lost.
>>
>> -Jason
>>
>> On Mon, Jan 11, 2016 at 7:42 AM, Wang, Howard 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding the Kafka 0.9 Consumer Group . I manually
>>> commit offsets using the  Kafka 0.9 Consumer created with a consumer
>>>group.
>>>
>>> However, after my app restarted totally from scratch, the consumer
>>>group
>>> seems to lose all the offsets. Is that true that the consumer offsets
>>>are
>>> transient and will be gone after the consumer group has no member and
>>>gets
>>> deleted?
>>>
>>> Thanks.
>>>
>>> Howard
>>> --
>>>  Howard Wang
>>> Engineering - Big Data and Personalization
>>> Washington Post Media
>>>
>>> 1150 15th St NW, Washington, DC 20071
>>> p. 202-334-9195
>>> Email: howard.w...@washpost.com
>>>
>>
>>



Re: kafka-producer-perf-test.sh - 0.8.2.1

2016-01-11 Thread Jay Kreps
If you use the perf test without any bound on throughput it will
always try to send data faster than it can go out and build up a queue
of unsent data. So e.g. if your buffer is 1MB each send will be
blocked on waiting for the full 1MB of queued data to clear out and
get sent. This makes sense if you think about it.

If you want to test latency under load you need to throttle the
maximum throughput to something like what you think you would see in
your application (there is an option for that in the command line
options).

-Jay

On Mon, Jan 11, 2016 at 11:02 AM, Andrej Vladimirovich
 wrote:
> Ewen,
>
> One more question. I mentioned that *kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance* latency is a lot higher
> than *kafka-run-class.sh kafka.tools.TestEndToEndLatency.*
>
> Example:
>
> *ProducerPerformance:*
>
> 5000 records sent, 337463.891364 records/sec (32.18 MB/sec), *1548.51
> ms avg latency*, 3186.00 ms max latency, 2478 ms 50th, 3071 ms 95th, 3118
> ms 99th, 3179 ms 99.9th.
>
> *TestEndToEndLatency:*
>
> Percentiles: 50th = 8, 99th = 9, 99.9th = 20
>
> So 1548.51 ms vs 9 ms. Huge difference.
>
> I am using the same cluster, same server and same topic to run both tests.
> It does not make sense to me why end-to-end latency would be so low while
> producer-to-Kafka latency is so high.
>
> I did some research online and found other people having the same question
> without any responses.
>
> Thanks a lot for your help!
>
> Andrew
>
> On Fri, Jan 8, 2016 at 5:44 PM, Ewen Cheslack-Postava 
> wrote:
>
>> It is single threaded in the sense that you can not request that multiple
>> threads be used to call producer.send(). However, the producer has its own
>> internal thread for doing network IO. When you have such a simple producer,
>> depending on the size of messages you can saturate a 1Gbps link with a
>> single thread, so usually using more threads isn't much help. If you still
>> need more throughput, you can just use more processes.
>>
>> -Ewen
>>
>> On Fri, Jan 8, 2016 at 1:24 PM, Andrej Vladimirovich <
>> udodizdu...@gmail.com>
>> wrote:
>>
>> > Thanks Ewen. Do you know if kafka-run-class.sh
>> > org.apache.kafka.clients.tools.ProducerPerformance
>> > is single threaded? Or is there any way to specify number of threads?
>> >
>> > On Fri, Jan 8, 2016 at 1:24 PM, Ewen Cheslack-Postava > >
>> > wrote:
>> >
>> > > Ah, sorry, I missed the version number in your title. I think this tool
>> > saw
>> > > some rearrangement in 0.9.0 and I was looking at the latest version.
>> > > Unfortunately it doesn't look like the old
>> > kafka.tools.ProducerPerformance
>> > > that is used in kafka-producer-perf-test.sh in 0.8.2.1 supports passing
>> > in
>> > > additional properties.
>> > >
>> > > -Ewen
>> > >
>> > > On Fri, Jan 8, 2016 at 9:10 AM, Andrej Vladimirovich <
>> > > udodizdu...@gmail.com>
>> > > wrote:
>> > >
>> > > > Ewen,
>> > > >
>> > > > I tried that before like this:
>> > > >
>> > > > ./kafka-producer-perf-test.sh --broker-list test:9092 --topics
>> test8-3
>> > > > --messages 200 --new-producer --message-size 200
>> > > > --show-detailed-stats max.request.size=1000
>> > > >
>> > > > and it does not work. It completely ignores this option.
>> > > >
>> > > > And --producer-props is not a valid option for
>> > > kafka-producer-perf-test.sh.
>> > > > Maybe it is not the right syntax? But I tried a lot of different ways
>> > and
>> > > > have yet to find the right one.
>> > > >
>> > > > Thanks!
>> > > >
>> > > > Andrew
>> > > >
>> > > > On Fri, Jan 8, 2016 at 10:54 AM, Ewen Cheslack-Postava <
>> > > e...@confluent.io>
>> > > > wrote:
>> > > >
>> > > > > Andrew,
>> > > > >
>> > > > > kafka-producer-perf-test.sh is just a wrapper around
>> > > > > org.apache.kafka.clients.tools.ProducerPerformance and all command
>> > > line
>> > > > > options should be forwarded. Can you just pass a --producer-props
>> to
>> > > set
>> > > > > max.request.size to a larger value?
>> > > > >
>> > > > > -Ewen
>> > > > >
>> > > > > On Fri, Jan 8, 2016 at 7:51 AM, Andrej Vladimirovich <
>> > > > > udodizdu...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi!
>> > > > > >
>> > > > > > I am testing Kafka's performance with large messages and would
>> like
>> > > to
>> > > > > > specify maximum request size when I run
>> > kafka-producer-perf-test.sh:
>> > > > > >
>> > > > > > ./kafka-producer-perf-test.sh --broker-list "test1:9092" --topics
>> > > > test8-3
> > > > > > --messages 100 --new-producer --message-size 110
> > > > > > --show-detailed-stats
> > > > > >
> > > > > > I always get this message if I specify something larger than 1MB:
>> > > > > >
>> > > > > > ERROR Error when sending message to topic test8-3 with key: 1
>> > bytes,
>> > > > > value:
>> > > > > > 110 bytes with error: The message is 1100027 bytes when
>> > > serialized
>> > > > > > which is larger than the maximum request size you have configured
>> > > with
>> > > > > the
> > > > > > max.request.size configuration.
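
[For reference, with the Java producer the limit being hit here is the
max.request.size setting; a sketch of raising it programmatically follows.
The 2 MB value is illustrative, and the broker's message.max.bytes must also
allow messages this large:

  config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2 * 1024 * 1024); // 2 MB
]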

Re: mirror maker against 0.8.2 source cluster and 0.9.0 destination cluster

2016-01-11 Thread Hannes Stockner
The 0.9.0 MirrorMaker has some nice features, like the messageHandler, which
would be great to use.

Any ideas on how this new functionality could be used with a 0.8.2 source
cluster and a 0.9.0 destination cluster in an efficient way?

Thanks

On Wed, Jan 6, 2016 at 11:30 AM, Ismael Juma  wrote:

> Hi Stephen,
>
> Newer brokers support older clients, but not the other way around. You
> could try 0.8.2 MirrorMaker against 0.8.2 source and 0.9.0 target clusters
> perhaps?
>
> Ismael
> On 6 Jan 2016 11:18, "Stephen Powis"  wrote:
>
> > Hey!
> >
> > So I'm trying to get mirror maker going between two different clusters.
> My
> > source cluster is version 0.8.2 and my destination cluster is 0.9.0
> running
> > the mirror maker code from the 0.9.0 release.  Does anyone know if this
> is
> > possible to do?  I'm aware that the protocol changed slightly between
> > versions.
> >
> > Attempting to run ./kafka-console-consumer.sh from the 0.9.0 release and
> > consume from my 0.8.2 cluster seems to fail, which is leading me to
> believe
> > that mirror maker will have the same issue.
> >
> > Attached are the errors I receive from kafka-console-consumer.sh running
> > from 0.9.0 release against a 0.8.2 cluster.
> >
> > ./kafka-console-consumer.sh  --zookeeper W.X.Y.Z:2181 --topic MyTopic
> >
> > [2016-01-06 06:17:40,587] WARN
> >
> >
> [ConsumerFetcherThread-console-consumer-73608_hostname-1452079050696-728e3f7e-0-100],
> > Error in fetch
> kafka.consumer.ConsumerFetcherThread$FetchRequest@6bb31fae.
> > Possible cause: java.lang.IllegalArgumentException
> > (kafka.consumer.ConsumerFetcherThread)
> > ...
> >
> > Thanks!
> >
>


Re: trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread dave
I created the following bug report:
https://issues.apache.org/jira/browse/KAFKA-3088


-Original Message-
From: "Dave Peterson" 
Sent: Monday, January 11, 2016 9:56am
To: users@kafka.apache.org
Subject: Re: trouble sending produce requests to 0.9.0.0 broker cluster

Ok, thanks for the information.  I tried setting the client ID to "bruce"
and the problem disappeared.  I looked at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
and it doesn't say anything about an empty client ID not being allowed (even
if it isn't allowed, Kafka shouldn't crash on receiving such a request).
Therefore I think this should be considered a bug, and a regression from 0.8.
As a workaround, I'll avoid sending requests with an empty client ID.

Thanks,
Dave


On 1/11/2016 9:22 AM, Dana Powers wrote:
> Looks like you aren't setting the request client-id, and server is crashing
> on it. I'm not sure whether server api is expected to work w/o client-id,
> but you can probably fix by sending one. Fwiw, kafka-python sends
> 'kafka-python' unless user specifies something else.
>
> -Dana
> On Jan 11, 2016 8:41 AM,  wrote:
>
>> I forgot to include some information about this problem.  When I sent
>> the produce request, the following appeared in server.log:
>>
>>  [2016-01-10 23:03:44,957] ERROR [KafkaApi-3] error when handling
>> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
>> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
>> [topic_1,3] -> 37 (kafka.server.KafkaApis)
>>  java.lang.NullPointerException
>> at
>> org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:127)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:106)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
>> at
>> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>> at
>> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>> at
>> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>> at
>> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>> at
>> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>> at
>> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>> at
>> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>> at
>> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>> at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>> at
>> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Later when I tried sending another produce request I got a somewhat
>> similar error:
>>
>>  [2016-01-11 08:15:05,153] ERROR [KafkaApi-3] error when handling
>> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
>> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
>> [topic_1,3] -> 37 (kafka.server.KafkaApis)
>>  java.lang.IllegalArgumentException: A metric named 'MetricName
>> [name=throttle-time, group=Produce, description=Tracking average
>> throttle-time per client, tags={client-id=null}]' already exists, can't
>> register another one.
>> at
>> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>> at
>> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>> at
>> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>> at
>> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>> at
>> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>> at
>> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>> at
>> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>> at
>> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>> at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>> at
>> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> In both cases I was running kafka-console-consumer.sh to consume
>> messages from the topic I was sending to, and the consumer did see the
>> message I sent.
>>
>>
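
[As Dana suggests, the workaround is simply to send a non-empty client ID.
For clients built on the Java producer, a sketch of that would be the line
below; Bruce implements the wire protocol itself, so there the equivalent is
filling in the client ID field of each request:

  config.put(ProducerConfig.CLIENT_ID_CONFIG, "bruce"); // any non-empty string
]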

Re: kafka-producer-perf-test.sh - 0.8.2.1

2016-01-11 Thread Andrej Vladimirovich
Ewen,

One more question. I mentioned that *kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance* latency is a lot higher
than *kafka-run-class.sh kafka.tools.TestEndToEndLatency.*

Example:

*ProducerPerformance:*

5000 records sent, 337463.891364 records/sec (32.18 MB/sec), *1548.51
ms avg latency*, 3186.00 ms max latency, 2478 ms 50th, 3071 ms 95th, 3118
ms 99th, 3179 ms 99.9th.

*TestEndToEndLatency:*

Percentiles: 50th = 8, 99th = 9, 99.9th = 20

So 1548.51 ms vs 9 ms. Huge difference.

I am using the same cluster, same server and same topic to run both tests.
It does not make sense to me why end-to-end latency would be so low while
producer-to-Kafka latency is so high.

I did some research online and found other people having the same question
without any responses.

Thanks a lot for your help!

Andrew

On Fri, Jan 8, 2016 at 5:44 PM, Ewen Cheslack-Postava 
wrote:

> It is single threaded in the sense that you can not request that multiple
> threads be used to call producer.send(). However, the producer has its own
> internal thread for doing network IO. When you have such a simple producer,
> depending on the size of messages you can saturate a 1Gbps link with a
> single thread, so usually using more threads isn't much help. If you still
> need more throughput, you can just use more processes.
>
> -Ewen
>
> On Fri, Jan 8, 2016 at 1:24 PM, Andrej Vladimirovich <
> udodizdu...@gmail.com>
> wrote:
>
> > Thanks Ewen. Do you know if kafka-run-class.sh
> > org.apache.kafka.clients.tools.ProducerPerformance
> > is single threaded? Or is there any way to specify number of threads?
> >
> > On Fri, Jan 8, 2016 at 1:24 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> > > Ah, sorry, I missed the version number in your title. I think this tool
> > saw
> > > some rearrangement in 0.9.0 and I was looking at the latest version.
> > > Unfortunately it doesn't look like the old
> > kafka.tools.ProducerPerformance
> > > that is used in kafka-producer-perf-test.sh in 0.8.2.1 supports passing
> > in
> > > additional properties.
> > >
> > > -Ewen
> > >
> > > On Fri, Jan 8, 2016 at 9:10 AM, Andrej Vladimirovich <
> > > udodizdu...@gmail.com>
> > > wrote:
> > >
> > > > Ewen,
> > > >
> > > > I tried that before like this:
> > > >
> > > > ./kafka-producer-perf-test.sh --broker-list test:9092 --topics
> test8-3
> > > > --messages 200 --new-producer --message-size 200
> > > > --show-detailed-stats max.request.size=1000
> > > >
> > > > and it does not work. It completely ignores this option.
> > > >
> > > > And --producer-props is not a valid option for
> > > kafka-producer-perf-test.sh.
> > > > Maybe it is not the right syntax? But I tried a lot of different ways
> > and
> > > > have yet to find the right one.
> > > >
> > > > Thanks!
> > > >
> > > > Andrew
> > > >
> > > > On Fri, Jan 8, 2016 at 10:54 AM, Ewen Cheslack-Postava <
> > > e...@confluent.io>
> > > > wrote:
> > > >
> > > > > Andrew,
> > > > >
> > > > > kafka-producer-perf-test.sh is just a wrapper around
> > > > > org.apache.kafka.clients.tools.ProducerPerformance and all command
> > > line
> > > > > options should be forwarded. Can you just pass a --producer-props
> to
> > > set
> > > > > max.request.size to a larger value?
> > > > >
> > > > > -Ewen
> > > > >
> > > > > On Fri, Jan 8, 2016 at 7:51 AM, Andrej Vladimirovich <
> > > > > udodizdu...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi!
> > > > > >
> > > > > > I am testing Kafka's performance with large messages and would
> like
> > > to
> > > > > > specify maximum request size when I run
> > kafka-producer-perf-test.sh:
> > > > > >
> > > > > > ./kafka-producer-perf-test.sh --broker-list "test1:9092" --topics
> > > > test8-3
> > > > > > --messages 100 --new-producer --message-size 110
> > > > > > --show-detailed-stats
> > > > > >
> > > > > > I always get this message if I specify something larger than 1MB:
> > > > > >
> > > > > > ERROR Error when sending message to topic test8-3 with key: 1
> > bytes,
> > > > > value:
> > > > > > 110 bytes with error: The message is 1100027 bytes when
> > > serialized
> > > > > > which is larger than the maximum request size you have configured
> > > with
> > > > > the
> > > > > > max.request.size configuration.
> > > > > >
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> > > > > >
> > > > > > I know I can specify maximum request size with kafka-run-class.sh
> > > > > > org.apache.kafka.clients.tools.ProducerPerformance but I would
> like
> > > to
> > > > > use
> > > > > > kafka-producer-perf-test.sh if possible.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Andrew
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Ewen
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
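
[A crude client-side throttle along the lines Jay suggests: pace sends to a
target rate so measured latency reflects load rather than an unbounded queue.
This is only a sketch — the topic name, target rate, props, and payload are
illustrative, and props is assumed to configure byte-array serializers:

  long targetPerSec = 10_000;
  long intervalNanos = 1_000_000_000L / targetPerSec;
  KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
  long next = System.nanoTime();
  for (int i = 0; i < 1_000_000; i++) {
      while (System.nanoTime() < next) { /* spin until the next send slot */ }
      producer.send(new ProducerRecord<>("test8-3", payload));
      next += intervalNanos;
  }
  producer.close();
]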


Re: trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread Dave Peterson
Ok, thanks for the information.  I tried setting the client ID to "bruce"
and the problem disappeared.  I looked at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
and it doesn't say anything about an empty client ID not being allowed (even
if it isn't allowed, Kafka shouldn't crash on receiving such a request).
Therefore I think this should be considered a bug, and a regression from 0.8.

As a workaround, I'll avoid sending requests with an empty client ID.

Thanks,
Dave


On 1/11/2016 9:22 AM, Dana Powers wrote:

Looks like you aren't setting the request client-id, and server is crashing
on it. I'm not sure whether server api is expected to work w/o client-id,
but you can probably fix by sending one. Fwiw, kafka-python sends
'kafka-python' unless user specifies something else.

-Dana
On Jan 11, 2016 8:41 AM,  wrote:


I forgot to include some information about this problem.  When I sent
the produce request, the following appeared in server.log:

 [2016-01-10 23:03:44,957] ERROR [KafkaApi-3] error when handling
request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
[topic_1,3] -> 37 (kafka.server.KafkaApis)
 java.lang.NullPointerException
at
org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:127)
at
org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:106)
at
org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
at
org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
at
kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
at
kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
at
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
at
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
at
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
at
kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
at
kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
at
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)

Later when I tried sending another produce request I got a somewhat
similar error:

 [2016-01-11 08:15:05,153] ERROR [KafkaApi-3] error when handling
request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
[topic_1,3] -> 37 (kafka.server.KafkaApis)
 java.lang.IllegalArgumentException: A metric named 'MetricName
[name=throttle-time, group=Produce, description=Tracking average
throttle-time per client, tags={client-id=null}]' already exists, can't
register another one.
at
org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
at
kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
at
kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
at
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
at
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
at
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
at
kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
at
kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
at
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)

In both cases I was running kafka-console-consumer.sh to consume
messages from the topic I was sending to, and the consumer did see the
message I sent.

Dave


-Original Message-
From: d...@dspeterson.com
Sent: Monday, January 11, 2016 12:32am
To: users@kafka.apache.org
Subject: trouble sending produce requests to 0.9.0.0 broker cluster

Hi,

I'm having trouble sending produce requests to a Kafka 0.9.0.0 broker
cluster consisting of 4 brokers with IDs 0, 1, 2, and 3.  All 4
brokers are running locally on my CentOS 7 development box, listening
on ports 9092, 9093, 9094, and 9095 respectively.  I am running my
own producer code (Bruce, see https://github.com/ifwe/bruce), which works
without problems with Kafka 0.8, but has problems with 0.9.0.0.

Re: Kafka 0.9 Consumer Group

2016-01-11 Thread Jason Gustafson
Sorry, wrong property, I meant enable.auto.commit.

-Jason

On Mon, Jan 11, 2016 at 9:52 AM, Jason Gustafson  wrote:

> Hi Howard,
>
> The offsets are persisted in the __consumer_offsets topic indefinitely.
> Since you're using manual commit, have you ensured that auto.offset.reset
> is disabled? It might also help if you provide a little more detail on how
> you're verifying that offsets were lost.
>
> -Jason
>
> On Mon, Jan 11, 2016 at 7:42 AM, Wang, Howard 
> wrote:
>
>> Hi,
>>
>> I have a question regarding the Kafka 0.9 Consumer Group . I manually
>> commit offsets using the  Kafka 0.9 Consumer created with a consumer group.
>>
>> However, after my app restarted totally from scratch, the consumer group
>> seems to lose all the offsets. Is that true that the consumer offsets are
>> transient and will be gone after the consumer group has no member and gets
>> deleted?
>>
>> Thanks.
>>
>> Howard
>> --
>>  Howard Wang
>> Engineering - Big Data and Personalization
>> Washington Post Media
>>
>> 1150 15th St NW, Washington, DC 20071
>> p. 202-334-9195
>> Email: howard.w...@washpost.com
>>
>
>


Re: Kafka 0.9 Consumer Group

2016-01-11 Thread Jason Gustafson
Hi Howard,

The offsets are persisted in the __consumer_offsets topic indefinitely.
Since you're using manual commit, have you ensured that auto.offset.reset
is disabled? It might also help if you provide a little more detail on how
you're verifying that offsets were lost.

-Jason

On Mon, Jan 11, 2016 at 7:42 AM, Wang, Howard 
wrote:

> Hi,
>
> I have a question regarding the Kafka 0.9 Consumer Group . I manually
> commit offsets using the  Kafka 0.9 Consumer created with a consumer group.
>
> However, after my app restarted totally from scratch, the consumer group
> seems to lose all the offsets. Is that true that the consumer offsets are
> transient and will be gone after the consumer group has no member and gets
> deleted?
>
> Thanks.
>
> Howard
> --
>  Howard Wang
> Engineering - Big Data and Personalization
> Washington Post Media
>
> 1150 15th St NW, Washington, DC 20071
> p. 202-334-9195
> Email: howard.w...@washpost.com
>


Kafka Is Featured on HPE Matter

2016-01-11 Thread England, Laura (Interfuse)
Hello!

HPE Matter, the digital magazine from HPE where the 
brightest minds in business share their perspectives on a technology-driven 
world, recently launched its seventh issue. HPE Matter’s The Next Enterprise 
Issue explores how tech-driven enterprises of the future will create value 
faster than ever.

We're glad to let you know that Kafka is featured this week on our website!
We hope you agree that it’s a great story. In fact, we would
appreciate it if you could share the story with your social media followers 
online. We are more than happy to provide you with sample tweets as well as 
graphics sized for all major social platforms.

Thank you,
HPE Matter Social Team





Re: best python library to use?

2016-01-11 Thread Dana Powers
Agree - kafka-python was in hibernation waiting for 0.9.0.0 Kafka release,
so a few issues lingered longer than I would have liked. Most of my
comments relate to latest master, which we are hoping to release after a
bit more testing and polish.

Re librdkafka -- to be honest, I'm skeptical that C protocol bindings are
going to improve python performance much. In my experience, the devil is in
the client logic details, not the wire protocol parsing. Adding a C
compilation step also adds installation and operational overhead (install
gcc or manage linux wheels). So we have avoided adding that to kafka-python
without significant evidence showing performance benefits that can't be
duplicated in pure python.

-Dana
On Jan 11, 2016 9:02 AM, "Sam Pegler" 
wrote:

> kafka-python (https://github.com/dpkp/kafka-python) has also just merged
> performance improvements to the consumer in
> https://github.com/dpkp/kafka-python/issues/290 which should see a pretty
> decent boost in throughput.  We were somewhat put off by the poor
> performance in earlier versions; I imagine many people were in the same
> position, so it's worth revisiting.
>
> Sam Pegler
>
>
> On 11 January 2016 at 16:28, Andrew Otto  wrote:
>
> > pykafka’s balanced consumer is very useful. pykafka also has Python
> > bindings to the librdkafka C library that you can optionally enable,
> which
> > might get you some speed boosts.
> >
> > python-kafka (oh, I just saw this 0.9x version, hm!) was better at
> > producing than pykafka for us, so we am currently using pykafka for
> > consumption, and python-kafka for production.  python-kafka allows you to
> > produce to multiple topics using the same client instance.  (pykafka may
> > support this soon: https://github.com/Parsely/pykafka/issues/354)
> >
> >
> >
> > On Sat, Jan 9, 2016 at 10:04 AM, Dana Powers 
> > wrote:
> >
> > > pykafka uses a custom zookeeper implementation for consumer groups.
> > > kafka-python uses the 0.9.0.0 server apis to accomplish the same.
> > >
> > > -Dana
> > > On Jan 8, 2016 18:32, "chengxin Cai"  wrote:
> > >
> > > > Hi
> > > >
> > > > I heard that Pykafka can create a balanced consumer.
> > > >
> > > > And there should be no other big difference.
> > > >
> > > >
> > > > Best Regards
> > > >
> > > > > On Jan 9, 2016, at 08:58, Dana Powers  wrote:
> > > > >
> > > > > Hi Doug,
> > > > >
> > > > > The differences are fairly subtle. kafka-python is a
> community-backed
> > > > > project that aims to be consistent w/ the official java client;
> > pykafka
> > > > is
> > > > > sponsored by parse.ly and aims to provide a pythonic interface.
> > > > whichever
> > > > > you go with, I would love to hear your specific feedback on
> > > kafka-python.
> > > > >
> > > > > -Dana (kafka-python maintainer)
> > > > >
> > > > >> On Fri, Jan 8, 2016 at 4:32 PM, Doug Tomm 
> wrote:
> > > > >>
> > > > >> we're using kafka-python, weighing pykafka, and wondering if
> there's
> > > > >> another that is bettor to use.  does confluent endorse or
> recommend
> > a
> > > > >> particular python package (psorry for the alliteration)?
> > > > >>
> > > > >> doug
> > > > >>
> > > > >>
> > > >
> > >
> >
>


RE: trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread Dana Powers
Looks like you aren't setting the request client-id, and server is crashing
on it. I'm not sure whether server api is expected to work w/o client-id,
but you can probably fix by sending one. Fwiw, kafka-python sends
'kafka-python' unless user specifies something else.

-Dana
On Jan 11, 2016 8:41 AM,  wrote:

> I forgot to include some information about this problem.  When I sent
> the produce request, the following appeared in server.log:
>
> [2016-01-10 23:03:44,957] ERROR [KafkaApi-3] error when handling
> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
> [topic_1,3] -> 37 (kafka.server.KafkaApis)
> java.lang.NullPointerException
>at
> org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:127)
>at
> org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:106)
>at
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
>at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>at
> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>at
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>at
> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745)
>
> Later when I tried sending another produce request I got a somewhat
> similar error:
>
> [2016-01-11 08:15:05,153] ERROR [KafkaApi-3] error when handling
> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
> [topic_1,3] -> 37 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: A metric named 'MetricName
> [name=throttle-time, group=Produce, description=Tracking average
> throttle-time per client, tags={client-id=null}]' already exists, can't
> register another one.
>at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>at
> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>at
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>at
> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745)
>
> In both cases I was running kafka-console-consumer.sh to consume
> messages from the topic I was sending to, and the consumer did see the
> message I sent.
>
> Dave
>
>
> -Original Message-
> From: d...@dspeterson.com
> Sent: Monday, January 11, 2016 12:32am
> To: users@kafka.apache.org
> Subject: trouble sending produce requests to 0.9.0.0 broker cluster
>
> Hi,
>
> I'm having trouble sending produce requests to a Kafka 0.9.0.0 broker
> cluster consisting of 4 brokers with IDs 0, 1, 2, and 3.  All 4
> brokers are running locally on my CentOS 7 development box, listening
> on ports 9092, 9093, 9094, and 9095 respectively.  I am running my
> own producer code (Bruce, see https://github.com/ifwe/bruce), which
> works without problems with Kafka 0.8, but has problems with 0.9.0.0.
> When I send a produce request consisting of a single message, I often
> get a response consisting of error ACK -1 (Unknown, unexpected server
> error) although I have also seen other errors such as 6
> (NotLeaderForPartition).  During one observed instance of this
> behavior I saw the following:
>
> sent p

Re: best python library to use?

2016-01-11 Thread Sam Pegler
kafka-python (https://github.com/dpkp/kafka-python) has also just merged
performance improvements to the consumer in
https://github.com/dpkp/kafka-python/issues/290, which should give a pretty
decent boost in throughput.  We were somewhat put off by the poor
performance in earlier versions; I imagine many people were in the same
position, so it's worth revisiting.

Sam Pegler


On 11 January 2016 at 16:28, Andrew Otto  wrote:

> pykafka’s balanced consumer is very useful. pykafka also has Python
> bindings to the librdkafka C library that you can optionally enable, which
> might get you some speed boosts.
>
> python-kafka (oh, I just saw this 0.9x version, hm!) was better at
> producing than pykafka for us, so we are currently using pykafka for
> consumption, and python-kafka for production.  python-kafka allows you to
> produce to multiple topics using the same client instance.  (pykafka may
> support this soon: https://github.com/Parsely/pykafka/issues/354)
>
>
>
> On Sat, Jan 9, 2016 at 10:04 AM, Dana Powers 
> wrote:
>
> > pykafka uses a custom zookeeper implementation for consumer groups.
> > kafka-python uses the 0.9.0.0 server apis to accomplish the same.
> >
> > -Dana
> > On Jan 8, 2016 18:32, "chengxin Cai"  wrote:
> >
> > > Hi
> > >
> > > I heard that Pykafka can create a balanced consumer.
> > >
> > > And there should be no other big difference.
> > >
> > >
> > > Best Regards
> > >
> > > > On Jan 9, 2016, at 08:58, Dana Powers wrote:
> > > >
> > > > Hi Doug,
> > > >
> > > > The differences are fairly subtle. kafka-python is a community-backed
> > > > project that aims to be consistent w/ the official java client;
> pykafka
> > > is
> > > > sponsored by parse.ly and aims to provide a pythonic interface.
> > > whichever
> > > > you go with, I would love to hear your specific feedback on
> > kafka-python.
> > > >
> > > > -Dana (kafka-python maintainer)
> > > >
> > > >> On Fri, Jan 8, 2016 at 4:32 PM, Doug Tomm  wrote:
> > > >>
> > > >> we're using kafka-python, weighing pykafka, and wondering if there's
> > > >> another that is better to use.  does confluent endorse or recommend
> a
> > > >> particular python package (psorry for the alliteration)?
> > > >>
> > > >> doug
> > > >>
> > > >>
> > >
> >
>


Broker Health check

2016-01-11 Thread Brice Dutheil
Hi,

My team and I are just getting started with Kafka, so we may not have read
or understood everything yet.

The context: we’d like to bootstrap Kafka in our own codebase, and the first
step is to have a deployable walking skeleton that the team can grow along
with our experience and needs.

We are starting with Kafka 0.9.0.0, and surprisingly there’s no status
command for the broker, so we decided to do a comprehensive check, but we
are still a bit confused about how to write a healthcheck that is not fragile.

Currently we would like to produce a message (e.g. a timestamp) on some
healthcheck topic and expect a consumer to read this message; a grep would
then check that the message has the same value.
However, we are facing some behaviour that we don’t quite understand yet.

# first emit the message
timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
echo $timestamp | bin/kafka-console-producer.sh --broker-list localhost:9092 \
  --topic healthcheck
# try to consume the same message
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 \
  --topic healthcheck --max-messages 1 --from-beginning | grep "$timestamp"

[ $? -eq 0 ] && echo "kafka online" || echo "kafka offline"

In this case the consumer does not always *find* the matching message. We
have tried different options there: with or without --max-messages,
--from-beginning, --timeout-ms, it doesn’t behave as we thought from reading
the documentation. Sometimes the message is read, sometimes it isn’t, but we
don’t get why.
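
For comparison, a programmatic version of the same round-trip check is
sketched below with the 0.9 Java clients. The topic name, timeouts, and the
fresh per-run group id are assumptions: a fresh group id plus
auto.offset.reset=earliest avoids surprises from previously committed
offsets, and matching on a unique token rather than a shared timestamp
avoids picking up stale messages from earlier checks.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HealthCheck {
    public static void main(String[] args) throws Exception {
        String token = "healthcheck-" + System.currentTimeMillis();

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            // block on the ack so a broker problem surfaces right here
            producer.send(new ProducerRecord<>("healthcheck", token))
                    .get(10, TimeUnit.SECONDS);
        }

        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", token);                 // fresh group: no committed offsets
        cp.put("auto.offset.reset", "earliest");   // new group reads from the start
        cp.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        boolean found = false;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp)) {
            consumer.subscribe(Collections.singletonList("healthcheck"));
            long deadline = System.currentTimeMillis() + 30000;
            while (!found && System.currentTimeMillis() < deadline) {
                for (ConsumerRecord<String, String> r : consumer.poll(1000)) {
                    if (token.equals(r.value())) {
                        found = true;
                    }
                }
            }
        }
        System.out.println(found ? "kafka online" : "kafka offline");
    }
}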
--

To be noted: I don’t understand how it is possible to read the latest
message:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic yat 
> --partitions 1 --replication-factor 1
Created topic "yat".

> echo bob | bin/kafka-console-producer.sh --topic yat --broker-list 
> localhost:9092

> echo charles | bin/kafka-console-producer.sh --topic yat --broker-list 
> localhost:9092
> echo yohan | bin/kafka-console-producer.sh --topic yat --broker-list 
> localhost:9092
> echo carlos | bin/kafka-console-producer.sh --topic yat --broker-list 
> localhost:9092
# Running the simple command just outputs nothing, and I need to kill
# the process with ctrl-c; note the message count is off by 1, or I'm
# missing something
> bin/kafka-console-consumer.sh --new-consumer --bootstrap-server 
> localhost:9092 --topic yat
^CProcessed a total of 1 messages
# Same with `--from-beginning`, the process doesn't finish, and prints
# everything since the beginning (that is to be expected).
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server
localhost:9092 --topic yat --from-beginning
bob
charles
yohan
carlos
carlos
yohan
brice
^CProcessed a total of 8 messages
# With `--from-beginning` and `--max-messages` the consumer stops at
# the very first message
[root@3af0ccad3182 kafka]# bin/kafka-console-consumer.sh
--new-consumer --bootstrap-server localhost:9092 --topic yat
--from-beginning --max-messages 1
bob
Processed a total of 1 messages
# With `--max-messages` the latest message doesn't appear either and I
# have to kill the consumer too
> bin/kafka-console-consumer.sh --new-consumer --bootstrap-server 
> localhost:9092 --topic yat --max-messages 1
^CProcessed a total of 1 messages

Also note the same happens with the old consumer using the --zookeeper option.

Is there some limitation with the console consumer regarding offsets? Is
there something I’m missing?

Should we write our own healthcheck?

Thanks in advance for any feedback, pointers, etc.

Cheers,
— Brice


RE: trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread dave
I forgot to include some information about this problem.  When I sent
the produce request, the following appeared in server.log:

[2016-01-10 23:03:44,957] ERROR [KafkaApi-3] error when handling request 
Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId: null; 
RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition: [topic_1,3] -> 37 
(kafka.server.KafkaApis)
java.lang.NullPointerException
   at 
org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:127)
   at 
org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:106)
   at 
org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
   at 
org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
   at 
kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
   at 
kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
   at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
   at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
   at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
   at 
kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
   at java.lang.Thread.run(Thread.java:745)

Later when I tried sending another produce request I got a somewhat
similar error:

[2016-01-11 08:15:05,153] ERROR [KafkaApi-3] error when handling request 
Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId: null; 
RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition: [topic_1,3] -> 37 
(kafka.server.KafkaApis)
java.lang.IllegalArgumentException: A metric named 'MetricName 
[name=throttle-time, group=Produce, description=Tracking average throttle-time 
per client, tags={client-id=null}]' already exists, can't register another one.
   at 
org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
   at 
kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
   at 
kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
   at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
   at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
   at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
   at 
kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
   at java.lang.Thread.run(Thread.java:745)

In both cases I was running kafka-console-consumer.sh to consume
messages from the topic I was sending to, and the consumer did see the
message I sent.

Dave


-Original Message-
From: d...@dspeterson.com
Sent: Monday, January 11, 2016 12:32am
To: users@kafka.apache.org
Subject: trouble sending produce requests to 0.9.0.0 broker cluster

Hi,

I'm having trouble sending produce requests to a Kafka 0.9.0.0 broker
cluster consisting of 4 brokers with IDs 0, 1, 2, and 3.  All 4
brokers are running locally on my CentOS 7 development box, listening
on ports 9092, 9093, 9094, and 9095 respectively.  I am running my
own producer code (Bruce, see https://github.com/ifwe/bruce), which
works without problems with Kafka 0.8, but has problems with 0.9.0.0.
When I send a produce request consisting of a single message, I often
get a response consisting of error ACK -1 (Unknown, unexpected server
error) although I have also seen other errors such as 6
(NotLeaderForPartition).  During one observed instance of this
behavior I saw the following:

sent produce request

- single message set containing a single message
- topic: "topic_1"
- partition: 3
- empty key
- value: "hello world"
- API key: 0 (produce request)
- API version: 0
- correlation ID: 1
- empty client ID
- required ACKs: 1
- timeout: 1
- message magic byte: 0
- message attributes: 0 (no compression)
- destination broker ID: 3

received produce response
--

Re: best python library to use?

2016-01-11 Thread Andrew Otto
pykafka’s balanced consumer is very useful. pykafka also has Python
bindings to the librdkafka C library that you can optionally enable, which
might get you some speed boosts.

python-kafka (oh, I just saw this 0.9x version, hm!) was better at
producing than pykafka for us, so we are currently using pykafka for
consumption, and python-kafka for production.  python-kafka allows you to
produce to multiple topics using the same client instance.  (pykafka may
support this soon: https://github.com/Parsely/pykafka/issues/354)



On Sat, Jan 9, 2016 at 10:04 AM, Dana Powers  wrote:

> pykafka uses a custom zookeeper implementation for consumer groups.
> kafka-python uses the 0.9.0.0 server apis to accomplish the same.
>
> -Dana
> On Jan 8, 2016 18:32, "chengxin Cai"  wrote:
>
> > Hi
> >
> > I heard that Pykafka can create a balanced consumer.
> >
> > And there should be no other big difference.
> >
> >
> > Best Regards
> >
> > > On Jan 9, 2016, at 08:58, Dana Powers wrote:
> > >
> > > Hi Doug,
> > >
> > > The differences are fairly subtle. kafka-python is a community-backed
> > > project that aims to be consistent w/ the official java client; pykafka
> > is
> > > sponsored by parse.ly and aims to provide a pythonic interface.
> > whichever
> > > you go with, I would love to hear your specific feedback on
> kafka-python.
> > >
> > > -Dana (kafka-python maintainer)
> > >
> > >> On Fri, Jan 8, 2016 at 4:32 PM, Doug Tomm  wrote:
> > >>
> > >> we're using kafka-python, weighing pykafka, and wondering if there's
> > >> another that is better to use.  does confluent endorse or recommend a
> > >> particular python package (psorry for the alliteration)?
> > >>
> > >> doug
> > >>
> > >>
> >
>


Kafka 0.9 Consumer Group

2016-01-11 Thread Wang, Howard
Hi,

I have a question regarding the Kafka 0.9 consumer group. I manually commit
offsets using the Kafka 0.9 consumer created with a consumer group.

However, after my app restarted completely from scratch, the consumer group
seems to have lost all its offsets. Is it true that consumer offsets are
transient and will be gone once the consumer group has no members and gets
deleted?
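
For context, the manual-commit pattern in question looks roughly like this
with the 0.9 Java consumer. The topic and group names are placeholders and
process() stands in for the application logic; committed offsets are keyed
by group.id, so the same id has to be reused across restarts for them to be
picked up again.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-app");           // must stay stable across restarts
        props.put("enable.auto.commit", "false");  // offsets are committed explicitly
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);               // hypothetical application logic
                }
                consumer.commitSync();             // store offsets in Kafka for the group
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%d: %s%n", record.offset(), record.value());
    }
}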

Thanks.

Howard
--
 Howard Wang
Engineering - Big Data and Personalization
Washington Post Media

1150 15th St NW, Washington, DC 20071
p. 202-334-9195
Email: howard.w...@washpost.com


Re: Upgraded from 0.8.2 to 0.9.0 and FetchFollower request time has increased

2016-01-11 Thread Stephen Powis
Hmm, over the weekend it looks like this slowly crept back up, as shown in
this graph: http://i.imgur.com/DH9Qa09.png  I performed the upgrade on 1/8
around 12am, as shown by the spike in the graph.  Here's a graph with a
little more history going back ~1 month; this was pretty stable prior to
the upgrade: http://i.imgur.com/95Gw0MS.png
It looks like it's entirely "RemoteTime" - http://i.imgur.com/Uamy42q.png

Any ideas of what I should check?  I have num.replica.fetchers set to 8
currently.

Thanks!

On Sat, Jan 9, 2016 at 12:19 AM, Ismael Juma  wrote:

> Great!
> On 8 Jan 2016 13:53, "Stephen Powis"  wrote:
>
> > Hey Ismael,
> >
> > It did, just took a few hours :)   It seems like it's just hit where it
> was
> > before the upgrade.
> >
> >
> >
> >
> > Thanks!
> > Stephen
> >
> > On Fri, Jan 8, 2016 at 8:03 PM, Ismael Juma  wrote:
> >
> > > Hi Stephen,
> > >
> > > Have things settled down since?
> > >
> > > Thanks,
> > > Ismael
> > >
> > > On Fri, Jan 8, 2016 at 7:39 AM, Stephen Powis 
> > > wrote:
> > >
> > > > Hey!
> > > >
> > > > So I upgraded our production kafka cluster from 0.8.2 to 0.9.0 this
> > > > morning, and it seems like everything went smoothly.
> > > >
> > > > As the last step when I changed the inter.broker.protocol.version to
> > > > 0.9.0.0 and did a rolling restart, I noticed that the FetchFollower
> > > request
> > > > times increased fairly significantly.  Does anyone know if this is
> > > expected
> > > > with the new protocol version?
> > > >
> > > > Link to graphite graph: http://i.imgur.com/uhvjCBH.png
> > > >
> > > > Perhaps I'm just being paranoid and it will settle out?  This is the
> > same
> > > > graph over a larger time period - http://i.imgur.com/FB5Ey2a.png
> > > >
> > > > Thanks!
> > > >
> > >
> >
>


Apache Kafka Usage

2016-01-11 Thread Joe San
Dear Apache Kafka users,

I'm currently looking into using Apache Kafka in our infrastructure, and I
have come up with lots of questions that I would like to clarify. I have
created a Stack Overflow post here:

http://stackoverflow.com/questions/34715556/apache-kafka-for-time-series-data-persistence

Could anyone throw some light on this please?

Regards,
Joe


trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread dave
Hi,

I'm having trouble sending produce requests to a Kafka 0.9.0.0 broker
cluster consisting of 4 brokers with IDs 0, 1, 2, and 3.  All 4
brokers are running locally on my CentOS 7 development box, listening
on ports 9092, 9093, 9094, and 9095 respectively.  I am running my
own producer code (Bruce, see https://github.com/ifwe/bruce), which
works without problems with Kafka 0.8, but has problems with 0.9.0.0.
When I send a produce request consisting of a single message, I often
get a response consisting of error ACK -1 (Unknown, unexpected server
error) although I have also seen other errors such as 6
(NotLeaderForPartition).  During one observed instance of this
behavior I saw the following:

sent produce request

- single message set containing a single message
- topic: "topic_1"
- partition: 3
- empty key
- value: "hello world"
- API key: 0 (produce request)
- API version: 0
- correlation ID: 1
- empty client ID
- required ACKs: 1
- timeout: 1
- message magic byte: 0
- message attributes: 0 (no compression)
- destination broker ID: 3
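
Relevant to the empty client ID above: in the wire protocol a string field
is an int16 length prefix followed by that many bytes, where a length of -1
denotes a null string and a length of 0 an empty one. A minimal sketch of
that encoding (not Bruce's actual code; the helper and values are
illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ProtocolString {
    // Encode a protocol string: int16 length, then UTF-8 bytes; -1 means null.
    static void writeString(ByteBuffer buf, String s) {
        if (s == null) {
            buf.putShort((short) -1);  // null string; may surface as ClientId: null
        } else {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(32);
        writeString(buf, "bruce");     // a non-null client id
        System.out.println("encoded " + buf.position() + " bytes");
    }
}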

received produce response
-
- correlation ID: 1
- topic: "topic_1"
- partition: 3
- error code: -1

I have included tcpdump output for the above request and response
below.  The topic/partition layout is as follows:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181/kafka-0.9.0.0
Topic:topic_1   PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: topic_1  Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic_1  Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic_1  Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: topic_1  Partition: 3    Leader: 3       Replicas: 3,0   Isr: 0,3
$

Does anyone have any ideas about what may be causing this behavior?

Additionally, I was using tcpdump to watch communication between
kafka-console-producer.sh and the broker cluster, and noticed that
the console producer sends a value of 1 for the API version in its
produce requests.  Is there any documentation describing protocol
changes between API versions 0 and 1?  I looked here:


https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

and didn't see anything about the new API version.  Any help is much
appreciated.

Thanks,
Dave


Produce request...

# tcpdump -i lo -X -s 0 -nn src port 51147
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo, link-type EN10MB (Ethernet), capture size 65535 bytes
23:03:44.947589 IP 127.0.0.1.51147 > 127.0.0.1.9095: Flags [P.], seq 
2040824198:2040824280, ack 1758705421, win 342, options [nop,nop,TS val 
71321820 ecr 71222139], length 82
0x0000:  4500 0086 4300 4000 4006 f96f 7f00 0001  E...C.@.@..o
0x0010:  7f00 0001 c7cb 2387 79a4 8186 68d3 b70d  ..#.y...h...
0x0020:  8018 0156 fe7a  0101 080a 0440 48dc  ...V.z...@H.
0x0030:  043e c37b  004e    0001  .>.{...N
0x0040:   0001  2710  0001 0007 746f  ..'...to
0x0050:  7069 635f 3100  0100  0300   pic_1...
0x0060:  2500      1973 acf7  %s..
0x0070:  7c00 00ff  ff00  0b68 656c 6c6f  |..hello
0x0080:  2077 6f72 6c64   .world
23:03:44.956525 IP 127.0.0.1.51147 > 127.0.0.1.9095: Flags [.], ack 40, win 
342, options [nop,nop,TS val 71321830 ecr 71321830], length 0
0x0000:  4500 0034 4301 4000 4006 f9c0 7f00 0001  E..4C.@.@...
0x0010:  7f00 0001 c7cb 2387 79a4 81d8 68d3 b734  ..#.y...h..4
0x0020:  8010 0156 fe28  0101 080a 0440 48e6  ...V.(...@H.
0x0030:  0440 48e6.@H.
^C
2 packets captured
4 packets received by filter
0 packets dropped by kernel
#

Produce response...

# tcpdump -i lo -X -s 0 -nn dst port 51147
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo, link-type EN10MB (Ethernet), capture size 65535 bytes
23:03:44.947661 IP 127.0.0.1.9095 > 127.0.0.1.51147: Flags [.], ack 
2040824280, win 32742, options [nop,nop,TS val 71321821 ecr 71321820], length 0
0x0000:  4500 0034 7476 4000 4006 c84b 7f00 0001  E..4tv@.@..K
0x0010:  7f00 0001 2387 c7cb 68d3 b70d 79a4 81d8  #...h...y...
0x0020:  8010 7fe6 fe28  0101 080a 0440 48dd  .(...@H.
0x0030:  0440 48dc.@H.
23:03:44.956499 IP 127.0.0.1.9095 > 127.0.0.1.51147: Flags [P.], seq 0:39, 
ack 1, win 32742, options [nop,nop,TS val 71321830 ecr 71321820], length 39
0x0000:  4500 005b 7477 4000 4006 c823 7f00 0001  E..[tw@.@..#
0x0010:  7f00 0001 2387 c7