Re: kafka consumer not consuming messages

2014-02-10 Thread Arjun

Hi Jun,

No, it's not that problem. I am not able to figure out what the problem is; can you
please help?


thanks
Arjun Narasimha Kota

On Monday 10 February 2014 09:10 PM, Jun Rao wrote:

Does
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
apply?

Thanks,

Jun


On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:


Hi,

I started using Kafka some time back. I was experimenting with 0.8. My
problem is that the consumer is unable to consume the messages. My configuration
is a Kafka broker on localhost and ZooKeeper on localhost. I
have only one broker and one consumer at present.

What have I done:
  1) I used the Java examples in the Kafka src and pushed some 600
messages to the broker
  2) I used the console consumer to check whether the messages are
there in the broker or not. The console consumer printed all 600 messages
  3) Now I used the Java consumer code and tried to get those
messages. It is not printing any messages; it just got stuck

When was it working earlier:
  - When I tried with three brokers and three consumers on the same
machine, with the same configuration, it worked fine.
  - I changed the properties accordingly when I tried to make it work
with one broker and one consumer

What do the logs say:
  - attaching the logs as well

If someone could point out where I am going wrong, it would be helpful.

Thanks
Arjun Narasimha Kota
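
For reference against this scenario, a minimal 0.8 high-level consumer sketch is shown
below (ZooKeeper address, group.id and topic name are placeholders, not from the thread).
Note the FAQ Jun links above: a brand-new group.id starts at the end of the log by
default, so either produce after the consumer is running or set
auto.offset.reset=smallest to pick up existing messages.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MinimalConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");   // placeholder
            props.put("group.id", "test-group-fresh");          // a fresh group has no stale offsets
            props.put("auto.offset.reset", "smallest");         // start from the beginning of the log
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // One stream for the topic; the iterator blocks until a message arrives.
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("mytopic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }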






RE: Dropping messages ?

2014-02-10 Thread Kat Walker
Hi Jun/Guozhang

We might have to rerun our QA tests in their entirety. We simply cannot reset
the consumer offset as there is a lot of processing involved after consuming those
messages. This might take almost a week. The Kafka message also contains
`volatile` data which is fetched from a database and destroyed after that
Kafka message is consumed.

What is puzzling is that we have been running Kafka in Production for over 4 
months now and we have never faced this issue. Our peak volume is <= 1000 
messages / second. On average, it is less than 100.

Our zookeeper version is 3.3.6. What I suspect is that we managed to `roll
over` a few messages due to inconsistencies in zookeeper offsets. Recall that
we had only 1 Kafka broker. Once again, this is just a `wild` speculation.

The topics were created before we started our tests. 

Has anyone ever lost a Kafka message before? Are there any guarantees? We
are okay with all kinds of delays, provided the messages arrive in order and
are delivered regardless.

Thanks
Kat
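
One way to spot-check the missing offsets without touching the consumer group's
committed offsets is to re-read the relevant offset range directly from the broker
with the 0.8 SimpleConsumer. A rough sketch follows; broker host, topic, partition
and start offset are placeholders, not values from this thread.

    import java.nio.ByteBuffer;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class OffsetProbe {
        public static void main(String[] args) {
            SimpleConsumer consumer =
                    new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-probe");
            FetchRequest req = new FetchRequestBuilder()
                    .clientId("offset-probe")
                    .addFetch("mytopic", 0, 12345L, 100000)   // topic, partition, start offset, max bytes
                    .build();
            FetchResponse resp = consumer.fetch(req);
            for (MessageAndOffset mo : resp.messageSet("mytopic", 0)) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(mo.offset() + ": " + new String(bytes));
            }
            consumer.close();
        }
    }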

> Date: Mon, 10 Feb 2014 07:38:01 -0800
> Subject: Re: Dropping messages ?
> From: jun...@gmail.com
> To: users@kafka.apache.org
> 
> If you reset the consumer offset and try to consume those messages again,
> do you see the same drop?
> 
> Thanks,
> 
> Jun
> 
> 
> On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> 
> > Hi
> >
> > We have been using Kafka(0.8) for the past few months with the following
> > setup
> > Kafka Broker - 1
> > Zookeepers Ensemble - 3
> > Partitions per topic - 3
> >
> > Yesterday, while running stress tests on one of the QA machines, we
> > observed that a few messages which were produced within a couple of
> > milliseconds of each other did not reach the Kafka consumer, i.e. there was
> > no trace of those messages at the consumer end.
> >
> > We decided to check whether we had any errors at our side or there was a
> > network issue. We did not find any issue. We then decided to check whether
> > we can find that message in one of the Kafka partitions. The message was
> > found in one of the topic partitions.
> >
> > We are not sure why Kafka did not notify any consumers about the message.
> > Are there any special cases where Kafka silently drops a message ?
> >
> > We also found a delay in the notifications/watches triggered from
> > zookeeper. We are not sure whether these are related ? It will be difficult
> > to reproduce as the test probably took a few days to complete. But surely
> > we did lose approximately 5% of the messages. We have logs of messages
> > being produced at the producer side and corresponding entries in Kafka
> > partition logs, but nothing at the consumer side. The only repeating
> > pattern was that the messages were probably produced within the same
> > millisecond. So if you have a sequence of 4 messages produced in the
> > same millisecond, say M0, M1, M2, M3, we probably have
> > M0, M1, M3 but not M2. It is puzzling how only one message is dropped out
> > of the given 4.
> >
> >
> > We use the High Level Kafka Producer and Consumer. Both are single
> > threaded(at our end).
> >
> > Does kafka need its own dedicated zookeeper ensemble ? We also use the
> > same zookeeper ensemble as  our configuration service.
> >
> > Unfortunately, we did not have DEBUG messages at the server enabled during
> > the setup. Although, NO error messages were observed during the same time
> > period.
> >
> >
> > Before we try running the same Tests again, can someone please shed more
> > light as to the reasons why kafka dropped a few messages ?
> >
> > Kat
> >
  

Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-10 Thread Philip O'Toole
Yes, there might be - we experience link resets every so often, and definitely 
did today. 

Assuming it is this, are you surprised the thread went down? Perhaps we need to
catch this?

Philip
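
If the exception is indeed raised on the internal FetcherRunnable thread (as the
stack trace below suggests), a try/catch around the application's stream iterator
probably won't see it. Since restarting the Consumer clears it up, one workaround is
a watchdog that recreates the consumption loop when it exits or dies. A minimal,
generic sketch, with the Kafka-specific parts left as placeholders:

    import java.util.concurrent.TimeUnit;

    public class ConsumerWatchdog {
        public static void main(String[] args) throws InterruptedException {
            while (true) {
                Thread worker = new Thread(new Runnable() {
                    public void run() { consumeLoop(); }
                }, "consume-loop");
                worker.start();
                worker.join();                 // returns once the loop exits or its thread dies
                System.err.println("consume loop exited; recreating the consumer");
                TimeUnit.SECONDS.sleep(5);     // back off before reconnecting
            }
        }

        // Placeholder: create the ConsumerConnector, iterate its streams, commit
        // explicitly, and return (or throw) when the stream stops making progress,
        // e.g. by configuring consumer.timeout.ms and handling ConsumerTimeoutException.
        static void consumeLoop() {
            // application-specific Kafka consumption goes here
        }
    }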

> On Feb 10, 2014, at 8:38 PM, Jun Rao  wrote:
> 
> This indicates that message checksum validation failed. Is there any issue
> with the network?
> 
> Thanks,
> 
> Jun
> 
> 
>> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole  wrote:
>> 
>> Saw this thrown today, which brought down a Consumer thread -- we're using
>> Consumers built on the High-level consumer framework. What may have
>> happened here? We are using a custom C++ Producer which does not do
>> compression, and which hasn't changed in months, but this error is
>> relatively new to us, and is occurring occasionally. We are running the Sun
>> JDK:
>> 
>>java version "1.7.0_25"
>>Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
>>Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>> 
>> Restarting the Consumer clears it up, so the message on the Broker itself
>> does not appear to be problematic. We are running 3 Consumers, each of
>> which has 48 ConsumerConnector objects. Our code explicitly calls commit(),
>> we do not auto-commit.
>> 
>> Thanks,
>> 
>> Philip
>> 
>> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
>> FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed
>> offset = 120758878080
>> kafka.message.InvalidMessageException: message is invalid, compression
>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
>> 120758878080
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>at
>> 
>> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>at
>> 
>> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>at
>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>at
>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>at
>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>at
>> 
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>at scala.collection.immutable.List.foreach(List.scala:45)
>>at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
>> FetcherRunnable
>> kafka.message.InvalidMessageException: message is invalid, compression
>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
>> 120758878080
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>at
>> 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>at
>> 
>> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>at
>> 
>> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>at
>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>at
>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>at
>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>at
>> 
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>at scala.collection.immutable.List.foreach(List.scala:45)
>>at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>> 


Re: code + sbt tips

2014-02-10 Thread Jun Rao
In trunk, we have gradle support now. It seems to build the IntelliJ
project more cleanly.

Thanks,

Jun


On Mon, Feb 10, 2014 at 7:14 PM, S Ahmed  wrote:

> A few quick questions that I hope people can help me with:
>
>
> 1. Most of you guys use IntelliJ; do you always build using sbt? i.e. you
> lose out on the build-within-the-IDE features like clicking on an error that
> jumps to that part of the code, etc.
>
> 2. do you just build using the default scala version 2.8 during
> development?
>
>
> I have attached a screenshot of what it looks like on some .scala files,
> does yours have this issue also? (see screenshot)
>


Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-10 Thread Jun Rao
This indicates that message checksum validation failed. Is there any issue
with the network?

Thanks,

Jun


On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole  wrote:

> Saw this thrown today, which brought down a Consumer thread -- we're using
> Consumers built on the High-level consumer framework. What may have
> happened here? We are using a custom C++ Producer which does not do
> compression, and which hasn't changed in months, but this error is
> relatively new to us, and is occurring occasionally. We are running the Sun
> JDK:
>
> java version "1.7.0_25"
> Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>
> Restarting the Consumer clears it up, so the message on the Broker itself
> does not appear to be problematic. We are running 3 Consumers, each of
> which has 48 ConsumerConnector objects. Our code explicitly calls commit(),
> we do not auto-commit.
>
> Thanks,
>
> Philip
>
> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed
> offset = 120758878080
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
> 120758878080
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at
>
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> at
>
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> at
>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> at
>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
> 120758878080
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at
>
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> at
>
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> at
>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> at
>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>


Re: Mirrormaker clients not balanced

2014-02-10 Thread Jun Rao
This is probably your issue:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-IamusingtheZK-basedproducerin0.7andIseedataonlyproducedonsomeofthebrokers,butnotall,why?

Thanks,

Jun


On Mon, Feb 10, 2014 at 4:04 PM, Tomas Nunez  wrote:

> I don't see anything in kafkabe03. In fact, I just restarted it, and I
> saw in the logs a lot of other topics registering. First, lines like
> this:
> (...)
> [2014-02-10 20:30:07,195] INFO Loading log 'topic2-0'
> (kafka.log.LogManager)
> [2014-02-10 20:30:07,201] INFO Loading the last segment
> /var/kafka/topic2-0/000559732292.kafka in mutable mode,
> recovery false (kafka.log.Log)
> (...)
> [2014-02-10 20:30:07,464] INFO Registering broker /brokers/ids/3
> (kafka.server.KafkaZooKeeper)
> [2014-02-10 20:30:07,486] INFO Registering broker /brokers/ids/3
> succeeded with
> id:3,creatorId:kafkabe03-1392064207464,host:kafkabe03,port:9092
> (kafka.server.KafkaZooKeeper)
> [2014-02-10 20:30:07,516] INFO Begin registering broker topic
> /brokers/topics/topic2/3 with 1 partitions
> (kafka.server.KafkaZooKeeper)
> [2014-02-10 20:30:07,522] INFO End registering broker topic
> /brokers/topics/topic2/3 (kafka.server.KafkaZooKeeper)
> (...)
>
> But no trace of this particular topic, topic1.
>
> Any idea why this particular topic isn't registering? Where can I look?
>
> Thanks again
> Tomàs
>
>
> One of the be brokers (3) is not registered in ZK. Do you see ZK session
> expiration (potentially due to GC) in that broker?
>
> Thanks,
>
> Jun
>
>
>
>
> On Mon, Feb 10, 2014 at 4:28 PM, Tomas Nunez  wrote:
>
> > Hi
> >
> > I'm new around here and I'm dealing with a problem, and reading the
> > documentation I don't know where else to look.
> >
> > I have a cross-dc mirrormaker setup: Mirrormaker is consuming from 5
> > frontend servers in each DC (10 in total) and 5 backend servers are
> > consuming from mirrormaker. That's working for most of the topics, but
> some
> > of them are not being consumed from all backend servers.
> >
> > as an example, browsing with zkcli I find this:
> >
> > kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> > zookeeperbe01:2181 ls /brokers/topics/topic1
> > Connecting to zookeeperbe01:2181
> >
> > WATCHER::
> >
> > WatchedEvent state:SyncConnected type:None path:null
> > [2, 1, 5, 4]
> > kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> > zookeeperfe01:2181 ls /brokers/topics/topic1
> > Connecting to zookeeperfe01:2181
> >
> > WATCHER::
> >
> > WatchedEvent state:SyncConnected type:None path:null
> > [3, 2, 1, 5, 4]
> >
> > What can be wrong? Where can I look to get more info to troubleshoot
> this?
> > Any hint?
> >
> > Kafka version is 0.7.2; in the mirrormaker config I have "--num.streams 5
> > --num.producers 5".  Zookeeper version is 3.3.5.
> >
> > Here you can see kafka-console-consumer.sh connecting to both fe and be
> > servers:
> > https://gist.github.com/pythiannunez/1623934cb538678f053e
> >
> > Thanks!
> >
> >
>


Re: How to compile with a newer version of zookeeper

2014-02-10 Thread Neha Narkhede
0.8-beta already depends on zookeeper 3.3.4. Also, Kafka 0.8 final is
better and more stable compared to 0.8-beta.

Thanks,
Neha


On Mon, Feb 10, 2014 at 6:19 PM, Libo Yu  wrote:

> Hi team,
>
> We are using Kafka 0.8-beta1. The zookeeper in it is 3.3.3 (although the
> version in the license file is 3.3.4).
> I want to upgrade to a newer version. Any idea what I need to do in order
> to compile the broker with a newer version
> of zookeeper? Thanks.
>
> Libo
>


How to compile with a newer version of zookeeper

2014-02-10 Thread Libo Yu
Hi team,

We are using Kafka 0.8-beta1. The zookeeper in it is 3.3.3 (although the 
version in the license file is 3.3.4).
I want to upgrade to a newer version. Any idea what I need to do in order to 
compile the broker with a newer version
of zookeeper? Thanks.

Libo
  

code + sbt tips

2014-02-10 Thread S Ahmed
A few quick questions that I hope people can help me with:


1. Most of you guys use IntelliJ; do you always build using sbt? i.e. you
lose out on the build-within-the-IDE features like clicking on an error that
jumps to that part of the code, etc.

2. do you just build using the default scala version 2.8 during development?


I have attached a screenshot of what it looks like on some .scala files,
does yours have this issue also? (see screenshot)


Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-10 Thread Philip O'Toole
I should say we *think* this exception brought down the Consumer thread. The
problematic partition on our system was 2-29, so this is definitely the
related thread.

Philip


On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole  wrote:

> Saw this thrown today, which brought down a Consumer thread -- we're using
> Consumers built on the High-level consumer framework. What may have
> happened here? We are using a custom C++ Producer which does not do
> compression, and which hasn't changed in months, but this error is
> relatively new to us, and is occurring occasionally. We are running the Sun
> JDK:
>
> java version "1.7.0_25"
> Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>
> Restarting the Consumer clears it up, so the message on the Broker itself
> does not appear to be problematic. We are running 3 Consumers, each of
> which has 48 ConsumerConnector objects. Our code explicitly calls commit(),
> we do not auto-commit.
>
> Thanks,
>
> Philip
>
> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed
> offset = 120758878080
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
> 120758878080
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> at
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
> 120758878080
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> at
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> at
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>


0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-10 Thread Philip O'Toole
Saw this thrown today, which brought down a Consumer thread -- we're using
Consumers built on the High-level consumer framework. What may have
happened here? We are using a custom C++ Producer which does not do
compression, and which hasn't changed in months, but this error is
relatively new to us, and is occurring occasionally. We are running the Sun
JDK:

java version "1.7.0_25"
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself
does not appear to be problematic. We are running 3 Consumers, each of
which has 48 ConsumerConnector objects. Our code explicitly calls commit(),
we do not auto-commit.

Thanks,

Philip

2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed
offset = 120758878080
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
120758878080
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at
kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
at
kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
at
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
at
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
FetcherRunnable
kafka.message.InvalidMessageException: message is invalid, compression
codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset:
120758878080
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
at
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at
kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
at
kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
at
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
at
kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)


Re: Config for new clients (and server)

2014-02-10 Thread Jay Kreps
Yeah I am aware of how zookeeper behaves, I think it is kind of gross.

I think logging it at DEBUG gets you what you want--by default we don't
pollute logs, but anyone who wants to log this can enable DEBUG logging on
org.apache.kafka.clients.producer.ProducerConfig.

If we want this on by default at LinkedIn we can just set this logger to
debug in our wrapper, we don't need to inflict this on everyone.

The point is that spewing out each config IS a debug according to our
definition:
  http://kafka.apache.org/coding-guide.html

-Jay
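
For illustration, "setting this logger to debug in a wrapper" could be as small as
the following, assuming a log4j 1.x backend and the logger name Jay mentions above:

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class ClientLoggingWrapper {
        public static void enableConfigLogging() {
            // Turn on DEBUG only for the config-dump logger, leaving everything else as configured.
            Logger.getLogger("org.apache.kafka.clients.producer.ProducerConfig").setLevel(Level.DEBUG);
        }
    }

Equivalently, a log4j.properties line such as
log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=DEBUG would achieve the same.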


On Mon, Feb 10, 2014 at 2:01 PM, Jun Rao  wrote:

> I actually prefer to see those at INFO level. The reason is that the config
> system in an application can be complex. Some configs can be overridden in
> different layers and it may not be easy to determine what the final binding
> value is. The logging in Kafka will serve as the source of truth.
>
> For reference, ZK client logs all overridden values during initialization.
> It's a one time thing during starting up, so shouldn't add much noise. It's
> very useful for debugging subtle config issues.
>
> Exposing final configs programmatically is potentially useful. If we don't
> want to log overridden values out of box, an app can achieve the same thing
> using the programming api. The only missing thing is that we won't know
> those unused property keys, which is probably less important than seeing
> the overridden values.
>
> Thanks,
>
> Jun
>
>
> On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps  wrote:
>
> > Hey Jun,
> >
> > I think that is reasonable but would object to having it be debug
> logging?
> > I think logging out a bunch of noise during normal operation in a client
> > library is pretty ugly. Also, is there value in exposing the final
> configs
> > programmatically?
> >
> > -Jay
> >
> >
> >
> > On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao  wrote:
> >
> > > +1 on the new config. Just one comment. Currently, when initiating a
> > config
> > > (e.g. ProducerConfig), we log those overridden property values and
> unused
> > > property keys (likely due to mis-spelling). This has been very useful
> for
> > > config verification. It would be good to add similar support in the new
> > > config.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps  wrote:
> > >
> > > > We touched on this a bit in previous discussions, but I wanted to
> draw
> > > out
> > > > the approach to config specifically as an item of discussion.
> > > >
> > > > The new producer and consumer use a similar key-value config approach
> > as
> > > > the existing scala clients but have different implementation code to
> > help
> > > > define these configs. The plan is to use the same approach on the
> > server,
> > > > once the new clients are complete; so if we agree on this approach it
> > > will
> > > > be the new default across the board.
> > > >
> > > > Let me split this into two parts. First I will try to motivate the
> use
> > of
> > > > key-value pairs as a configuration api. Then let me discuss the
> > mechanics
> > > > of specifying and parsing these. If we agree on the public api then
> the
> > > > public api then the implementation details are interesting as this
> will
> > > be
> > > > shared across producer, consumer, and broker and potentially some
> > tools;
> > > > but if we disagree about the api then there is no point in discussing
> > the
> > > > implementation.
> > > >
> > > > Let me explain the rationale for this. In a sense a key-value map of
> > > > configs is the worst possible API to the programmer using the
> clients.
> > > Let
> > > > me contrast the pros and cons versus a POJO and motivate why I think
> it
> > > is
> > > > still superior overall.
> > > >
> > > > Pro: An application can externalize the configuration of its kafka
> > > clients
> > > > into its own configuration. Whatever config management system the
> > client
> > > > application is using will likely support key-value pairs, so the
> client
> > > > should be able to directly pull whatever configurations are present
> and
> > > use
> > > > them in its client. This means that any configuration the client
> > supports
> > > > can be added to any application at runtime. With the pojo approach
> the
> > > > client application has to expose each pojo getter as some config
> > > parameter.
> > > > The result of many applications doing this is that the config is
> > > different
> > > > for each and it is very hard to have a standard client config shared
> > > > across. Moving config into config files allows the usual tooling
> > (version
> > > > control, review, audit, config deployments separate from code pushes,
> > > > etc.).
> > > >
> > > > Pro: Backwards and forwards compatibility. Provided we stick to our
> > java
> > > > api many internals can evolve and expose new configs. The application
> > can
> > > > support both the new and old client by just specifying a config that
> > will
> > > > be unused in the older version (and 

Re: Mirrormaker clients not balanced

2014-02-10 Thread Tomas Nunez
I don't see anything in kafkabe03. In fact, I just restarted it, and I
saw in the logs a lot of other topics registering. First, lines like
this:
(...)
[2014-02-10 20:30:07,195] INFO Loading log 'topic2-0' (kafka.log.LogManager)
[2014-02-10 20:30:07,201] INFO Loading the last segment
/var/kafka/topic2-0/000559732292.kafka in mutable mode,
recovery false (kafka.log.Log)
(...)
[2014-02-10 20:30:07,464] INFO Registering broker /brokers/ids/3
(kafka.server.KafkaZooKeeper)
[2014-02-10 20:30:07,486] INFO Registering broker /brokers/ids/3
succeeded with id:3,creatorId:kafkabe03-1392064207464,host:kafkabe03,port:9092
(kafka.server.KafkaZooKeeper)
[2014-02-10 20:30:07,516] INFO Begin registering broker topic
/brokers/topics/topic2/3 with 1 partitions
(kafka.server.KafkaZooKeeper)
[2014-02-10 20:30:07,522] INFO End registering broker topic
/brokers/topics/topic2/3 (kafka.server.KafkaZooKeeper)
(...)

But no trace of this particular topic, topic1.

Any idea why this particular topic isn't registering? Where can I look?

Thanks again
Tomàs


One of the be brokers (3) is not registered in ZK. Do you see ZK session
expiration (potentially due to GC) in that broker?

Thanks,

Jun




On Mon, Feb 10, 2014 at 4:28 PM, Tomas Nunez  wrote:

> Hi
>
> I'm new around here and I'm dealing with a problem, and reading the
> documentation I don't know where else to look.
>
> I have a cross-dc mirrormaker setup: Mirrormaker is consuming from 5
> frontend servers in each DC (10 in total) and 5 backend servers are
> consuming from mirrormaker. That's working for most of the topics, but some
> of them are not being consumed from all backend servers.
>
> as an example, browsing with zkcli I find this:
>
> kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> zookeeperbe01:2181 ls /brokers/topics/topic1
> Connecting to zookeeperbe01:2181
>
> WATCHER::
>
> WatchedEvent state:SyncConnected type:None path:null
> [2, 1, 5, 4]
> kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> zookeeperfe01:2181 ls /brokers/topics/topic1
> Connecting to zookeeperfe01:2181
>
> WATCHER::
>
> WatchedEvent state:SyncConnected type:None path:null
> [3, 2, 1, 5, 4]
>
> What can be wrong? Where can I look to get more info to troubleshoot this?
> Any hint?
>
> Kafka version is 0.7.2; in the mirrormaker config I have "--num.streams 5
> --num.producers 5".  Zookeeper version is 3.3.5.
>
> Here you can see kafka-console-consumer.sh connecting to both fe and be
> servers:
> https://gist.github.com/pythiannunez/1623934cb538678f053e
>
> Thanks!
>
>


Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
WRT hierarchical topics, I'm referring to KAFKA-1175.
I would just like to think through the implications for the Consumer API if
and when we do implement hierarchical topics. For example, in the proposal
written by Jay, he says that initially wildcard subscriptions are not going
to be supported. But does that mean that they will be supported in v2? If
that's the case, that would change the semantics of the Consumer API.

As to having classes for Topic, PartitionId, etc. it looks like I was
referring to the TopicPartition and TopicPartitionOffset classes (I didn't
realize these were already there). I was only looking at the confluence
page which shows List[(String, Int, Long)] instead of
List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
notice that we're not being consistent in the Java version. E.g. we have
commit(TopicPartitionOffset... offsets) and
lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
other hand we have subscribe(String topic, int... partitions). I agree that
creating a class for TopicId would probably not make too much sense
today. But with hierarchical topics, I may change my mind. This is exactly
what was done in the HBase API in 0.96 when namespaces were added. 0.96
HBase API introduced a class called 'TableName' to represent the namespace
and table name.


On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede wrote:

> Thanks for the feedback.
>
> Mattijs -
>
> - Constructors link to
> http://kafka.apache.org/documentation.html#consumerconfigs for valid
> configurations, which lists zookeeper.connect rather than
> metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
> Fixed it to just point to ConsumerConfig for now until we finalize the new
> configs
> - Docs for poll(long) mention consumer.commit(true), which I can't find in
> the Consumer docs. For a simple consumer setup, that call is something that
> would make a lot of sense.
> Missed changing the examples to use consumer.commit(true, offsets). The
> suggestions by Jay would change it to commit(offsets) and
> commitAsync(offsets), which will hopefully make it easier to understand
> those commit APIs.
> - Love the addition of MockConsumer, awesome for unittesting :)
> I'm not quite satisfied with what it does as of right now, but we will
> surely improve it as we start writing the consumer.
>
> Jay -
>
> 1. ConsumerRebalanceCallback
> a. Makes sense. Renamed to onPartitionsRevoked
> b. Ya, it will be good to make it forward compatible with Java 8
> capabilities. We can change it to PartitionsAssignedCallback and
>  PartitionsRevokedCallback or RebalanceBeginCallback and
> RebalanceEndCallback?
> c. Ya, I thought about that but then didn't name it just
> RebalanceCallback since there could be a conflict with a controller side
> rebalance callback if/when we have one. However, you can argue that at that
> time we can name it ControllerRebalanceCallback instead of polluting a user
> facing API. So agree with you here.
> 2. Ya, that is a good idea. Changed to subscribe(String topic,
> int...partitions).
> 3. lastCommittedOffset() is not necessarily a local access since the
> consumer can potentially ask for the last committed offsets of partitions
> that the consumer does not consume and maintain the offsets for. That's the
> reason it is batched right now.
> 4. Yes, look at
>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
> 5. Sure, but that is not part of the consumer API right? I think you're
> suggesting looking at OffsetRequest to see if it would do that properly?
> 6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
> negative timeout will poll indefinitely?
> 7. Good point. Changed to commit(...) and commitAsync(...)
> 8. To commit the current position for all partitions owned by the consumer,
> you can use commit(). If you don't use group management, then
> commit(customListOfPartitions)
> 9. Forgot to include unsubscribe. Done now.
> 10. positions() can be called at any time and affects the next fetch on the
> next poll(). Fixed the places that said "starting fetch offsets"
> 11. Can we not look that up by going through the messages returned and
> getting the offset from the ConsumerRecord?
>
> One thing that I really found helpful for the API design was writing out
> actual code for different scenarios against the API. I think it might be
> good to do that for this too--i.e. enumerate the various use cases and code
> that use case up to see how it looks
> The javadocs include examples for almost all possible scenarios of consumer
> usage, that I could come up with. Will add more to the javadocs as I get
> more feedback from our users. The advantage of having the examples in the
> javadoc itself is to that the

Re: Config for new clients (and server)

2014-02-10 Thread Pradeep Gollakota
+1 Jun.


On Mon, Feb 10, 2014 at 2:17 PM, Sriram Subramanian <
srsubraman...@linkedin.com> wrote:

> +1 on Jun's suggestion.
>
> On 2/10/14 2:01 PM, "Jun Rao"  wrote:
>
> >I actually prefer to see those at INFO level. The reason is that the
> >config
> >system in an application can be complex. Some configs can be overridden in
> >different layers and it may not be easy to determine what the final
> >binding
> >value is. The logging in Kafka will serve as the source of truth.
> >
> >For reference, ZK client logs all overridden values during initialization.
> >It's a one time thing during starting up, so shouldn't add much noise.
> >It's
> >very useful for debugging subtle config issues.
> >
> >Exposing final configs programmatically is potentially useful. If we don't
> >want to log overridden values out of box, an app can achieve the same
> >thing
> >using the programming api. The only missing thing is that we won't know
> >those unused property keys, which is probably less important than seeing
> >the overridden values.
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps  wrote:
> >
> >> Hey Jun,
> >>
> >> I think that is reasonable but would object to having it be debug
> >>logging?
> >> I think logging out a bunch of noise during normal operation in a client
> >> library is pretty ugly. Also, is there value in exposing the final
> >>configs
> >> programmatically?
> >>
> >> -Jay
> >>
> >>
> >>
> >> On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao  wrote:
> >>
> >> > +1 on the new config. Just one comment. Currently, when initiating a
> >> config
> >> > (e.g. ProducerConfig), we log those overridden property values and
> >>unused
> >> > property keys (likely due to mis-spelling). This has been very useful
> >>for
> >> > config verification. It would be good to add similar support in the
> >>new
> >> > config.
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> >
> >> > On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps 
> wrote:
> >> >
> >> > > We touched on this a bit in previous discussions, but I wanted to
> >>draw
> >> > out
> >> > > the approach to config specifically as an item of discussion.
> >> > >
> >> > > The new producer and consumer use a similar key-value config
> >>approach
> >> as
> >> > > the existing scala clients but have different implementation code to
> >> help
> >> > > define these configs. The plan is to use the same approach on the
> >> server,
> >> > > once the new clients are complete; so if we agree on this approach
> >>it
> >> > will
> >> > > be the new default across the board.
> >> > >
> >> > > Let me split this into two parts. First I will try to motivate the
> >>use
> >> of
> >> > > key-value pairs as a configuration api. Then let me discuss the
> >> mechanics
> >> > > of specifying and parsing these. If we agree on the public api then
> >>the
> >> > > public api then the implementation details are interesting as this
> >>will
> >> > be
> >> > > shared across producer, consumer, and broker and potentially some
> >> tools;
> >> > > but if we disagree about the api then there is no point in
> >>discussing
> >> the
> >> > > implementation.
> >> > >
> >> > > Let me explain the rationale for this. In a sense a key-value map of
> >> > > configs is the worst possible API to the programmer using the
> >>clients.
> >> > Let
> >> > > me contrast the pros and cons versus a POJO and motivate why I
> >>think it
> >> > is
> >> > > still superior overall.
> >> > >
> >> > > Pro: An application can externalize the configuration of its kafka
> >> > clients
> >> > > into its own configuration. Whatever config management system the
> >> client
> >> > > application is using will likely support key-value pairs, so the
> >>client
> >> > > should be able to directly pull whatever configurations are present
> >>and
> >> > use
> >> > > them in its client. This means that any configuration the client
> >> supports
> >> > > can be added to any application at runtime. With the pojo approach
> >>the
> >> > > client application has to expose each pojo getter as some config
> >> > parameter.
> >> > > The result of many applications doing this is that the config is
> >> > different
> >> > > for each and it is very hard to have a standard client config shared
> >> > > across. Moving config into config files allows the usual tooling
> >> (version
> >> > > control, review, audit, config deployments separate from code
> >>pushes,
> >> > > etc.).
> >> > >
> >> > > Pro: Backwards and forwards compatibility. Provided we stick to our
> >> java
> >> > > api many internals can evolve and expose new configs. The
> >>application
> >> can
> >> > > support both the new and old client by just specifying a config that
> >> will
> >> > > be unused in the older version (and of course the reverse--we can
> >> remove
> >> > > obsolete configs).
> >> > >
> >> > > Pro: We can use a similar mechanism for both the client and the
> >>server.
> >> > > Since most people run the server as a stand-alone process it needs a

Re: New Consumer API discussion

2014-02-10 Thread Neha Narkhede
Thanks for the feedback.

Mattijs -

- Constructors link to
http://kafka.apache.org/documentation.html#consumerconfigs for valid
configurations, which lists zookeeper.connect rather than
metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
Fixed it to just point to ConsumerConfig for now until we finalize the new
configs
- Docs for poll(long) mention consumer.commit(true), which I can't find in
the Consumer docs. For a simple consumer setup, that call is something that
would make a lot of sense.
Missed changing the examples to use consumer.commit(true, offsets). The
suggestions by Jay would change it to commit(offsets) and
commitAsync(offsets), which will hopefully make it easier to understand
those commit APIs.
- Love the addition of MockConsumer, awesome for unittesting :)
I'm not quite satisfied with what it does as of right now, but we will
surely improve it as we start writing the consumer.

Jay -

1. ConsumerRebalanceCallback
a. Makes sense. Renamed to onPartitionsRevoked
b. Ya, it will be good to make it forward compatible with Java 8
capabilities. We can change it to PartitionsAssignedCallback and
 PartitionsRevokedCallback or RebalanceBeginCallback and
RebalanceEndCallback?
c. Ya, I thought about that but then didn't name it just
RebalanceCallback since there could be a conflict with a controller side
rebalance callback if/when we have one. However, you can argue that at that
time we can name it ControllerRebalanceCallback instead of polluting a user
facing API. So agree with you here.
2. Ya, that is a good idea. Changed to subscribe(String topic,
int...partitions).
3. lastCommittedOffset() is not necessarily a local access since the
consumer can potentially ask for the last committed offsets of partitions
that the consumer does not consume and maintain the offsets for. That's the
reason it is batched right now.
4. Yes, look at
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
5. Sure, but that is not part of the consumer API right? I think you're
suggesting looking at OffsetRequest to see if it would do that properly?
6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
negative timeout will poll indefinitely?
7. Good point. Changed to commit(...) and commitAsync(...)
8. To commit the current position for all partitions owned by the consumer,
you can use commit(). If you don't use group management, then
commit(customListOfPartitions)
9. Forgot to include unsubscribe. Done now.
10. positions() can be called at any time and affects the next fetch on the
next poll(). Fixed the places that said "starting fetch offsets"
11. Can we not look that up by going through the messages returned and
getting the offset from the ConsumerRecord?

One thing that I really found helpful for the API design was writing out
actual code for different scenarios against the API. I think it might be
good to do that for this too--i.e. enumerate the various use cases and code
that use case up to see how it looks
The javadocs include examples for almost all possible scenarios of consumer
usage, that I could come up with. Will add more to the javadocs as I get
more feedback from our users. The advantage of having the examples in the
javadoc itself is that the usage is self-explanatory to new users.

Pradeep -

2. Changed to poll(long, TimeUnit) and a negative value for the timeout
would block in the poll forever until there is new data
3. We don't have hierarchical topics support. Would you mind explaining
what you meant?
4. I'm not so sure that we need a class to express a topic which is a
string and a separate class for just partition id. We do have a class for
TopicPartition which uniquely identifies a partition of a topic

Thanks,
Neha
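
For readers following along, a rough transcription of the method shapes being
discussed is given below. It is purely illustrative: the names and signatures
paraphrase this thread, they are not the final API, and the record/partition types
are stand-ins rather than the real classes.

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Stand-in value types; the actual proposal has richer TopicPartition,
    // TopicPartitionOffset and ConsumerRecord classes.
    class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }
    class ConsumerRecord { /* topic, partition, offset, key, value */ }

    interface ProposedConsumer {
        void subscribe(String topic, int... partitions);           // point 2: varargs partitions
        void unsubscribe(String topic, int... partitions);         // point 9: explicit unsubscribe
        List<ConsumerRecord> poll(long timeout, TimeUnit unit);    // point 6: bounded poll; negative timeout blocks
        void commit();                                             // point 8: commit current position for owned partitions
        void commitAsync();                                        // point 7: async variant
        long[] lastCommittedOffsets(TopicPartition... partitions); // point 3: batched lookup
    }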


On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota wrote:

> Couple of very quick thoughts.
>
> 1. +1 about renaming commit(...) and commitAsync(...)
> 2. I'd also like to extend the above for the poll()  method as well. poll()
> and pollWithTimeout(long, TimeUnit)?
> 3. Have you guys given any thought around how this API would be used with
> hierarchical topics?
> 4. Would it make sense to add classes such as TopicId, PartitionId, etc?
> Seems like it would be easier to read code with these classes as opposed to
> string and longs.
>
> - Pradeep
>
>
> On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps  wrote:
>
> > A few items:
> > 1. ConsumerRebalanceCallback
> >a. onPartitionsRevoked would be a better name.
> >b. We should discuss the possibility of splitting this into two
> > interfaces. The motivation would be that in Java 8 single method
> interfaces
> > can directly take methods which might be more intuitive.
> >c. If we stick with a single interface I would prefer the name
> > RebalanceCallback as its more concise
> > 2. Should subscribe(String topic, int partition) should be
> subscribe(String
> > topic, int...partition)?
> > 3.

Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hi Mattijs:

2. As Neha said, one design goal of the new consumer is to have a non-blocking
consuming API instead of a blocking API. Do you have a strong reason in mind
to still keep the blocking API instead of just using "while(no-data)
poll(timeout)"?

3. No we have not thought about hierarchical topics. Could you elaborate on
some use cases?

4. The Consumer will share some of the common code with the Producer, in which the
ProducerRecord has

private final String topic;
private final Integer partition;
private final byte[] key;
private final byte[] value;

Thanks,

Guozhang


On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang  wrote:

> Hello Jay,
>
> Thanks for the detailed comments.
>
> 1. Yeah we could discuss a bit more on that.
>
> 2. Since subscribe() is incremental, adding one topic-partition is OK, and
> personally I think it is cleaner than subscribe(String topic,
> int...partition)?
>
> 3. Originally I was thinking about two interfaces:
>
> getOffsets() // offsets for all partitions that I am consuming now
>
> getOffset(topc-partition) // offset of the specified topic-partition, will
> throw exception if it is not currently consumed.
>
> What do you think about these?
>
> 4. Yes, that remains a config.
>
> 5. Agree.
>
> 6. If the time out value is null then it will "logically" return
> immediately with whatever data is available. I think an indefinitely poll()
> function could be replaced with just
>
> while (true) poll(some-time)?
>
> 7. I am open with either approach.
>
> 8. I was thinking about two interfaces for the commit functionality:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> Do those sound better?
>
> 9. Currently I think about un-subscribe as "close and re-subscribe", and
> would like to hear people's opinion about it.
>
> 10. Yes. Position() is an API function, and as an API it means "be called
> at any time" and will change the next fetching starting offset.
>
> 11. The ConsumerRecord would have the offset info of the message. Is that
> what you want?
>
> About use cases: great point. I will add some more examples of using the
> API functions in the wiki pages.
>
> Guozhang
>
>
>
>
> On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps  wrote:
>
>> A few items:
>> 1. ConsumerRebalanceCallback
>>a. onPartitionsRevoked would be a better name.
>>b. We should discuss the possibility of splitting this into two
>> interfaces. The motivation would be that in Java 8 single method
>> interfaces
>> can directly take methods which might be more intuitive.
>>c. If we stick with a single interface I would prefer the name
>> RebalanceCallback as its more concise
>> 2. Should subscribe(String topic, int partition) should be
>> subscribe(String
>> topic, int...partition)?
>> 3. Is lastCommittedOffset call just a local access? If so it would be more
>> convenient not to batch it.
>> 4. How are we going to handle the earliest/latest starting position
>> functionality we currently have. Does that remain a config?
>> 5. Do we need to expose the general ability to get known positions from
>> the
>> log? E.g. the functionality in the OffsetRequest...? That would make the
>> ability to change position a little easier.
>> 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
>> unit)? Is it Long because it allows null? If so should we just add a
>> poll()
>> that polls indefinitely?
>> 7. I recommend we remove the boolean parameter from commit as it is really
>> hard to read code that has boolean parameters without named arguments. Can
>> we make it something like commit(...) and commitAsync(...)?
>> 8. What about the common case where you just want to commit the current
>> position for all partitions?
>> 9. How do you unsubscribe?
>> 10. You say in a few places that positions() only impacts the starting
>> position, but surely that isn't the case, right? Surely it controls the
>> fetch position for that partition and can be called at any time? Otherwise
>> it is a pretty weird api, right?
>> 11. How do I get my current position? Not the committed position but the
>> offset of the next message that will be given to me?
>>
>> One thing that I really found helpful for the API design was writing out
>> actual code for different scenarios against the API. I think it might be
>> good to do that for this too--i.e. enumerate the various use cases and
>> code
>> that use case up to see how it looks. I'm not sure if it would be useful
>> to
>> collect these kinds of scenarios from people. I know they have
>> sporadically
>> popped up on the mailing list.
>>
>> -Jay
>>
>>
>> On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede > >wrote:
>>
>> > As mentioned in previous emails, we are also working on a
>> re-implementation
>> > of the consumer. I would like to use this email thread to discuss the
>> > details of the public API. I would also like us to be picky about this
>> > public api now so it is as good as possible and we don't need to break
>> it
>> > i

Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hello Jay,

Thanks for the detailed comments.

1. Yeah we could discuss a bit more on that.

2. Since subscribe() is incremental, adding one topic-partition is OK, and
personally I think it is cleaner than subscribe(String topic,
int...partition)?

3. Originally I was thinking about two interfaces:

getOffsets() // offsets for all partitions that I am consuming now

getOffset(topc-partition) // offset of the specified topic-partition, will
throw exception if it is not currently consumed.

What do you think about these?

4. Yes, that remains a config.

5. Agree.

6. If the time out value is null then it will "logically" return
immediately with whatever data is available. I think an indefinitely poll()
function could be replaced with just

while (true) poll(some-time)?

7. I am open with either approach.

8. I was thinking about two interfaces for the commit functionality:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Do those sound better?

9. Currently I think about un-subscribe as "close and re-subscribe", and
would like to hear people's opinion about it.

10. Yes. Position() is an API function, and as an API it means "be called
at any time" and will change the next fetching starting offset.

11. The ConsumerRecord would have the offset info of the message. Is that
what you want?

About use cases: great point. I will add some more examples of using the
API functions in the wiki pages.

Guozhang
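
A tiny illustration of the pattern in point 6 above, i.e. a bounded poll in a loop
instead of an indefinite poll. Poller here is a stand-in interface, not a Kafka type.

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    interface Poller {
        // Returns whatever is available within the timeout, possibly an empty list.
        List<byte[]> poll(long timeout, TimeUnit unit);
    }

    class PollLoop {
        static void run(Poller consumer) {
            while (true) {
                List<byte[]> records = consumer.poll(100, TimeUnit.MILLISECONDS);
                for (byte[] record : records) {
                    // process record
                }
                // an empty result just means "nothing yet"; loop again rather than blocking forever
            }
        }
    }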




On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps  wrote:

> A few items:
> 1. ConsumerRebalanceCallback
>a. onPartitionsRevoked would be a better name.
>b. We should discuss the possibility of splitting this into two
> interfaces. The motivation would be that in Java 8 single method interfaces
> can directly take methods which might be more intuitive.
>c. If we stick with a single interface I would prefer the name
> RebalanceCallback as its more concise
> 2. Should subscribe(String topic, int partition) should be subscribe(String
> topic, int...partition)?
> 3. Is lastCommittedOffset call just a local access? If so it would be more
> convenient not to batch it.
> 4. How are we going to handle the earliest/latest starting position
> functionality we currently have. Does that remain a config?
> 5. Do we need to expose the general ability to get known positions from the
> log? E.g. the functionality in the OffsetRequest...? That would make the
> ability to change position a little easier.
> 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
> unit)? Is it Long because it allows null? If so should we just add a poll()
> that polls indefinitely?
> 7. I recommend we remove the boolean parameter from commit as it is really
> hard to read code that has boolean parameters without named arguments. Can
> we make it something like commit(...) and commitAsync(...)?
> 8. What about the common case where you just want to commit the current
> position for all partitions?
> 9. How do you unsubscribe?
> 10. You say in a few places that positions() only impacts the starting
> position, but surely that isn't the case, right? Surely it controls the
> fetch position for that partition and can be called at any time? Otherwise
> it is a pretty weird api, right?
> 11. How do I get my current position? Not the committed position but the
> offset of the next message that will be given to me?
>
> One thing that I really found helpful for the API design was writing out
> actual code for different scenarios against the API. I think it might be
> good to do that for this too--i.e. enumerate the various use cases and code
> that use case up to see how it looks. I'm not sure if it would be useful to
> collect these kinds of scenarios from people. I know they have sporadically
> popped up on the mailing list.
>
> -Jay
>
>
> On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede  >wrote:
>
> > As mentioned in previous emails, we are also working on a
> re-implementation
> > of the consumer. I would like to use this email thread to discuss the
> > details of the public API. I would also like us to be picky about this
> > public api now so it is as good as possible and we don't need to break it
> > in the future.
> >
> > The best way to get a feel for the API is actually to take a look at the
> > javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >,
> > the hope is to get the api docs good enough so that it is
> self-explanatory.
> > You can also take a look at the configs
> > here<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> > >
> >
> > Some background info on implementation:
> >
> > At a high level the primary difference in this consumer is that it
> removes
> > the distinction between the "high-level" and "low-level" consumer. The
> new
> > consumer API is non blocking and instead of returning a blocking
> iterator,
> > the consume

Re: Config for new clients (and server)

2014-02-10 Thread Sriram Subramanian
+1 on Jun's suggestion.

On 2/10/14 2:01 PM, "Jun Rao"  wrote:

>I actually prefer to see those at INFO level. The reason is that the
>config
>system in an application can be complex. Some configs can be overridden in
>different layers and it may not be easy to determine what the final
>binding
>value is. The logging in Kafka will serve as the source of truth.
>
>For reference, ZK client logs all overridden values during initialization.
>It's a one time thing during starting up, so shouldn't add much noise.
>It's
>very useful for debugging subtle config issues.
>
>Exposing final configs programmatically is potentially useful. If we don't
>want to log overridden values out of box, an app can achieve the same
>thing
>using the programming api. The only missing thing is that we won't know
>those unused property keys, which is probably less important than seeing
>the overridden values.
>
>Thanks,
>
>Jun
>
>
>On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps  wrote:
>
>> Hey Jun,
>>
>> I think that is reasonable but would object to having it be debug
>>logging?
>> I think logging out a bunch of noise during normal operation in a client
>> library is pretty ugly. Also, is there value in exposing the final
>>configs
>> programmatically?
>>
>> -Jay
>>
>>
>>
>> On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao  wrote:
>>
>> > +1 on the new config. Just one comment. Currently, when initiating a
>> config
>> > (e.g. ProducerConfig), we log those overridden property values and
>>unused
>> > property keys (likely due to mis-spelling). This has been very useful
>>for
>> > config verification. It would be good to add similar support in the
>>new
>> > config.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> > On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps  wrote:
>> >
>> > > We touched on this a bit in previous discussions, but I wanted to
>>draw
>> > out
>> > > the approach to config specifically as an item of discussion.
>> > >
>> > > The new producer and consumer use a similar key-value config
>>approach
>> as
>> > > the existing scala clients but have different implementation code to
>> help
>> > > define these configs. The plan is to use the same approach on the
>> server,
>> > > once the new clients are complete; so if we agree on this approach
>>it
>> > will
>> > > be the new default across the board.
>> > >
>> > > Let me split this into two parts. First I will try to motivate the
>>use
>> of
>> > > key-value pairs as a configuration api. Then let me discuss the
>> mechanics
>> > > of specifying and parsing these. If we agree on the public api then
>> > > the implementation details are interesting as this
>>will
>> > be
>> > > shared across producer, consumer, and broker and potentially some
>> tools;
>> > > but if we disagree about the api then there is no point in
>>discussing
>> the
>> > > implementation.
>> > >
>> > > Let me explain the rationale for this. In a sense a key-value map of
>> > > configs is the worst possible API to the programmer using the
>>clients.
>> > Let
>> > > me contrast the pros and cons versus a POJO and motivate why I
>>think it
>> > is
>> > > still superior overall.
>> > >
>> > > Pro: An application can externalize the configuration of its kafka
>> > clients
>> > > into its own configuration. Whatever config management system the
>> client
>> > > application is using will likely support key-value pairs, so the
>>client
>> > > should be able to directly pull whatever configurations are present
>>and
>> > use
>> > > them in its client. This means that any configuration the client
>> supports
>> > > can be added to any application at runtime. With the pojo approach
>>the
>> > > client application has to expose each pojo getter as some config
>> > parameter.
>> > > The result of many applications doing this is that the config is
>> > different
>> > > for each and it is very hard to have a standard client config shared
>> > > across. Moving config into config files allows the usual tooling
>> (version
>> > > control, review, audit, config deployments separate from code
>>pushes,
>> > > etc.).
>> > >
>> > > Pro: Backwards and forwards compatibility. Provided we stick to our
>> java
>> > > api many internals can evolve and expose new configs. The
>>application
>> can
>> > > support both the new and old client by just specifying a config that
>> will
>> > > be unused in the older version (and of course the reverse--we can
>> remove
>> > > obsolete configs).
>> > >
>> > > Pro: We can use a similar mechanism for both the client and the
>>server.
>> > > Since most people run the server as a stand-alone process it needs a
>> > config
>> > > file.
>> > >
>> > > Pro: Systems like Samza that need to ship configs across the network
>> can
>> > > easily do so as configs have a natural serialized form. This can be
>> done
>> > > with pojos using java serialization but it is ugly and has bizarre
>> failure
>> > > cases.
>> > >
>> > > Con: The IDE gives nice auto-completion for pojos.
>> > >
>> > > Con: There 

Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hi Mattijs:

We have not updated the wiki pages for config yet, and they will not be
updated until we release 0.9 with these changes.

Currently consumers do have a commitOffsets function that can be called by
the users, but for most use cases auto.commit is turned on and this
function gets called by the consumer client itself.
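
For reference, a rough sketch of how that looks with the current 0.8 high-level
consumer when auto commit is turned off; the topic, group id and connect string
are placeholders, and error handling is omitted:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "example-group");            // placeholder
        props.put("auto.commit.enable", "false");          // disable auto commit

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        for (MessageAndMetadata<byte[], byte[]> msg : streams.get("my-topic").get(0)) {
            // process msg.message() here ...
            connector.commitOffsets();  // explicit commit instead of auto commit
        }
    }
}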

Guozhang



On Mon, Feb 10, 2014 at 11:18 AM, Mattijs Ugen  wrote:

> Hey Neha,
>
> This looks really promising, I particularly like the ability to commit
> offsets for topic/partition tuples over just commit(). Some remarks:
>
> - Constructors link to http://kafka.apache.org/documentation.html#
> consumerconfigs for valid configurations, which lists zookeeper.connect
> rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in
> ConsumerConfig.
> - Docs for poll(long) mention consumer.commit(true), which I can't find in
> the Consumer docs. For a simple consumer setup, that call is something that
> would make a lot of sense.
> - Love the addition of MockConsumer, awesome for unittesting :)
>
> Digging these open discussions on API changes on the mailing list btw,
> keep up the good work :)
>
> Kind regards,
>
> Mattijs
>



-- 
-- Guozhang


Re: Config for new clients (and server)

2014-02-10 Thread Jun Rao
I actually prefer to see those at INFO level. The reason is that the config
system in an application can be complex. Some configs can be overridden in
different layers and it may not be easy to determine what the final binding
value is. The logging in Kafka will serve as the source of truth.

For reference, ZK client logs all overridden values during initialization.
It's a one time thing during starting up, so shouldn't add much noise. It's
very useful for debugging subtle config issues.

Exposing final configs programmatically is potentially useful. If we don't
want to log overridden values out of box, an app can achieve the same thing
using the programming api. The only missing thing is that we won't know
those unused property keys, which is probably less important than seeing
the overridden values.
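
As an illustration of the kind of logging meant here, a toy helper
(hypothetical, not actual Kafka code) that reports overridden and unrecognized
keys once at start-up:

import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical sketch: log overridden values and unrecognized (possibly
// mis-spelled) keys at INFO, once, when the client is constructed.
public class ConfigLogging {
    private static final Logger log = Logger.getLogger("kafka.config");

    public static void logConfig(Properties supplied, Map<String, String> defaults) {
        for (String key : supplied.stringPropertyNames()) {
            if (defaults.containsKey(key)) {
                log.info("Overriding " + key + "=" + supplied.getProperty(key)
                        + " (default: " + defaults.get(key) + ")");
            } else {
                log.info("Unknown config " + key + " supplied; it will be ignored");
            }
        }
    }
}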

Thanks,

Jun


On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps  wrote:

> Hey Jun,
>
> I think that is reasonable but would object to having it be debug logging?
> I think logging out a bunch of noise during normal operation in a client
> library is pretty ugly. Also, is there value in exposing the final configs
> programmatically?
>
> -Jay
>
>
>
> On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao  wrote:
>
> > +1 on the new config. Just one comment. Currently, when initiating a
> config
> > (e.g. ProducerConfig), we log those overridden property values and unused
> > property keys (likely due to mis-spelling). This has been very useful for
> > config verification. It would be good to add similar support in the new
> > config.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps  wrote:
> >
> > > We touched on this a bit in previous discussions, but I wanted to draw
> > out
> > > the approach to config specifically as an item of discussion.
> > >
> > > The new producer and consumer use a similar key-value config approach
> as
> > > the existing scala clients but have different implementation code to
> help
> > > define these configs. The plan is to use the same approach on the
> server,
> > > once the new clients are complete; so if we agree on this approach it
> > will
> > > be the new default across the board.
> > >
> > > Let me split this into two parts. First I will try to motivate the use
> of
> > > key-value pairs as a configuration api. Then let me discuss the
> mechanics
> > > of specifying and parsing these. If we agree on the public api then the
> > > implementation details are interesting as this will
> > be
> > > shared across producer, consumer, and broker and potentially some
> tools;
> > > but if we disagree about the api then there is no point in discussing
> the
> > > implementation.
> > >
> > > Let me explain the rationale for this. In a sense a key-value map of
> > > configs is the worst possible API to the programmer using the clients.
> > Let
> > > me contrast the pros and cons versus a POJO and motivate why I think it
> > is
> > > still superior overall.
> > >
> > > Pro: An application can externalize the configuration of its kafka
> > clients
> > > into its own configuration. Whatever config management system the
> client
> > > application is using will likely support key-value pairs, so the client
> > > should be able to directly pull whatever configurations are present and
> > use
> > > them in its client. This means that any configuration the client
> supports
> > > can be added to any application at runtime. With the pojo approach the
> > > client application has to expose each pojo getter as some config
> > parameter.
> > > The result of many applications doing this is that the config is
> > different
> > > for each and it is very hard to have a standard client config shared
> > > across. Moving config into config files allows the usual tooling
> (version
> > > control, review, audit, config deployments separate from code pushes,
> > > etc.).
> > >
> > > Pro: Backwards and forwards compatibility. Provided we stick to our
> java
> > > api many internals can evolve and expose new configs. The application
> can
> > > support both the new and old client by just specifying a config that
> will
> > > be unused in the older version (and of course the reverse--we can
> remove
> > > obsolete configs).
> > >
> > > Pro: We can use a similar mechanism for both the client and the server.
> > > Since most people run the server as a stand-alone process it needs a
> > config
> > > file.
> > >
> > > Pro: Systems like Samza that need to ship configs across the network
> can
> > > easily do so as configs have a natural serialized form. This can be
> done
> > > with pojos using java serialization but it is ugly and has bizarre
> failure
> > > cases.
> > >
> > > Con: The IDE gives nice auto-completion for pojos.
> > >
> > > Con: There are some advantages to javadoc as a documentation mechanism
> > for
> > > java people.
> > >
> > > Basically to me this is about operability versus niceness of api and I
> > > think operability is more important.
> > >
> > > Let me now g

RE: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Garry Turkington
Thanks Jay for the info, and Neha for adding it to the FAQ!

On the producer side I've been going down Jay's second route, i.e. adding
metadata to the messages as they are published. In my case, though, I don't just
want to avoid duplicates on a per-message basis; I also need to be able to
identify a partially ingested file so I can quickly drop any related messages.

Since I'll have multiple producers, I'm looking to ZooKeeper to help ensure that
only one reads a given file at a time. I can then add the filename and a producer
uuid to each message and, after a file is fully written, either publish completion
notices to a different topic or mark the file's ZNode appropriately. A client can
then tell whether the copy of a given message it is reading comes from the
'committed' ingest of the file (matching producer uuid) or from a file that was
only partially ingested and should be ignored.
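
For what it's worth, the "only one producer reads a given file at a time" part
can be as simple as an ephemeral znode per file. A rough sketch with the
ZooKeeper Java client; the lock path is a placeholder and its parent node is
assumed to already exist:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Whichever producer manages to create the ephemeral lock node for a file
// gets to ingest it; the node disappears automatically if that producer dies.
public class FileIngestLock {
    public static boolean tryLock(ZooKeeper zk, String fileName, String producerUuid)
            throws KeeperException, InterruptedException {
        String lockPath = "/ingest/locks/" + fileName;  // placeholder path
        try {
            zk.create(lockPath, producerUuid.getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;   // we own this file
        } catch (KeeperException.NodeExistsException e) {
            return false;  // another producer is already reading it
        }
    }
}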

I think this holds together, and given my single-file-reader requirement I'll
always need some extra machinery outside of Kafka; but if things like producer
idempotence are possible, and truly cheaper, server side then that would be very
interesting.

I found Jay's wiki page on the idempotent producer support and that looks
really good. Since it looks like in that model the pid is something the client
sends with each message, I could change my workflow to:

1. Producer gains ZK lock on a file ZNode
2. Producer adds the pid as an attribute on the file ZNode if none is already 
associated with it
3. Producer starts reading/sending messages
4. If a producer fails, another can look for the pid attribute and use it when
resending the messages.

Very interested in this whole topic.

Garry

-Original Message-
From: Jay Kreps [mailto:jay.kr...@gmail.com] 
Sent: 10 February 2014 17:47
To: users@kafka.apache.org
Subject: Re: Building a producer/consumer supporting exactly-once messaging

The out-of-the-box support for this in Kafka isn't great right now.

Exactly once semantics has two parts: avoiding duplication during data 
production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data 
production.

1. Use a single-writer per partition and every time you get a network error
check the last message in that partition to see if your last write succeeded.
2. Include a primary key (UUID or something) in the message and deduplicate on
the consumer.

If you do one of these things the log that Kafka hosts will be duplicate free. 
However reading without duplicates depends on some co-operation from the 
consumer too. If the consumer is periodically checkpointing its position then 
if it fails and restarts it will restart from the checkpointed position. Thus 
if the data output and the checkpoint are not written atomically it will be 
possible to get duplicates here as well. This problem is particular to your 
storage system. For example if you are using a database you could commit these 
together in a transaction. The HDFS loader Camus that LinkedIn wrote does 
something like this for Hadoop loads.
The other alternative that doesn't require a transaction is to store the offset 
with the data loaded and deduplicate using the topic/partition/offset 
combination.

I think there are two improvements that would make this a lot easier:
1. I think producer idempotence is something that could be done automatically 
and much more cheaply by optionally integrating support for this on the server.
2. The existing high-level consumer doesn't expose a lot of the more fine 
grained control of offsets (e.g. to reset your position). We will be working on 
that soon.

-Jay







On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington < 
g.turking...@improvedigital.com> wrote:

> Hi,
>
> I've been doing some prototyping on Kafka for a few months now and 
> like what I see. It's a good fit for some of my use cases in the areas 
> of data distribution but also for processing - liking a lot of what I see in 
> Samza.
> I'm now working through some of the operational issues and have a 
> question to the community.
>
> I have several data sources that I want to push into Kafka but some of 
> the most important are arriving as a stream of files being dropped 
> either into a SFTP location or S3. Conceptually the data is really a 
> stream but its being chunked and made more batch by the deployment 
> model of the operational servers. So pulling the data into Kafka and 
> seeing it more as a stream again is a big plus.
>
> But, I really don't want duplicate messages. I know Kafka provides at 
> least once semantics and that's fine, I'm happy to have the de-dupe 
> logic external to Kafka. And if I look at my producer I can build up a 
> protocol around adding record metadata and using Zookeeper to give me 
> pretty high confidence that my clients will know if they are reading 
> from a file that was fully published into Kafka or not.
>
> I had assumed that this wouldn't be a unique use case but on doing a 
> bunch

Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
A couple of very quick thoughts.

1. +1 about renaming commit(...) and commitAsync(...)
2. I'd like to extend the above to the poll() method as well: poll()
and pollWithTimeout(long, TimeUnit)?
3. Have you guys given any thought around how this API would be used with
hierarchical topics?
4. Would it make sense to add classes such as TopicId, PartitionId, etc? Seems
like it would be easier to read code with these classes as opposed to raw
Strings and longs (rough sketch of what I mean below).
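
A rough sketch of what such value classes could look like (illustrative only):

// Illustrative only: tiny value types instead of bare Strings and ints.
public final class Ids {

    public static final class TopicId {
        private final String name;
        public TopicId(String name) { this.name = name; }
        public String name() { return name; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicId && ((TopicId) o).name.equals(name);
        }
        @Override public int hashCode() { return name.hashCode(); }
        @Override public String toString() { return name; }
    }

    public static final class PartitionId {
        private final int id;
        public PartitionId(int id) { this.id = id; }
        public int id() { return id; }
        @Override public boolean equals(Object o) {
            return o instanceof PartitionId && ((PartitionId) o).id == id;
        }
        @Override public int hashCode() { return id; }
        @Override public String toString() { return Integer.toString(id); }
    }
}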

- Pradeep


On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps  wrote:

> A few items:
> 1. ConsumerRebalanceCallback
>a. onPartitionsRevoked would be a better name.
>b. We should discuss the possibility of splitting this into two
> interfaces. The motivation would be that in Java 8 single method interfaces
> can directly take methods which might be more intuitive.
>c. If we stick with a single interface I would prefer the name
> RebalanceCallback as it's more concise
> 2. Should subscribe(String topic, int partition) be subscribe(String
> topic, int... partition)?
> 3. Is lastCommittedOffset call just a local access? If so it would be more
> convenient not to batch it.
> 4. How are we going to handle the earliest/latest starting position
> functionality we currently have. Does that remain a config?
> 5. Do we need to expose the general ability to get known positions from the
> log? E.g. the functionality in the OffsetRequest...? That would make the
> ability to change position a little easier.
> 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
> unit)? Is it Long because it allows null? If so should we just add a poll()
> that polls indefinitely?
> 7. I recommend we remove the boolean parameter from commit as it is really
> hard to read code that has boolean parameters without named arguments. Can
> we make it something like commit(...) and commitAsync(...)?
> 8. What about the common case where you just want to commit the current
> position for all partitions?
> 9. How do you unsubscribe?
> 10. You say in a few places that positions() only impacts the starting
> position, but surely that isn't the case, right? Surely it controls the
> fetch position for that partition and can be called at any time? Otherwise
> it is a pretty weird api, right?
> 11. How do I get my current position? Not the committed position but the
> offset of the next message that will be given to me?
>
> One thing that I really found helpful for the API design was writing out
> actual code for different scenarios against the API. I think it might be
> good to do that for this too--i.e. enumerate the various use cases and code
> that use case up to see how it looks. I'm not sure if it would be useful to
> collect these kinds of scenarios from people. I know they have sporadically
> popped up on the mailing list.
>
> -Jay
>
>
> On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede  >wrote:
>
> > As mentioned in previous emails, we are also working on a
> re-implementation
> > of the consumer. I would like to use this email thread to discuss the
> > details of the public API. I would also like us to be picky about this
> > public api now so it is as good as possible and we don't need to break it
> > in the future.
> >
> > The best way to get a feel for the API is actually to take a look at the
> > javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >,
> > the hope is to get the api docs good enough so that it is
> self-explanatory.
> > You can also take a look at the configs
> > here<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> > >
> >
> > Some background info on implementation:
> >
> > At a high level the primary difference in this consumer is that it
> removes
> > the distinction between the "high-level" and "low-level" consumer. The
> new
> > consumer API is non blocking and instead of returning a blocking
> iterator,
> > the consumer provides a poll() API that returns a list of records. We
> think
> > this is better compared to the blocking iterators since it effectively
> > decouples the threading strategy used for processing messages from the
> > consumer. It is worth noting that the consumer is entirely single
> threaded
> > and runs in the user thread. The advantage is that it can be easily
> > rewritten in less multi-threading-friendly languages. The consumer
> batches
> > data and multiplexes I/O over TCP connections to each of the brokers it
> > communicates with, for high throughput. The consumer also allows long
> poll
> > to reduce the end-to-end message latency for low throughput data.
> >
> > The consumer provides a group management facility that supports the
> concept
> > of a group with multiple consumer instances (just like the current
> > consumer). This is done through a custom heartbeat and group management
> > protocol transparent to the user. At the same time, it allows users the
> >

Re: New Consumer API discussion

2014-02-10 Thread Jay Kreps
A few items:
1. ConsumerRebalanceCallback
   a. onPartitionsRevoked would be a better name.
   b. We should discuss the possibility of splitting this into two
interfaces. The motivation would be that in Java 8 single method interfaces
can directly take methods which might be more intuitive.
   c. If we stick with a single interface I would prefer the name
RebalanceCallback as it's more concise
2. Should subscribe(String topic, int partition) be subscribe(String
topic, int... partition)?
3. Is the lastCommittedOffset call just a local access? If so it would be more
convenient not to batch it.
4. How are we going to handle the earliest/latest starting position
functionality we currently have? Does that remain a config?
5. Do we need to expose the general ability to get known positions from the
log? E.g. the functionality in the OffsetRequest...? That would make the
ability to change position a little easier.
6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
unit)? Is it Long because it allows null? If so should we just add a poll()
that polls indefinitely?
7. I recommend we remove the boolean parameter from commit as it is really
hard to read code that has boolean parameters without named arguments. Can
we make it something like commit(...) and commitAsync(...)?
8. What about the common case where you just want to commit the current
position for all partitions?
9. How do you unsubscribe?
10. You say in a few places that positions() only impacts the starting
position, but surely that isn't the case, right? Surely it controls the
fetch position for that partition and can be called at any time? Otherwise
it is a pretty weird api, right?
11. How do I get my current position? Not the committed position but the
offset of the next message that will be given to me?

One thing that I really found helpful for the API design was writing out
actual code for different scenarios against the API. I think it might be
good to do that for this too--i.e. enumerate the various use cases and code
that use case up to see how it looks. I'm not sure if it would be useful to
collect these kinds of scenarios from people. I know they have sporadically
popped up on the mailing list.
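
As one example of that kind of scenario code, here is a sketch of a simple
poll/process/commit loop. The interface below is only a stand-in for the draft
API: the method names come from this thread, but every signature is
illustrative.

import java.util.List;

// Stand-in for the draft consumer API; signatures are illustrative only.
interface DraftConsumer extends AutoCloseable {
    void subscribe(String topic, int... partitions);
    List<byte[]> poll(long timeoutMs);  // the draft returns records; bytes stand in here
    void commit();                      // synchronous commit of current positions
    void commitAsync();                 // non-blocking variant (item 7)
}

class PollAndCommitScenario {
    // Scenario: consume one partition, process each record, commit per batch.
    static void run(DraftConsumer consumer, long maxRecords) throws Exception {
        consumer.subscribe("events", 0);  // placeholder topic and partition
        long processed = 0;
        while (processed < maxRecords) {
            for (byte[] value : consumer.poll(100)) {  // bounded poll in a loop (item 6)
                process(value);
                processed++;
            }
            consumer.commitAsync();  // commit all current positions (items 7 and 8)
        }
        consumer.close();
    }

    static void process(byte[] value) { /* application logic */ }
}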

-Jay


On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede wrote:

> As mentioned in previous emails, we are also working on a re-implementation
> of the consumer. I would like to use this email thread to discuss the
> details of the public API. I would also like us to be picky about this
> public api now so it is as good as possible and we don't need to break it
> in the future.
>
> The best way to get a feel for the API is actually to take a look at the
> javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >,
> the hope is to get the api docs good enough so that it is self-explanatory.
> You can also take a look at the configs
> here<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> >
>
> Some background info on implementation:
>
> At a high level the primary difference in this consumer is that it removes
> the distinction between the "high-level" and "low-level" consumer. The new
> consumer API is non blocking and instead of returning a blocking iterator,
> the consumer provides a poll() API that returns a list of records. We think
> this is better compared to the blocking iterators since it effectively
> decouples the threading strategy used for processing messages from the
> consumer. It is worth noting that the consumer is entirely single threaded
> and runs in the user thread. The advantage is that it can be easily
> rewritten in less multi-threading-friendly languages. The consumer batches
> data and multiplexes I/O over TCP connections to each of the brokers it
> communicates with, for high throughput. The consumer also allows long poll
> to reduce the end-to-end message latency for low throughput data.
>
> The consumer provides a group management facility that supports the concept
> of a group with multiple consumer instances (just like the current
> consumer). This is done through a custom heartbeat and group management
> protocol transparent to the user. At the same time, it allows users the
> option to subscribe to a fixed set of partitions and not use group
> management at all. The offset management strategy defaults to Kafka based
> offset management and the API provides a way for the user to use a
> customized offset store to manage the consumer's offsets.
>
> A key difference in this consumer also is the fact that it does not depend
> on zookeeper at all.
>
> More details about the new consumer design are
> here<
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
>
> Please take a look at the new
> API<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >and
> give us any thoughts y

Re: New Consumer API discussion

2014-02-10 Thread Mattijs Ugen

Hey Neha,

This looks really promising, I particularly like the ability to commit 
offsets for topic/partition tuples over just commit(). Some remarks:


- Constructors link to 
http://kafka.apache.org/documentation.html#consumerconfigs for valid 
configurations, which lists zookeeper.connect rather than 
metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
- Docs for poll(long) mention consumer.commit(true), which I can't find 
in the Consumer docs. For a simple consumer setup, that call is 
something that would make a lot of sense.

- Love the addition of MockConsumer, awesome for unittesting :)

Digging these open discussions on API changes on the mailing list btw, 
keep up the good work :)


Kind regards,

Mattijs


New Consumer API discussion

2014-02-10 Thread Neha Narkhede
As mentioned in previous emails, we are also working on a re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about this
public api now so it is as good as possible and we don't need to break it
in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc,
the hope is to get the api docs good enough so that it is self-explanatory.
You can also take a look at the configs
here

Some background info on implementation:

At a high level the primary difference in this consumer is that it removes
the distinction between the "high-level" and "low-level" consumer. The new
consumer API is non blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. We think
this is better compared to the blocking iterators since it effectively
decouples the threading strategy used for processing messages from the
consumer. It is worth noting that the consumer is entirely single threaded
and runs in the user thread. The advantage is that it can be easily
rewritten in less multi-threading-friendly languages. The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data.

The consumer provides a group management facility that supports the concept
of a group with multiple consumer instances (just like the current
consumer). This is done through a custom heartbeat and group management
protocol transparent to the user. At the same time, it allows users the
option to subscribe to a fixed set of partitions and not use group
management at all. The offset management strategy defaults to Kafka based
offset management and the API provides a way for the user to use a
customized offset store to manage the consumer's offsets.

A key difference in this consumer also is the fact that it does not depend
on zookeeper at all.

More details about the new consumer design are
here

Please take a look at the new API and give us any thoughts you may have.

Thanks,
Neha


Re: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Jay Kreps
Ack, nice, should have thought of doing that...

-Jay


On Mon, Feb 10, 2014 at 10:12 AM, Neha Narkhede wrote:

> Added this to our FAQ -
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactlyonemessagingfromKafka
> ?
>
>
>
> On Mon, Feb 10, 2014 at 9:46 AM, Jay Kreps  wrote:
>
> > The out-of-the-box support for this in Kafka isn't great right now.
> >
> > Exactly once semantics has two parts: avoiding duplication during data
> > production and avoiding duplicates during data consumption.
> >
> > There are two approaches to getting exactly once semantics during data
> > production.
> >
> > 1. Use a single-writer per partition and every time you get a network
> error
> > check the last message in that partition to see if your last write
> > succeeded
> > 2. Include a primary key (UUID or something) in the message and
> deduplicate
> > on the consumer.
> >
> > If you do one of these things the log that Kafka hosts will be duplicate
> > free. However reading without duplicates depends on some co-operation
> from
> > the consumer too. If the consumer is periodically checkpointing its
> > position then if it fails and restarts it will restart from the
> > checkpointed position. Thus if the data output and the checkpoint are not
> > written atomically it will be possible to get duplicates here as well.
> This
> > problem is particular to your storage system. For example if you are
> using
> > a database you could commit these together in a transaction. The HDFS
> > loader Camus that LinkedIn wrote does something like this for Hadoop
> loads.
> > The other alternative that doesn't require a transaction is to store the
> > offset with the data loaded and deduplicate using the
> > topic/partition/offset combination.
> >
> > I think there are two improvements that would make this a lot easier:
> > 1. I think producer idempotence is something that could be done
> > automatically and much more cheaply by optionally integrating support for
> > this on the server.
> > 2. The existing high-level consumer doesn't expose a lot of the more fine
> > grained control of offsets (e.g. to reset your position). We will be
> > working on that soon.
> >
> > -Jay
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington <
> > g.turking...@improvedigital.com> wrote:
> >
> > > Hi,
> > >
> > > I've been doing some prototyping on Kafka for a few months now and like
> > > what I see. It's a good fit for some of my use cases in the areas of
> data
> > > distribution but also for processing - liking a lot of what I see in
> > Samza.
> > > I'm now working through some of the operational issues and have a
> > question
> > > to the community.
> > >
> > > I have several data sources that I want to push into Kafka but some of
> > the
> > > most important are arriving as a stream of files being dropped either
> > into
> > > a SFTP location or S3. Conceptually the data is really a stream but its
> > > being chunked and made more batch by the deployment model of the
> > > operational servers. So pulling the data into Kafka and seeing it more
> > as a
> > > stream again is a big plus.
> > >
> > > But, I really don't want duplicate messages. I know Kafka provides at
> > > least once semantics and that's fine, I'm happy to have the de-dupe
> logic
> > > external to Kafka. And if I look at my producer I can build up a
> protocol
> > > around adding record metadata and using Zookeeper to give me pretty
> high
> > > confidence that my clients will know if they are reading from a file
> that
> > > was fully published into Kafka or not.
> > >
> > > I had assumed that this wouldn't be a unique use case but on doing a
> > bunch
> > > of searches I really don't find much in terms of either tools that help
> > or
> > > even just best practice patterns for handling this type of need to
> > support
> > > exactly-once message processing.
> > >
> > > So now I'm thinking that either I just need better web search skills or
> > > that actually this isn't something many others are doing and if so then
> > > there's likely a reason for that.
> > >
> > > Any thoughts?
> > >
> > > Thanks
> > > Garry
> > >
> > >
> >
>


Re: Config for new clients (and server)

2014-02-10 Thread Jay Kreps
Hey Jun,

I think that is reasonable but would object to having it be debug logging?
I think logging out a bunch of noise during normal operation in a client
library is pretty ugly. Also, is there value in exposing the final configs
programmatically?

-Jay



On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao  wrote:

> +1 on the new config. Just one comment. Currently, when initiating a config
> (e.g. ProducerConfig), we log those overridden property values and unused
> property keys (likely due to mis-spelling). This has been very useful for
> config verification. It would be good to add similar support in the new
> config.
>
> Thanks,
>
> Jun
>
>
> On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps  wrote:
>
> > We touched on this a bit in previous discussions, but I wanted to draw
> out
> > the approach to config specifically as an item of discussion.
> >
> > The new producer and consumer use a similar key-value config approach as
> > the existing scala clients but have different implementation code to help
> > define these configs. The plan is to use the same approach on the server,
> > once the new clients are complete; so if we agree on this approach it
> will
> > be the new default across the board.
> >
> > Let me split this into two parts. First I will try to motivate the use of
> > key-value pairs as a configuration api. Then let me discuss the mechanics
> > of specifying and parsing these. If we agree on the public api then the
> > implementation details are interesting as this will
> be
> > shared across producer, consumer, and broker and potentially some tools;
> > but if we disagree about the api then there is no point in discussing the
> > implementation.
> >
> > Let me explain the rationale for this. In a sense a key-value map of
> > configs is the worst possible API to the programmer using the clients.
> Let
> > me contrast the pros and cons versus a POJO and motivate why I think it
> is
> > still superior overall.
> >
> > Pro: An application can externalize the configuration of its kafka
> clients
> > into its own configuration. Whatever config management system the client
> > application is using will likely support key-value pairs, so the client
> > should be able to directly pull whatever configurations are present and
> use
> > them in its client. This means that any configuration the client supports
> > can be added to any application at runtime. With the pojo approach the
> > client application has to expose each pojo getter as some config
> parameter.
> > The result of many applications doing this is that the config is
> different
> > for each and it is very hard to have a standard client config shared
> > across. Moving config into config files allows the usual tooling (version
> > control, review, audit, config deployments separate from code pushes,
> > etc.).
> >
> > Pro: Backwards and forwards compatibility. Provided we stick to our java
> > api many internals can evolve and expose new configs. The application can
> > support both the new and old client by just specifying a config that will
> > be unused in the older version (and of course the reverse--we can remove
> > obsolete configs).
> >
> > Pro: We can use a similar mechanism for both the client and the server.
> > Since most people run the server as a stand-alone process it needs a
> config
> > file.
> >
> > Pro: Systems like Samza that need to ship configs across the network can
> > easily do so as configs have a natural serialized form. This can be done
> > with pojos using java serialization but it is ugly and has bizarre failure
> > cases.
> >
> > Con: The IDE gives nice auto-completion for pojos.
> >
> > Con: There are some advantages to javadoc as a documentation mechanism
> for
> > java people.
> >
> > Basically to me this is about operability versus niceness of api and I
> > think operability is more important.
> >
> > Let me now give some details of the config support classes in
> > kafka.common.config and how they are intended to be used.
> >
> > The goal of this code is the following:
> > 1. Make specifying configs, their expected type (string, numbers, lists,
> > etc) simple and declarative
> > 2. Allow for validating simple checks (numeric range checks, etc)
> > 3. Make the config "self-documenting". I.e. we should be able to write
> code
> > that generates the configuration documentation off the config def.
> > 4. Specify default values.
> > 5. Track which configs actually get used.
> > 6. Make it easy to get config values.
> >
> > There are two classes there: ConfigDef and AbstractConfig. ConfigDef
> > defines the specification of the accepted configurations and
> AbstractConfig
> > is a helper class for implementing the configuration class. The
> difference
> > is kind of like the difference between a "class" and an "object":
> ConfigDef
> > is for specifying the configurations that are accepted, AbstractConfig is
> > the base class for an instance of these configs.
> >
> > You can see this in action here:

Re: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Neha Narkhede
Added this to our FAQ -
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactlyonemessagingfromKafka
?



On Mon, Feb 10, 2014 at 9:46 AM, Jay Kreps  wrote:

> The out-of-the-box support for this in Kafka isn't great right now.
>
> Exactly once semantics has two parts: avoiding duplication during data
> production and avoiding duplicates during data consumption.
>
> There are two approaches to getting exactly once semantics during data
> production.
>
> 1. Use a single-writer per partition and every time you get a network error
> check the last message in that partition to see if your last write
> succeeded
> 2. Include a primary key (UUID or something) in the message and deduplicate
> on the consumer.
>
> If you do one of these things the log that Kafka hosts will be duplicate
> free. However reading without duplicates depends on some co-operation from
> the consumer too. If the consumer is periodically checkpointing its
> position then if it fails and restarts it will restart from the
> checkpointed position. Thus if the data output and the checkpoint are not
> written atomically it will be possible to get duplicates here as well. This
> problem is particular to your storage system. For example if you are using
> a database you could commit these together in a transaction. The HDFS
> loader Camus that LinkedIn wrote does something like this for Hadoop loads.
> The other alternative that doesn't require a transaction is to store the
> offset with the data loaded and deduplicate using the
> topic/partition/offset combination.
>
> I think there are two improvements that would make this a lot easier:
> 1. I think producer idempotence is something that could be done
> automatically and much more cheaply by optionally integrating support for
> this on the server.
> 2. The existing high-level consumer doesn't expose a lot of the more fine
> grained control of offsets (e.g. to reset your position). We will be
> working on that soon.
>
> -Jay
>
>
>
>
>
>
>
> On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington <
> g.turking...@improvedigital.com> wrote:
>
> > Hi,
> >
> > I've been doing some prototyping on Kafka for a few months now and like
> > what I see. It's a good fit for some of my use cases in the areas of data
> > distribution but also for processing - liking a lot of what I see in
> Samza.
> > I'm now working through some of the operational issues and have a
> question
> > to the community.
> >
> > I have several data sources that I want to push into Kafka but some of
> the
> > most important are arriving as a stream of files being dropped either
> into
> > a SFTP location or S3. Conceptually the data is really a stream but its
> > being chunked and made more batch by the deployment model of the
> > operational servers. So pulling the data into Kafka and seeing it more
> as a
> > stream again is a big plus.
> >
> > But, I really don't want duplicate messages. I know Kafka provides at
> > least once semantics and that's fine, I'm happy to have the de-dupe logic
> > external to Kafka. And if I look at my producer I can build up a protocol
> > around adding record metadata and using Zookeeper to give me pretty high
> > confidence that my clients will know if they are reading from a file that
> > was fully published into Kafka or not.
> >
> > I had assumed that this wouldn't be a unique use case but on doing a
> bunch
> > of searches I really don't find much in terms of either tools that help
> or
> > even just best practice patterns for handling this type of need to
> support
> > exactly-once message processing.
> >
> > So now I'm thinking that either I just need better web search skills or
> > that actually this isn't something many others are doing and if so then
> > there's likely a reason for that.
> >
> > Any thoughts?
> >
> > Thanks
> > Garry
> >
> >
>


Re: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Jay Kreps
The out-of-the-box support for this in Kafka isn't great right now.

Exactly once semantics has two parts: avoiding duplication during data
production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data
production.

1. Use a single-writer per partition and every time you get a network error
check the last message in that partition to see if your last write succeeded
2. Include a primary key (UUID or something) in the message and deduplicate
on the consumer.

If you do one of these things the log that Kafka hosts will be duplicate
free. However reading without duplicates depends on some co-operation from
the consumer too. If the consumer is periodically checkpointing its
position then if it fails and restarts it will restart from the
checkpointed position. Thus if the data output and the checkpoint are not
written atomically it will be possible to get duplicates here as well. This
problem is particular to your storage system. For example if you are using
a database you could commit these together in a transaction. The HDFS
loader Camus that LinkedIn wrote does something like this for Hadoop loads.
The other alternative that doesn't require a transaction is to store the
offset with the data loaded and deduplicate using the
topic/partition/offset combination.
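
As a toy illustration of that second alternative, the consumer can track the
highest offset it has applied per topic-partition and skip anything at or below
it; in a real system this bookkeeping would be persisted atomically with the
output data:

import java.util.HashMap;
import java.util.Map;

// Toy sketch: remember the highest applied offset per topic-partition and
// treat anything at or below it as a duplicate delivery.
public class OffsetDeduplicator {
    private final Map<String, Long> highestApplied = new HashMap<String, Long>();

    // Returns true if this (topic, partition, offset) has not been applied yet.
    public boolean shouldApply(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long seen = highestApplied.get(key);
        if (seen != null && offset <= seen) {
            return false;  // already applied
        }
        highestApplied.put(key, offset);
        return true;
    }
}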

I think there are two improvements that would make this a lot easier:
1. I think producer idempotence is something that could be done
automatically and much more cheaply by optionally integrating support for
this on the server.
2. The existing high-level consumer doesn't expose a lot of the more fine
grained control of offsets (e.g. to reset your position). We will be
working on that soon.

-Jay







On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington <
g.turking...@improvedigital.com> wrote:

> Hi,
>
> I've been doing some prototyping on Kafka for a few months now and like
> what I see. It's a good fit for some of my use cases in the areas of data
> distribution but also for processing - liking a lot of what I see in Samza.
> I'm now working through some of the operational issues and have a question
> to the community.
>
> I have several data sources that I want to push into Kafka but some of the
> most important are arriving as a stream of files being dropped either into
> a SFTP location or S3. Conceptually the data is really a stream but its
> being chunked and made more batch by the deployment model of the
> operational servers. So pulling the data into Kafka and seeing it more as a
> stream again is a big plus.
>
> But, I really don't want duplicate messages. I know Kafka provides at
> least once semantics and that's fine, I'm happy to have the de-dupe logic
> external to Kafka. And if I look at my producer I can build up a protocol
> around adding record metadata and using Zookeeper to give me pretty high
> confidence that my clients will know if they are reading from a file that
> was fully published into Kafka or not.
>
> I had assumed that this wouldn't be a unique use case but on doing a bunch
> of searches I really don't find much in terms of either tools that help or
> even just best practice patterns for handling this type of need to support
> exactly-once message processing.
>
> So now I'm thinking that either I just need better web search skills or
> that actually this isn't something many others are doing and if so then
> there's likely a reason for that.
>
> Any thoughts?
>
> Thanks
> Garry
>
>


Re: Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Pradeep Gollakota
Have you read this part of the documentation?
http://kafka.apache.org/documentation.html#semantics

Just wondering if that solves your use case.


On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington <
g.turking...@improvedigital.com> wrote:

> Hi,
>
> I've been doing some prototyping on Kafka for a few months now and like
> what I see. It's a good fit for some of my use cases in the areas of data
> distribution but also for processing - liking a lot of what I see in Samza.
> I'm now working through some of the operational issues and have a question
> to the community.
>
> I have several data sources that I want to push into Kafka but some of the
> most important are arriving as a stream of files being dropped either into
> a SFTP location or S3. Conceptually the data is really a stream but its
> being chunked and made more batch by the deployment model of the
> operational servers. So pulling the data into Kafka and seeing it more as a
> stream again is a big plus.
>
> But, I really don't want duplicate messages. I know Kafka provides at
> least once semantics and that's fine, I'm happy to have the de-dupe logic
> external to Kafka. And if I look at my producer I can build up a protocol
> around adding record metadata and using Zookeeper to give me pretty high
> confidence that my clients will know if they are reading from a file that
> was fully published into Kafka or not.
>
> I had assumed that this wouldn't be a unique use case but on doing a bunch
> of searches I really don't find much in terms of either tools that help or
> even just best practice patterns for handling this type of need to support
> exactly-once message processing.
>
> So now I'm thinking that either I just need better web search skills or
> that actually this isn't something many others are doing and if so then
> there's likely a reason for that.
>
> Any thoughts?
>
> Thanks
> Garry
>
>


Re: Dropping messages ?

2014-02-10 Thread Guozhang Wang
Are these messages sent to a newly created topic?

Guozhang


On Mon, Feb 10, 2014 at 7:38 AM, Jun Rao  wrote:

> If you reset the consumer offset and try to consume those messages again,
> do you see the same drop?
>
> Thanks,
>
> Jun
>
>
> On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
>
> > Hi
> >
> > We have been using Kafka(0.8) for the past few months with the following
> > setup
> > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> >
> > Yesterday, while running Stress tests in one of the QA machines , we
> > observed that a few messages which were produced within a couple of
> > milliseconds of each other did not reach the Kafka consumer. ie There was
> > no trace of that message at the consumer end.
> >
> > We decided to check whether we had any errors at our side or there was a
> > network issue. We did not find any issue. We then decided to check
> whether
> > we can find that message in one of the Kafka partitions. The message was
> > found in one of the topic partitions.
> >
> > We are not sure why Kafka did not notify any consumers about the message.
> > Are there any special cases where Kafka silently drops a message ?
> >
> > We also found a delay in the notifications/watches triggered from
> > zookeeper. We are not sure whether these are related ? It will be
> difficult
> > to reproduce as the test probably took a few days to complete. But surely
> > we did lose approximately 5% of the messages. We have logs of messages
> > being produced at the producer side and corresponding entries in Kafka
> > partitions logs. But nothing at the consumer side. The only repeating
> > pattern was that the messages were probably produced within the same
> > millisecond. So if you have a sequence of messages which was produced in
> > the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably have
> > M0, M1, M3 but not M2. This is puzzling as to how only one message is dropped
> out
> > of the given 4.
> >
> >
> > We use the High Level Kafka Producer and Consumer. Both are single
> > threaded(at our end).
> >
> > Does Kafka need its own dedicated ZooKeeper ensemble? We also use the
> > same ZooKeeper ensemble as our configuration service.
> >
> > Unfortunately, we did not have DEBUG messages at the server enabled
> during
> > the setup. Although, NO error messages were observed during the same time
> > period.
> >
> >
> > Before we try running the same Tests again, can someone please shed more
> > light as to the reasons why kafka dropped a few messages ?
> >
> > Kat
> >
>



-- 
-- Guozhang


Building a producer/consumer supporting exactly-once messaging

2014-02-10 Thread Garry Turkington
Hi,

I've been doing some prototyping on Kafka for a few months now and like what I 
see. It's a good fit for some of my use cases in the areas of data distribution 
but also for processing - liking a lot of what I see in Samza. I'm now working 
through some of the operational issues and have a question to the community.

I have several data sources that I want to push into Kafka but some of the most
important are arriving as a stream of files being dropped either into an SFTP
location or S3. Conceptually the data is really a stream but it's being chunked
and made more batch-like by the deployment model of the operational servers. So
pulling the data into Kafka and seeing it more as a stream again is a big plus.

But, I really don't want duplicate messages. I know Kafka provides at least 
once semantics and that's fine, I'm happy to have the de-dupe logic external to 
Kafka. And if I look at my producer I can build up a protocol around adding 
record metadata and using Zookeeper to give me pretty high confidence that my 
clients will know if they are reading from a file that was fully published into 
Kafka or not.

I had assumed that this wouldn't be a unique use case but on doing a bunch of 
searches I really don't find much in terms of either tools that help or even 
just best practice patterns for handling this type of need to support 
exactly-once message processing.

So now I'm thinking that either I just need better web search skills or that 
actually this isn't something many others are doing and if so then there's 
likely a reason for that.

Any thoughts?

Thanks
Garry



Re: Mirrormaker clients not balanced

2014-02-10 Thread Jun Rao
Broker 3 is not registered in the backend (be) ZK. Do you see ZK session
expiration (potentially due to GC) in that broker?
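
For reference, the registered broker ids can be read straight out of ZooKeeper
(the same thing "ls /brokers/ids" shows in zkCli). A minimal sketch with the
ZooKeeper Java client; the connect string is a placeholder:

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

// Lists which broker ids are currently registered; equivalent to
// "ls /brokers/ids" in zkCli.
public class ListBrokers {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zookeeperbe01:2181", 10000, null);
        List<String> brokerIds = zk.getChildren("/brokers/ids", false);
        System.out.println("Registered brokers: " + brokerIds);
        zk.close();
    }
}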

Thanks,

Jun


On Mon, Feb 10, 2014 at 7:28 AM, Tomas Nunez  wrote:

> Hi
>
> I'm new around here and I'm dealing with a problem, and reading the
> documentation I don't know where else to look.
>
> I have a cross-dc mirrormaker setup: Mirrormaker is consuming from 5
> frontend servers in each DC (10 in total) and 5 backend servers are
> consuming from mirrormaker. That's working for most of the topics, but some
> of them are not being consumed from all backend servers.
>
> as an example, browsing with zkcli I find this:
>
> kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> zookeeperbe01:2181 ls /brokers/topics/topic1
> Connecting to zookeeperbe01:2181
>
> WATCHER::
>
> WatchedEvent state:SyncConnected type:None path:null
> [2, 1, 5, 4]
> kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
> zookeeperfe01:2181 ls /brokers/topics/topic1
> Connecting to zookeeperfe01:2181
>
> WATCHER::
>
> WatchedEvent state:SyncConnected type:None path:null
> [3, 2, 1, 5, 4]
>
> What can be wrong? Where can I look to get more info to troubleshoot this?
> Any hint?
>
> Kafka version is 0.7.2; in the mirrormaker config I have "--num.streams 5
> --num.producers 5". Zookeeper version is 3.3.5.
>
> Here you can see kafka-console-consumer.sh connecting to both fe and be
> servers:
> https://gist.github.com/pythiannunez/1623934cb538678f053e
>
> Thanks!
>


Mirrormaker clients not balanced

2014-02-10 Thread Tomas Nunez
Hi

I'm new around here and I'm dealing with a problem, and reading the
documentation I don't know where else to look.

I have a cross-dc mirrormaker setup: Mirrormaker is consuming from 5
frontend servers in each DC (10 in total) and 5 backend servers are
consuming from mirrormaker. That's working for most of the topics, but some
of them are not being consumed from all backend servers.

As an example, browsing with zkCli I find this:

kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
zookeeperbe01:2181 ls /brokers/topics/topic1
Connecting to zookeeperbe01:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[2, 1, 5, 4]
kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh -server
zookeeperfe01:2181 ls /brokers/topics/topic1
Connecting to zookeeperfe01:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[3, 2, 1, 5, 4]

What can be wrong? Where can I look to get more info to troubleshoot this?
Any hint?

Kafka version is 0.7.2; in the MirrorMaker config I have "--num.streams 5
--num.producers 5". ZooKeeper version is 3.3.5.

Here you can see kafka-console-consumer.sh connecting to both fe and be
servers:
https://gist.github.com/pythiannunez/1623934cb538678f053e

Thanks!


Re: Query regarding setting up Kafka server on offline Linux machine.

2014-02-10 Thread Jun Rao
This seems like a Maven dependency resolution issue. Not sure of the reason,
though. You could also just try the binary release for 0.8.0.

Thanks,

Jun


On Mon, Feb 10, 2014 at 1:55 AM, Saurabh Gupta A <
saurabh.a.gu...@ericsson.com> wrote:

> Hello,
> I am new to Kafka, facing below problem while setting up Kafka.
> I downloaded "kafka-0.8.0-src.gz".
> Then I followed below steps:
>
> > tar xzf kafka-.tgz
> > cd kafka-
> > ./sbt update
>
> Below is the error I am facing:
> Please note that I am using linux machine which is offline and I have to
> install kafka on a offline machine only
>
> bl460cx2425:/kafka/kafka-0.8.0-src# uname -a
> Linux bl460cx2425 2.6.32-220.el6.x86_64 #1 SMP Wed Nov 9 08:03:13 EST 2011
> x86_64 x86_64 x86_64 GNU/Linux
>
>
> bl460cx2425:/kafka/kafka-0.8.0-src# ./sbt update
> Getting net.java.dev.jna jna 3.2.3 ...
> You probably access the destination server through a proxy server that is
> not well configured.
> You probably access the destination server through a proxy server that is
> not well configured.
>
> :: problems summary ::
>  WARNINGS
> Host repo.typesafe.com not found. url=
> http://repo.typesafe.com/typesafe/ivy-releases/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml
>
> Host oss.sonatype.org not found. url=
> https://oss.sonatype.org/content/repositories/snapshots/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom
>
> module not found: net.java.dev.jna#jna;3.2.3
>
>  local: tried
>
>   /root/.ivy2/local/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml
>
>  typesafe-ivy-releases: tried
>
>
> http://repo.typesafe.com/typesafe/ivy-releases/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml
>
>  Maven Central: tried
>
>
> http://repo1.maven.org/maven2/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom
>
>  sonatype-snapshots: tried
>
>
> https://oss.sonatype.org/content/repositories/snapshots/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom
>
> ::
>
> ::  UNRESOLVED DEPENDENCIES ::
>
> ::
>
> :: net.java.dev.jna#jna;3.2.3: not found
>
> ::
>
>
>  ERRORS
> Server access Error: Connection timed out url=
> http://repo1.maven.org/maven2/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom
>
>
> :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
> unresolved dependency: net.java.dev.jna#jna;3.2.3: not found
> Error during sbt execution: Error retrieving required libraries
>   (see /root/.sbt/boot/update.log for complete log)
> Error: Could not retrieve jna 3.2.3
>
>


Re: kafka consumer not consuming messages

2014-02-10 Thread Jun Rao
Does
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
apply?

Thanks,

Jun
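
(For reference, a minimal high-level consumer configuration along the lines of
that FAQ entry might look like the sketch below; the ZooKeeper address and group
name are placeholders, and auto.offset.reset=smallest only takes effect for a
group with no previously committed offsets.)

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// High-level consumer config sketch: a brand-new group starts from the earliest
// available messages instead of only messages produced after it joins.
public class ConsumerConfigSketch {
    public static ConsumerConnector create() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-group");
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "6000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
}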


On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:

> Hi,
>
> I started using kafka some time back. I was experimenting with 0.8. My
> problem is the kafka is unable to consume the messages. My configuration
> is kafka broker on the local host and zookeeper on the local host. I
> have only one broker and one consumer at present.
>
> What have I done:
>  1) I used the java examples in the kafka src and pushed some 600
> messages to the broker
>  2) I used the console consumer to check weather the messages are
> there in the broker or not. Console consumer printed all 600 messages
>  3) Now i used the java Consumer code, and tried to get those
> messages. This is not printing any messages. It just got stuck
>
> When was it working earlier:
>  -When i tried with three brokers and three consumers in the same
> machine, with the same configuration it worked fine.
>  -I changed the properties accordingly when i tried to make it work
> with one broker and one consumer
>
> What does log say:
>  - attaching the logs even
>
> If some one points me where I am doing wrong it would be helpful.
>
> Thanks
> Arjun Narasimha Kota
>
>


Re: Dropping messages ?

2014-02-10 Thread Jun Rao
If you reset the consumer offset and try to consume those messages again,
do you see the same drop?

Thanks,

Jun
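
(One rough way to run that re-consumption test when offsets are stored in
ZooKeeper is to stop the group's consumers, remove its committed offsets, and
restart the consumers with auto.offset.reset=smallest. A sketch, with example
group/topic names and connect string, is below.)

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Deletes the committed offsets of one consumer group for one topic so the
// group re-consumes from the beginning on its next start. Stop all consumers
// in the group before running this.
public class ResetGroupOffsets {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, new Watcher() {
            public void process(WatchedEvent event) { /* no-op watcher */ }
        });
        try {
            String base = "/consumers/test-group/offsets/test-topic";
            List<String> partitions = zk.getChildren(base, false);
            for (String partition : partitions) {
                zk.delete(base + "/" + partition, -1); // -1 matches any version
            }
        } finally {
            zk.close();
        }
    }
}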


On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:

> Hi
>
> We have been using Kafka(0.8) for the past few months with the following
> setup
> Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
>
> Yesterday, while running Stress tests in one of the QA machines , we
> observed that a few messages which were produced within a couple of
> milliseconds of each other did not reach the Kafka consumer. ie There was
> no trace of that message at the consumer end.
>
> We decided to check whether we had any errors at our side or there was a
> network issue. We did not find any issue. We then decided to check whether
> we can find that message in one of the Kafka partitions. The message was
> found in one of the topic partitions.
>
> We are not sure why Kafka did not notify any consumers about the message.
> Are there any special cases where Kafka silently drops a message ?
>
> We also found a delay in the notifications/watches triggered from
> zookeeper. We are not sure whether these are related ? It will be difficult
> to reproduce as the test probably took a few days to complete. But surely
> we did lose approximately 5% of the messages. We have logs of messages
> being produced at the producer side and corresponding entries in Kafka
> partitions logs. But nothing at the consumer side. The only repeating
> pattern was that the messages were probably produced within the same
> millisecond. So if you have a sequence of messages which was produced in
> the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably have
> M0,M1,M3 but not M2. This is puzzling as to how only message is dropped out
> of the given 4.
>
>
> We use the High Level Kafka Producer and Consumer. Both are single
> threaded(at our end).
>
> Does kafka need its own dedicated zookeeper ensemble ? We also use the
> same zookeeper ensemble as  our configuration service.
>
> Unfortunately, we did not have DEBUG messages at the server enabled during
> the setup. Although, NO error messages were observed during the same time
> period.
>
>
> Before we try running the same Tests again, can someone please shed more
> light as to the reasons why kafka dropped a few messages ?
>
> Kat
>


Re: Pattern for using kafka producer API

2014-02-10 Thread Jun Rao
If you are only worried about throughput, you can use one producer in async
mode. You can tune the batch size and time for better performance.

Thanks,

Jun
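
(A minimal sketch of that setup, with placeholder broker and topic values; a
single async producer instance configured like this can be shared across
threads, and batch.num.messages / queue.buffering.max.ms are the main knobs for
trading latency against throughput.)

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// One shared async producer: messages are buffered and sent in batches.
public class AsyncProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        props.put("batch.num.messages", "200");     // messages per batch
        props.put("queue.buffering.max.ms", "500"); // max time to buffer
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-topic", "hello"));
        producer.close();
    }
}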


On Sun, Feb 9, 2014 at 11:42 PM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

> What is the most appropriate design for using kafka producer from
> performance view point.I had few in my mind.
>
> 1.Since single kafka producer object have synchronization; using single
> producer object from multiple thread might not be efficient.so one way
> would be to use multiple kafka producer from inside same thread.
>
> 2.Have multiple thread each having it's own instance of producer.This has
> thread overheads if kafka internally using the same semantics.
>
> It would be great if someone can comment on these approaches or suggest
> widely used one.
> P.S. im using 0.8.0 and mostly concerned with async producer.
>
> Thanks And Regards,
> Pushkar
>


Query regarding setting up Kafka server on offline Linux machine.

2014-02-10 Thread Saurabh Gupta A
Hello,
I am new to Kafka and am facing the problem below while setting it up.
I downloaded "kafka-0.8.0-src.gz".
Then I followed these steps:

> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update

Below is the error I am facing.
Please note that I am using a Linux machine which is offline, and I have to 
install Kafka on an offline machine only.

bl460cx2425:/kafka/kafka-0.8.0-src# uname -a
Linux bl460cx2425 2.6.32-220.el6.x86_64 #1 SMP Wed Nov 9 08:03:13 EST 2011 
x86_64 x86_64 x86_64 GNU/Linux


bl460cx2425:/kafka/kafka-0.8.0-src# ./sbt update
Getting net.java.dev.jna jna 3.2.3 ...
You probably access the destination server through a proxy server that is not 
well configured.
You probably access the destination server through a proxy server that is not 
well configured.

:: problems summary ::
 WARNINGS
Host repo.typesafe.com not found. 
url=http://repo.typesafe.com/typesafe/ivy-releases/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml

Host oss.sonatype.org not found. 
url=https://oss.sonatype.org/content/repositories/snapshots/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom

module not found: net.java.dev.jna#jna;3.2.3

 local: tried

  /root/.ivy2/local/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml

 typesafe-ivy-releases: tried

  
http://repo.typesafe.com/typesafe/ivy-releases/net.java.dev.jna/jna/3.2.3/ivys/ivy.xml

 Maven Central: tried

  http://repo1.maven.org/maven2/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom

 sonatype-snapshots: tried

  
https://oss.sonatype.org/content/repositories/snapshots/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom

::

::  UNRESOLVED DEPENDENCIES ::

::

:: net.java.dev.jna#jna;3.2.3: not found

::


 ERRORS
Server access Error: Connection timed out 
url=http://repo1.maven.org/maven2/net/java/dev/jna/jna/3.2.3/jna-3.2.3.pom


:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
unresolved dependency: net.java.dev.jna#jna;3.2.3: not found
Error during sbt execution: Error retrieving required libraries
  (see /root/.sbt/boot/update.log for complete log)
Error: Could not retrieve jna 3.2.3



Re: kafka consumer not consuming messages

2014-02-10 Thread Arjun
As an extension of the same problem, I am seeing "INFO Closing socket 
connection to /127.0.0.1. (kafka.network.Processor)" in my log 
continuously. I searched the web and found this code in an exception 
block: 
https://apache.googlesource.com/kafka/+/40a80fa7b7ae3d49e32c40fbaad1a4e402b2ac71/core/src/main/scala/kafka/network/SocketServer.scala
Does this have anything to do with the problem?


Why would this come up? I tried to look at the code but I am lost in it. 
Can someone please point me in a direction where I can find the answer.


Thanks
Arjun Narasimha Kota

On Monday 10 February 2014 11:57 AM, Arjun wrote:

Hi,

I started using kafka some time back. I was experimenting with 0.8. My
problem is the kafka is unable to consume the messages. My configuration
is kafka broker on the local host and zookeeper on the local host. I
have only one broker and one consumer at present.

What have I done:
 1) I used the java examples in the kafka src and pushed some 600
messages to the broker
 2) I used the console consumer to check weather the messages are
there in the broker or not. Console consumer printed all 600 messages
 3) Now i used the java Consumer code, and tried to get those
messages. This is not printing any messages. It just got stuck

When was it working earlier:
 -When i tried with three brokers and three consumers in the same
machine, with the same configuration it worked fine.
 -I changed the properties accordingly when i tried to make it work
with one broker and one consumer

What does log say:
 - attaching the logs even

If some one points me where I am doing wrong it would be helpful.

Thanks
Arjun Narasimha Kota





Dropping messages ?

2014-02-10 Thread A A
Hi

We have been using Kafka (0.8) for the past few months with the following setup:
Kafka Broker - 1
Zookeepers Ensemble - 3
Partitions per topic - 3

Yesterday, while running stress tests on one of the QA machines, we observed 
that a few messages which were produced within a couple of milliseconds of each 
other did not reach the Kafka consumer, i.e. there was no trace of those 
messages at the consumer end. 

We decided to check whether we had any errors at our side or there was a 
network issue. We did not find any issue. We then decided to check whether we 
can find that message in one of the Kafka partitions. The message was found in 
one of the topic partitions. 

We are not sure why Kafka did not notify any consumers about the message. Are 
there any special cases where Kafka silently drops a message? 

We also found a delay in the notifications/watches triggered from ZooKeeper. We 
are not sure whether these are related. It will be difficult to reproduce, as 
the test probably took a few days to complete. But we definitely lost 
approximately 5% of the messages. We have logs of the messages being produced 
at the producer side and corresponding entries in the Kafka partition logs, but 
nothing at the consumer side. The only repeating pattern was that the messages 
were probably produced within the same millisecond. So if you have a sequence 
of four messages produced in the same millisecond, say M0, M1, M2, M3, we 
probably have M0, M1, M3 but not M2. This is puzzling: how is only one 
message dropped out of the given four? 
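
Below is a minimal sketch (assuming the 0.8 high-level Java consumer; the
topic, group and ZooKeeper address are placeholders) that logs the partition
and offset of every consumed message; something like this would at least show
whether a "missing" message arrived on a different partition or stream than
expected.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Logs partition and offset for every consumed message so per-partition gaps
// can be checked against the broker logs.
public class OffsetLoggingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "audit-group");
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("test-topic", 1); // one stream for the topic
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(topicCount).get("test-topic");

        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> m = it.next();
            System.out.println("partition=" + m.partition() + " offset=" + m.offset());
        }
    }
}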


We use the high-level Kafka Producer and Consumer. Both are single-threaded (at 
our end).

Does Kafka need its own dedicated ZooKeeper ensemble? We also use the same 
ZooKeeper ensemble as our configuration service. 

Unfortunately, we did not have DEBUG logging enabled on the server during the 
setup, although no error messages were observed during the same time period. 


Before we try running the same tests again, can someone please shed more light 
on why Kafka dropped a few messages?

Kat