Publishing/marking multiple event messages as single transaction?

2014-02-11 Thread Vjeran Marcinko
Hi,

 

And again, maybe this is more archtectural question, but I would like to
hear how people tackle this problem.

 

Anyway, as far as I can tell, a producer can publish multiple events as part
of one transaction on its side. Examples are:

-   Multiple entities are created as part of one action and we want to
report each entity's creation as one event message - CreateAccountCommand
being processed transactionally can result in creation of Account entitiy as
well as some Bonus entitiy that you get for free when you open your first
account (whatever that means); which means a producer publishes
AccountCreatedEvent and BonusCreatedEvent as part of that transaction

-   Sometimes an action being processed can publish not just event that
designates the action, but also the result of that processing - eg. In some
game engine, a player can do MovePlayerUpCommand (action), which publishes
PlayerMovedUp and only sometimes GameWonEvent if the move was crucial to
winning the game.

 

Thus if some consumer want to consume these events for sake of replicating
producer state, it should also be able to consume these events in batches
that correspond to original transaction, right? Otherwise, the consumer
would reconstruct state in some point in time that has never occurred in
producing side, which we surely don't want, right?

 

Anyone experienced this problem? 

 

Or LinkedIn folks don't use Kafka for state replication, but Databus (which
maybe takes care of transaction thing - dunno, didn't researched it)?

 

Regards,

Vjeran

 



Re: kafka consumer not consuming messages

2014-02-11 Thread Arjun

The topic name is correct, the o/p of the ConsumerOffserChecker is

arjunn@arjunn-lt:~/Downloads/Kafka0.8/new/kafka_2.8.0-0.8.0$ 
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 
--zkconnect 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic 
taf.referral.emails.service
Group   Topic  Pid Offset 
logSize Lag Owner
group1  taf.referral.emails.service0   2 4   2 
group1_arjunn-lt-1392133080519-e24b249b-0
group1  taf.referral.emails.service1   2 4   2 
group1_arjunn-lt-1392133080519-e24b249b-0


thanks
Arjun Narasimha Kota



On Wednesday 12 February 2014 10:21 AM, Jun Rao wrote:

Could you double check that you used the correct topic name? If so, could
you run ConsumerOffsetChecker as described in
https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is
any lag?

Thanks,

Jun


On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota  wrote:


fetch.wait.max.ms=1
fetch.min.bytes=128

My message size is much more than that.
On Feb 11, 2014 9:21 PM, "Jun Rao"  wrote:


What's the fetch.wait.max.ms and fetch.min.bytes you used?

Thanks,

Jun


On Tue, Feb 11, 2014 at 12:54 AM, Arjun  wrote:


With the same group id from the console consumer its working fine.


On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:


Arjun,

Are you using the same group name for the console consumer and the

java

consumer?

Guozhang


On Mon, Feb 10, 2014 at 11:38 PM, Arjun 

wrote:

  Hi Jun,

No its not that problem. I am not getting what the problem is can you
please help.

thanks
Arjun Narasimha Kota


On Monday 10 February 2014 09:10 PM, Jun Rao wrote:

  Does

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
Whydoesmyconsumernevergetanydata?
apply?

Thanks,

Jun


On Sun, Feb 9, 2014 at 10:27 PM, Arjun 

wrote:

   Hi,


I started using kafka some time back. I was experimenting with 0.8.

My

problem is the kafka is unable to consume the messages. My
configuration
is kafka broker on the local host and zookeeper on the local host.

I

have only one broker and one consumer at present.

What have I done:
1) I used the java examples in the kafka src and pushed some

600

messages to the broker
2) I used the console consumer to check weather the messages

are

there in the broker or not. Console consumer printed all 600

messages

3) Now i used the java Consumer code, and tried to get those
messages. This is not printing any messages. It just got stuck

When was it working earlier:
-When i tried with three brokers and three consumers in the

same

machine, with the same configuration it worked fine.
-I changed the properties accordingly when i tried to make

it

work
with one broker and one consumer

What does log say:
- attaching the logs even

If some one points me where I am doing wrong it would be helpful.

Thanks
Arjun Narasimha Kota








Re: Dropping messages ?

2014-02-11 Thread Jun Rao
I haven't seen an issue like this before: a consumer doesn't see some
messages existing in a broker. So, some more debugging is needed. If you
start a new consumer consuming from the beginning, do you see those missing
messages?

Thanks,

Jun


On Tue, Feb 11, 2014 at 9:43 PM, Kat Walker  wrote:

> > Are you committing offsets manually?
>
> We are not committing offsets manually. As indicated in my first email, we
> are using the high level consumer API.
>
> >How do you realize that some messages
> >are lost? Do you log every message returned to Kafka consumer client?
>
> Yes we log every message received and sent. We can track every message. We
> have a unique identifier for every message.
>
> >Is it
> > possible that a message is returned to the consumer, but is lost in the
> > application logic?
>
> No. That is not possible. As soon as we receive a message, we print our
> unique identifier. And most of the contents of the message which is about
> to be processed. Recall that we have only 1 broker and a zookeeper ensemble
> consisting of 3 zookeeper servers. We did not observe any issue during the
> testing time period.
>
> It will be nice if you answer my previous questions about Kafka's message
> guarantees. As of now, it seems you are asking questions which I have
> already answered in my previous emails.  Please refer the entire email
> thread.
>
>
> > > Has someone ever lost a Kafka message before ? Are there any
> guarantees ?
> > > We are okay with all kinds of delays provided, the messages arrive in
> order
> > > and are delivered regardless.
>
>
> Thanks
> Kat
>
> > Date: Tue, 11 Feb 2014 08:02:16 -0800
> > Subject: Re: Dropping messages ?
> > From: jun...@gmail.com
> > To: users@kafka.apache.org
> >
> > Are you committing offsets manually? How do you realize that some
> messages
> > are lost? Do you log every message returned to Kafka consumer client? Is
> it
> > possible that a message is returned to the consumer, but is lost in the
> > application logic?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Feb 10, 2014 at 10:23 PM, Kat Walker 
> wrote:
> >
> > > Hi Jun/Guozhang
> > >
> > > We might have to retry our QA tests in its entirety. We simply cannot
> > > reset consumer offset as there is a lot of processing involved after
> > > consuming those messages. This might take almost a week. The Kafka
> message
> > > also contains `volatile` data which is fetched from a database and
> > > destroyed after consuming  that Kafka message.
> > >
> > > What is puzzling is that we have been running Kafka in Production for
> over
> > > 4 months now and we have never faced this issue. Our peak volume is <=
> 1000
> > > messages / second. On an average, it is less than 100.
> > >
> > > Our zookeeper version is 3.3.6. What I suspect is that we managed to
> `roll
> > > over` a few messages due to inconsistencies in zookeeper offsets.
> Recall
> > > that we only had only 1 Kafka broker. Once again, this is just a `wild`
> > > speculation.
> > >
> > > The topics were created before we started our tests.
> > >
> > > Has someone ever lost a Kafka message before ? Are there any
> guarantees ?
> > > We are okay with all kinds of delays provided, the messages arrive in
> order
> > > and are delivered regardless.
> > >
> > > Thanks
> > > Kat
> > >
> > > > Date: Mon, 10 Feb 2014 07:38:01 -0800
> > > > Subject: Re: Dropping messages ?
> > > > From: jun...@gmail.com
> > > > To: users@kafka.apache.org
> > > >
> > > > If you reset the consumer offset and try to consume those messages
> again,
> > > > do you see the same drop?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > We have been using Kafka(0.8) for the past few months with the
> > > following
> > > > > setup
> > > > > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> > > > >
> > > > > Yesterday, while running Stress tests in one of the QA machines ,
> we
> > > > > observed that a few messages which were produced within a couple of
> > > > > milliseconds of each other did not reach the Kafka consumer. ie
> There
> > > was
> > > > > no trace of that message at the consumer end.
> > > > >
> > > > > We decided to check whether we had any errors at our side or there
> was
> > > a
> > > > > network issue. We did not find any issue. We then decided to check
> > > whether
> > > > > we can find that message in one of the Kafka partitions. The
> message
> > > was
> > > > > found in one of the topic partitions.
> > > > >
> > > > > We are not sure why Kafka did not notify any consumers about the
> > > message.
> > > > > Are there any special cases where Kafka silently drops a message ?
> > > > >
> > > > > We also found a delay in the notifications/watches triggered from
> > > > > zookeeper. We are not sure whether these are related ? It will be
> > > difficult
> > > > > to reproduce as the test probably took a few days to complete. But
> > > surely
> > >

RE: Dropping messages ?

2014-02-11 Thread Kat Walker
> Are you committing offsets manually? 

We are not committing offsets manually. As indicated in my first email, we are 
using the high level consumer API. 

>How do you realize that some messages
>are lost? Do you log every message returned to Kafka consumer client?

Yes we log every message received and sent. We can track every message. We have 
a unique identifier for every message. 

>Is it
> possible that a message is returned to the consumer, but is lost in the
> application logic?

No. That is not possible. As soon as we receive a message, we print our unique 
identifier. And most of the contents of the message which is about to be 
processed. Recall that we have only 1 broker and a zookeeper ensemble 
consisting of 3 zookeeper servers. We did not observe any issue during the 
testing time period. 

It will be nice if you answer my previous questions about Kafka's message 
guarantees. As of now, it seems you are asking questions which I have already 
answered in my previous emails.  Please refer the entire email thread. 


> > Has someone ever lost a Kafka message before ? Are there any guarantees ?
> > We are okay with all kinds of delays provided, the messages arrive in order
> > and are delivered regardless.


Thanks
Kat

> Date: Tue, 11 Feb 2014 08:02:16 -0800
> Subject: Re: Dropping messages ?
> From: jun...@gmail.com
> To: users@kafka.apache.org
> 
> Are you committing offsets manually? How do you realize that some messages
> are lost? Do you log every message returned to Kafka consumer client? Is it
> possible that a message is returned to the consumer, but is lost in the
> application logic?
> 
> Thanks,
> 
> Jun
> 
> 
> On Mon, Feb 10, 2014 at 10:23 PM, Kat Walker  wrote:
> 
> > Hi Jun/Guozhang
> >
> > We might have to retry our QA tests in its entirety. We simply cannot
> > reset consumer offset as there is a lot of processing involved after
> > consuming those messages. This might take almost a week. The Kafka message
> > also contains `volatile` data which is fetched from a database and
> > destroyed after consuming  that Kafka message.
> >
> > What is puzzling is that we have been running Kafka in Production for over
> > 4 months now and we have never faced this issue. Our peak volume is <= 1000
> > messages / second. On an average, it is less than 100.
> >
> > Our zookeeper version is 3.3.6. What I suspect is that we managed to `roll
> > over` a few messages due to inconsistencies in zookeeper offsets. Recall
> > that we only had only 1 Kafka broker. Once again, this is just a `wild`
> > speculation.
> >
> > The topics were created before we started our tests.
> >
> > Has someone ever lost a Kafka message before ? Are there any guarantees ?
> > We are okay with all kinds of delays provided, the messages arrive in order
> > and are delivered regardless.
> >
> > Thanks
> > Kat
> >
> > > Date: Mon, 10 Feb 2014 07:38:01 -0800
> > > Subject: Re: Dropping messages ?
> > > From: jun...@gmail.com
> > > To: users@kafka.apache.org
> > >
> > > If you reset the consumer offset and try to consume those messages again,
> > > do you see the same drop?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> > >
> > > > Hi
> > > >
> > > > We have been using Kafka(0.8) for the past few months with the
> > following
> > > > setup
> > > > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> > > >
> > > > Yesterday, while running Stress tests in one of the QA machines , we
> > > > observed that a few messages which were produced within a couple of
> > > > milliseconds of each other did not reach the Kafka consumer. ie There
> > was
> > > > no trace of that message at the consumer end.
> > > >
> > > > We decided to check whether we had any errors at our side or there was
> > a
> > > > network issue. We did not find any issue. We then decided to check
> > whether
> > > > we can find that message in one of the Kafka partitions. The message
> > was
> > > > found in one of the topic partitions.
> > > >
> > > > We are not sure why Kafka did not notify any consumers about the
> > message.
> > > > Are there any special cases where Kafka silently drops a message ?
> > > >
> > > > We also found a delay in the notifications/watches triggered from
> > > > zookeeper. We are not sure whether these are related ? It will be
> > difficult
> > > > to reproduce as the test probably took a few days to complete. But
> > surely
> > > > we did lose approximately 5% of the messages. We have logs of messages
> > > > being produced at the producer side and corresponding entries in Kafka
> > > > partitions logs. But nothing at the consumer side. The only repeating
> > > > pattern was that the messages were probably produced within the same
> > > > millisecond. So if you have a sequence of messages which was produced
> > in
> > > > the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably
> > have
> > > > M0,M1,M3 but not M2. This is puzzling as to how only message i

Re: kafka consumer not consuming messages

2014-02-11 Thread Jun Rao
Could you double check that you used the correct topic name? If so, could
you run ConsumerOffsetChecker as described in
https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is
any lag?

Thanks,

Jun


On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota  wrote:

> fetch.wait.max.ms=1
> fetch.min.bytes=128
>
> My message size is much more than that.
> On Feb 11, 2014 9:21 PM, "Jun Rao"  wrote:
>
> > What's the fetch.wait.max.ms and fetch.min.bytes you used?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Feb 11, 2014 at 12:54 AM, Arjun  wrote:
> >
> > > With the same group id from the console consumer its working fine.
> > >
> > >
> > > On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:
> > >
> > >> Arjun,
> > >>
> > >> Are you using the same group name for the console consumer and the
> java
> > >> consumer?
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Feb 10, 2014 at 11:38 PM, Arjun 
> wrote:
> > >>
> > >>  Hi Jun,
> > >>>
> > >>> No its not that problem. I am not getting what the problem is can you
> > >>> please help.
> > >>>
> > >>> thanks
> > >>> Arjun Narasimha Kota
> > >>>
> > >>>
> > >>> On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
> > >>>
> > >>>  Does
> >  https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
> >  Whydoesmyconsumernevergetanydata?
> >  apply?
> > 
> >  Thanks,
> > 
> >  Jun
> > 
> > 
> >  On Sun, Feb 9, 2014 at 10:27 PM, Arjun 
> wrote:
> > 
> >    Hi,
> > 
> > > I started using kafka some time back. I was experimenting with 0.8.
> > My
> > > problem is the kafka is unable to consume the messages. My
> > > configuration
> > > is kafka broker on the local host and zookeeper on the local host.
> I
> > > have only one broker and one consumer at present.
> > >
> > > What have I done:
> > >1) I used the java examples in the kafka src and pushed some
> > 600
> > > messages to the broker
> > >2) I used the console consumer to check weather the messages
> > are
> > > there in the broker or not. Console consumer printed all 600
> messages
> > >3) Now i used the java Consumer code, and tried to get those
> > > messages. This is not printing any messages. It just got stuck
> > >
> > > When was it working earlier:
> > >-When i tried with three brokers and three consumers in the
> > same
> > > machine, with the same configuration it worked fine.
> > >-I changed the properties accordingly when i tried to make
> it
> > > work
> > > with one broker and one consumer
> > >
> > > What does log say:
> > >- attaching the logs even
> > >
> > > If some one points me where I am doing wrong it would be helpful.
> > >
> > > Thanks
> > > Arjun Narasimha Kota
> > >
> > >
> > >
> > >
> > >>
> > >
> >
>


Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-11 Thread Philip O'Toole
Works for me. It's the least disruptive option. 

> On Feb 11, 2014, at 3:11 PM, Suyog Rao  wrote:
> 
> Actually, looking at the code, the consumer client code can also catch this 
> exception while iterating for messages. The fetcher thread inserts a special 
> message before dying, which triggers an exception while client calls next 
> message
> 
> https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/FetcherRunnable.scala#L87
> 
> So, we could create a new ConsumerConnector object when this error happens, 
> which will re-initialize the fetcher threads. 
> 
> Thanks,
> Suyog
> 
>> On Feb 11, 2014, at 12:16 PM, vinh  wrote:
>> 
>> In that case, is there a way to detect that a consumer instance is no longer 
>> usable, so that we can recreate the instance on the fly again to have it 
>> reconnect?  Without having to restart our app?
>> 
>> Thanks,
>> -Vinh
>> 
>>> On Feb 11, 2014, at 7:45 AM, Jun Rao  wrote:
>>> 
>>> We do catch the exception. However, we don't know what to do with it.
>>> Retrying may not fix the problem. So, we just log it and let the thread die.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
 On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole  wrote:
 
 Yes, there might be - we experience link resets every so often, and
 definitely did today.
 
 Assume it is this, are you surprised the thread went down? Perhaps we need
 to catch this?
 
 Philip
 
> On Feb 10, 2014, at 8:38 PM, Jun Rao  wrote:
> 
> This indicates that message checksum validation failed. Is there any
 issue
> with the network?
> 
> Thanks,
> 
> Jun
> 
> 
>> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole 
 wrote:
>> 
>> Saw this thrown today, which brought down a Consumer thread -- we're
 using
>> Consumers built on the High-level consumer framework. What may have
>> happened here? We are using a custom C++ Producer which does not do
>> compression, and which hasn't changed in months, but this error is
>> relatively new to us, and is occurring occasionally. We are running the
 Sun
>> JDK:
>> 
>> java version "1.7.0_25"
>> Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
>> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>> 
>> Restarting the Consumer clears it up, so the message on the Broker
 itself
>> does not appear to be problematic. We are running 3 Consumers, each of
>> which has 48 ConsumerConnector objects. Our code explicitly calls
 commit(),
>> we do not auto-commit.
>> 
>> Thanks,
>> 
>> Philip
>> 
>> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
>> FetcherRunnable for premapped:2-29: fetched offset = 120758878080:
 consumed
>> offset = 120758878080
>> kafka.message.InvalidMessageException: message is invalid, compression
>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
 offset:
>> 120758878080
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>> at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> at
 kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>> at
>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>> at
>> 
>> 
 kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>> at
>> 
>> 
 kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>> at
>> 
>> 
 scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>> at scala.collection.immutable.List.foreach(List.scala:45)
>> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
>> FetcherRunnable
>> kafka.message.InvalidMessageException: message is invalid, compression
>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
 offset:
>> 120758878080
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>> at
>> 
>> 
 kafka.message.ByteBufferMessageSet$$anon$1.makeNext(Byt

Re: New Consumer API discussion

2014-02-11 Thread Jay Kreps
Comments inline:


On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang  wrote:

> Hello Jay,
>
> Thanks for the detailed comments.
>
> 1. Yeah we could discuss a bit more on that.
>
> 2. Since subscribe() is incremental, adding one topic-partition is OK, and
> personally I think it is cleaner than subscribe(String topic,
> int...partition)?
>
I am not too particular. Have you actually tried this? I think writing
actual sample code is important.


> 3. Originally I was thinking about two interfaces:
>
> getOffsets() // offsets for all partitions that I am consuming now
>
> getOffset(topc-partition) // offset of the specified topic-partition, will
> throw exception if it is not currently consumed.
>
> What do you think about these?
>

The naming needs to distinguish committed offset position versus fetch
offset position. Also we aren't using the getX convention.


> 4. Yes, that remains a config.
>

Does that make sense given that you change your position via an api now?


> 5. Agree.
>
> 6. If the time out value is null then it will "logically" return
> immediately with whatever data is available. I think an indefinitely poll()
> function could be replaced with just
>
> while (true) poll(some-time)?
>

That is fine but we should provide a no arg poll for that, poll(null) isn't
clear. We should add the timeunit as per the post java 5 convention as that
makes the call more readable. E.g.
   poll(5) vs poll(5, TimeUnit.MILLISECONDS)


> 7. I am open with either approach.
>

Cool.

8. I was thinking about two interfaces for the commit functionality:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> Do those sound better?
>

Well none kind of address the common case which is to commit all
partitions. For these I was thinking just
   commit();
The advantage of this simpler method is that you don't need to bother about
partitions you just consume the messages given to you and then commit them.

9. Currently I think about un-subscribe as "close and re-subscribe", and
> would like to hear people's opinion about it.
>

Hmm, I think it is a little weird if there is a subscribe which can be
called at any time but no unsubscribe. Would this be hard to do.


> 10. Yes. Position() is an API function, and as and API it means "be called
> at any time" and will change the next fetching starting offset.
>

Cool.


> 11. The ConsumerRecord would have the offset info of the message. Is that
> what you want?
>

But that is only after I have gotten a message. I'm not sure if that covers
all cases or not.


> About use cases: great point. I will add some more examples of using the
> API functions in the wiki pages.
>
> Guozhang
>
>
>
>
> On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps  wrote:
>
> > A few items:
> > 1. ConsumerRebalanceCallback
> >a. onPartitionsRevoked would be a better name.
> >b. We should discuss the possibility of splitting this into two
> > interfaces. The motivation would be that in Java 8 single method
> interfaces
> > can directly take methods which might be more intuitive.
> >c. If we stick with a single interface I would prefer the name
> > RebalanceCallback as its more concise
> > 2. Should subscribe(String topic, int partition) should be
> subscribe(String
> > topic, int...partition)?
> > 3. Is lastCommittedOffset call just a local access? If so it would be
> more
> > convenient not to batch it.
> > 4. How are we going to handle the earliest/latest starting position
> > functionality we currently have. Does that remain a config?
> > 5. Do we need to expose the general ability to get known positions from
> the
> > log? E.g. the functionality in the OffsetRequest...? That would make the
> > ability to change position a little easier.
> > 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
> > unit)? Is it Long because it allows null? If so should we just add a
> poll()
> > that polls indefinitely?
> > 7. I recommend we remove the boolean parameter from commit as it is
> really
> > hard to read code that has boolean parameters without named arguments.
> Can
> > we make it something like commit(...) and commitAsync(...)?
> > 8. What about the common case where you just want to commit the current
> > position for all partitions?
> > 9. How do you unsubscribe?
> > 10. You say in a few places that positions() only impacts the starting
> > position, but surely that isn't the case, right? Surely it controls the
> > fetch position for that partition and can be called at any time?
> Otherwise
> > it is a pretty weird api, right?
> > 11. How do I get my current position? Not the committed position but the
> > offset of the next message that will be given to me?
> >
> > One thing that I really found helpful for the API design was writing out
> > actual code for different scenarios against the API. I think it might be
> > good to do that for this too--i.e. enumerate the various use cases and
> code
> > that use case up to see how it looks. I'm not sure

Re: some brokers cannot register themselves with zookeeper

2014-02-11 Thread Guozhang Wang
Hello Libo,

Which Kafka version are you using? Pre-0.8.1 there is a bug that can cause
a registration path to be deleted:

https://issues.apache.org/jira/browse/KAFKA-992

And this has been fixed in 0.8.1

Guozhang


On Tue, Feb 11, 2014 at 1:16 PM, Yu, Libo  wrote:

> Hi team,
>
> This is an issue that has frustrated me for quit some time. One of our
> clusters has
> three hosts. In my startup script, three zookeeper processes are brought
> up first followed
> by three kafka processes. The problem we have is that after three kafka
> processes are up,
> only one broker has been registered in zookeeper (In this case, host
> three). If I manually
> kill the kafka processes on host one and host two and restart them, they
> can register
> themselves with zookeeper successfully. I've attached logs from host one.
> The log indicated
> broker 1 was registered at /brokers/ids. When I checked zookeeper, I found
> only broker 3
> was registered. It seems there is a race condition.
>
>
> [2014-02-11 15:20:55,266] INFO Session establishment complete on server
> cfgtps1q
> -phys/HostOne:9181, sessionid = 0x144229beb98, negotiated timeout = 100
> 00 (org.apache.zookeeper.ClientCnxn)
> [2014-02-11 15:20:55,268] INFO zookeeper state changed (SyncConnected)
> (org.I0It
> ec.zkclient.ZkClient)
> [2014-02-11 15:20:55,378] INFO /brokers/ids/1 exists with value {
> "host":"cfgtps
> 1q-phys.nam.nsroot.net", "jmx_port":, "port":11934, "version":1 }
> during con
> nection loss; this is ok (kafka.utils.ZkUtils$)
> [2014-02-11 15:20:55,379] INFO Registered broker 1 at path /brokers/ids/1
> with a
> ddress hostone.xxx.xx.net:11934. (kafka.utils.ZkUtils$)
> [2014-02-11 15:20:55,380] INFO [Kafka Server 1], Connecting to ZK: HostOne
> :9181, HostTwo:9181, HostThree:9181 (kafka.server.KafkaServer)
> [2014-02-11 15:20:55,511] INFO Will not load MX4J, mx4j-tools.jar is not
> in the
> classpath (kafka.utils.Mx4jLoader$)
> [2014-02-11 15:20:55,520] INFO conflict in /controller data: 1 stored
> data: 3 (k
> afka.utils.ZkUtils$)
> [2014-02-11 15:20:55,538] INFO [Kafka Server 1], Started
> (kafka.server.KafkaServ
> er)
> [2014-02-11 15:20:58,015] INFO 1 successfully elected as leader
> (kafka.server.Zo
> okeeperLeaderElector)
> [2014-02-11 15:20:58,605] INFO Accepted socket connection from
> /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:20:58,609] INFO Client attempting to establish new session
> at /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:20:58,616] INFO Established session 0x144229beb980001 with
> negotiated timeout 1 for client /HostThree:52420
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:21:01,064] INFO New leader is 1
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2014-02-11 15:21:36,375] INFO Accepted socket connection from
> /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:21:36,378] INFO Client attempting to establish new session
> at /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
>
> Regards,
>
> Libo
>



-- 
-- Guozhang


Re: New Consumer API discussion

2014-02-11 Thread Guozhang Wang
Hi Imran,

1. I think choosing between a) and b) is really dependent on the consuming
traffic. We decided to make the consumer client single-threaded and let
users to decide using one or multiple clients based on traffic mainly
because with a multi-thread client, the fetcher thread could die silently
while the user thread still works and gets blocked forever.

2. Yes. If the subcription is a list of topics, which means it relies on
Kafka to assign partitions, then the first pool will trigger the group
management protocol and upon receiving the partitions the callback function
will be executed.

3. The wiki page (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design)
have some example usages of the new consumer API (there might be some minor
function signature differences with the javadoc). Would you want to take a
look at give some thoughts about that?

Guozhang


On Tue, Feb 11, 2014 at 1:50 PM, Imran Rashid  wrote:

> Hi,
>
> thanks for sharing this and getting feedback.  Sorry I am probably missing
> something basic, but I'm not sure how a multi-threaded consumer would
> work.  I can imagine its either:
>
> a) I just have one thread poll kafka.  If I want to process msgs in
> multiple threads, than I deal w/ that after polling, eg. stick them into a
> blocking queue or something, and have more threads that read from the
> queue.
>
> b) each thread creates its own KafkaConsumer.  They are all registered the
> same way, and I leave it to kafka to figure out what data to give to each
> one.
>
>
> (a) certainly makes things simple, but I worry about throughput -- is that
> just as good as having one thread trying to consumer each partition?
>
> (b) makes it a bit of a pain to figure out how many threads to use.  I
> assume there is no point in using more threads than there are partitions,
> so first you've got to figure out how many partitions there are in each
> topic.  Might be nice if there were some util functions to simplify this.
>
>
> Also, since the initial call to subscribe doesn't give the partition
> assignment, does that mean the first call to poll() will always call the
> ConsumerRebalanceCallback?
>
> probably a short code-sample would clear up all my questions.  I'm
> imagining pseudo-code like:
>
>
> int numPartitions = ...
> int numThreads = min(maxThreads, numPartitions);
> //maybe should be something even more complicated, to take into account how
> many other active consumers there are right now for the given group
>
> List consumers = new ArrayList();
> for (int i = 0; i < numThreads; i++) {
>   MyConsumer c = new MyConsumer();
>   c.subscribe(...);
>   //if subscribe is expensive, then this should already happen in another
> thread
>   consumers.add(c);
> }
>
> // if each subscribe() happened in a different thread, we should put a
> barrier in here, so everybody subscribes before they begin polling
>
> //now launch a thread per consumer, where they each poll
>
>
>
> If I'm on the right track, I'd like to expand this example, showing how
> each "MyConsumer" can keep track of its partitions & offsets, even in the
> face of rebalances.  As Jay said, I think a minimal code example could
> really help us see the utility & faults of the api.
>
> overall I really like what I see, seems like a big improvement!
>
> thanks,
> Imran
>
>
>
> On Mon, Feb 10, 2014 at 12:54 PM, Neha Narkhede  >wrote:
>
> > As mentioned in previous emails, we are also working on a
> re-implementation
> > of the consumer. I would like to use this email thread to discuss the
> > details of the public API. I would also like us to be picky about this
> > public api now so it is as good as possible and we don't need to break it
> > in the future.
> >
> > The best way to get a feel for the API is actually to take a look at the
> > javadoc<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> > >,
> > the hope is to get the api docs good enough so that it is
> self-explanatory.
> > You can also take a look at the configs
> > here<
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> > >
> >
> > Some background info on implementation:
> >
> > At a high level the primary difference in this consumer is that it
> removes
> > the distinction between the "high-level" and "low-level" consumer. The
> new
> > consumer API is non blocking and instead of returning a blocking
> iterator,
> > the consumer provides a poll() API that returns a list of records. We
> think
> > this is better compared to the blocking iterators since it effectively
> > decouples the threading strategy used for processing messages from the
> > consumer. It is worth noting that the consumer is entirely single
> threaded
> > and runs in the user thread. The advantage is that it can be easily
> > rewritten in less multi-threading-friendly languages. The consumer
> batches
> > data and multiplexes I/O 

Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-11 Thread Suyog Rao
Actually, looking at the code, the consumer client code can also catch this 
exception while iterating for messages. The fetcher thread inserts a special 
message before dying, which triggers an exception while client calls next 
message

https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/FetcherRunnable.scala#L87

So, we could create a new ConsumerConnector object when this error happens, 
which will re-initialize the fetcher threads. 

Thanks,
Suyog

On Feb 11, 2014, at 12:16 PM, vinh  wrote:

> In that case, is there a way to detect that a consumer instance is no longer 
> usable, so that we can recreate the instance on the fly again to have it 
> reconnect?  Without having to restart our app?
> 
> Thanks,
> -Vinh
> 
> On Feb 11, 2014, at 7:45 AM, Jun Rao  wrote:
> 
>> We do catch the exception. However, we don't know what to do with it.
>> Retrying may not fix the problem. So, we just log it and let the thread die.
>> 
>> Thanks,
>> 
>> Jun
>> 
>> 
>> On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole  wrote:
>> 
>>> Yes, there might be - we experience link resets every so often, and
>>> definitely did today.
>>> 
>>> Assume it is this, are you surprised the thread went down? Perhaps we need
>>> to catch this?
>>> 
>>> Philip
>>> 
 On Feb 10, 2014, at 8:38 PM, Jun Rao  wrote:
 
 This indicates that message checksum validation failed. Is there any
>>> issue
 with the network?
 
 Thanks,
 
 Jun
 
 
> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole 
>>> wrote:
> 
> Saw this thrown today, which brought down a Consumer thread -- we're
>>> using
> Consumers built on the High-level consumer framework. What may have
> happened here? We are using a custom C++ Producer which does not do
> compression, and which hasn't changed in months, but this error is
> relatively new to us, and is occurring occasionally. We are running the
>>> Sun
> JDK:
> 
>  java version "1.7.0_25"
>  Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
> 
> Restarting the Consumer clears it up, so the message on the Broker
>>> itself
> does not appear to be problematic. We are running 3 Consumers, each of
> which has 48 ConsumerConnector objects. Our code explicitly calls
>>> commit(),
> we do not auto-commit.
> 
> Thanks,
> 
> Philip
> 
> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable for premapped:2-29: fetched offset = 120758878080:
>>> consumed
> offset = 120758878080
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>>> offset:
> 120758878080
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>  at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>  at
>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>  at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>  at
> 
> 
>>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>  at
> 
> 
>>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>  at
> 
> 
>>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:45)
>  at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
> FetcherRunnable
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>>> offset:
> 120758878080
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>  at
> 
> 
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>  at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>  at
>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>  at
> 
> 
>>> kafka.message.

Re: New Consumer API discussion

2014-02-11 Thread Imran Rashid
Hi,

thanks for sharing this and getting feedback.  Sorry I am probably missing
something basic, but I'm not sure how a multi-threaded consumer would
work.  I can imagine its either:

a) I just have one thread poll kafka.  If I want to process msgs in
multiple threads, than I deal w/ that after polling, eg. stick them into a
blocking queue or something, and have more threads that read from the queue.

b) each thread creates its own KafkaConsumer.  They are all registered the
same way, and I leave it to kafka to figure out what data to give to each
one.


(a) certainly makes things simple, but I worry about throughput -- is that
just as good as having one thread trying to consumer each partition?

(b) makes it a bit of a pain to figure out how many threads to use.  I
assume there is no point in using more threads than there are partitions,
so first you've got to figure out how many partitions there are in each
topic.  Might be nice if there were some util functions to simplify this.


Also, since the initial call to subscribe doesn't give the partition
assignment, does that mean the first call to poll() will always call the
ConsumerRebalanceCallback?

probably a short code-sample would clear up all my questions.  I'm
imagining pseudo-code like:


int numPartitions = ...
int numThreads = min(maxThreads, numPartitions);
//maybe should be something even more complicated, to take into account how
many other active consumers there are right now for the given group

List consumers = new ArrayList();
for (int i = 0; i < numThreads; i++) {
  MyConsumer c = new MyConsumer();
  c.subscribe(...);
  //if subscribe is expensive, then this should already happen in another
thread
  consumers.add(c);
}

// if each subscribe() happened in a different thread, we should put a
barrier in here, so everybody subscribes before they begin polling

//now launch a thread per consumer, where they each poll



If I'm on the right track, I'd like to expand this example, showing how
each "MyConsumer" can keep track of its partitions & offsets, even in the
face of rebalances.  As Jay said, I think a minimal code example could
really help us see the utility & faults of the api.

overall I really like what I see, seems like a big improvement!

thanks,
Imran



On Mon, Feb 10, 2014 at 12:54 PM, Neha Narkhede wrote:

> As mentioned in previous emails, we are also working on a re-implementation
> of the consumer. I would like to use this email thread to discuss the
> details of the public API. I would also like us to be picky about this
> public api now so it is as good as possible and we don't need to break it
> in the future.
>
> The best way to get a feel for the API is actually to take a look at the
> javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >,
> the hope is to get the api docs good enough so that it is self-explanatory.
> You can also take a look at the configs
> here<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> >
>
> Some background info on implementation:
>
> At a high level the primary difference in this consumer is that it removes
> the distinction between the "high-level" and "low-level" consumer. The new
> consumer API is non blocking and instead of returning a blocking iterator,
> the consumer provides a poll() API that returns a list of records. We think
> this is better compared to the blocking iterators since it effectively
> decouples the threading strategy used for processing messages from the
> consumer. It is worth noting that the consumer is entirely single threaded
> and runs in the user thread. The advantage is that it can be easily
> rewritten in less multi-threading-friendly languages. The consumer batches
> data and multiplexes I/O over TCP connections to each of the brokers it
> communicates with, for high throughput. The consumer also allows long poll
> to reduce the end-to-end message latency for low throughput data.
>
> The consumer provides a group management facility that supports the concept
> of a group with multiple consumer instances (just like the current
> consumer). This is done through a custom heartbeat and group management
> protocol transparent to the user. At the same time, it allows users the
> option to subscribe to a fixed set of partitions and not use group
> management at all. The offset management strategy defaults to Kafka based
> offset management and the API provides a way for the user to use a
> customized offset store to manage the consumer's offsets.
>
> A key difference in this consumer also is the fact that it does not depend
> on zookeeper at all.
>
> More details about the new consumer design are
> here<
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
>
> Please take a look at the new
> API<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Kafk

some brokers cannot register themselves with zookeeper

2014-02-11 Thread Yu, Libo
Hi team,

This is an issue that has frustrated me for quit some time. One of our clusters 
has
three hosts. In my startup script, three zookeeper processes are brought up 
first followed
by three kafka processes. The problem we have is that after three kafka 
processes are up,
only one broker has been registered in zookeeper (In this case, host three). If 
I manually
kill the kafka processes on host one and host two and restart them, they can 
register
themselves with zookeeper successfully. I've attached logs from host one. The 
log indicated
broker 1 was registered at /brokers/ids. When I checked zookeeper, I found only 
broker 3
was registered. It seems there is a race condition.


[2014-02-11 15:20:55,266] INFO Session establishment complete on server cfgtps1q
-phys/HostOne:9181, sessionid = 0x144229beb98, negotiated timeout = 100
00 (org.apache.zookeeper.ClientCnxn)
[2014-02-11 15:20:55,268] INFO zookeeper state changed (SyncConnected) (org.I0It
ec.zkclient.ZkClient)
[2014-02-11 15:20:55,378] INFO /brokers/ids/1 exists with value { "host":"cfgtps
1q-phys.nam.nsroot.net", "jmx_port":, "port":11934, "version":1 } during con
nection loss; this is ok (kafka.utils.ZkUtils$)
[2014-02-11 15:20:55,379] INFO Registered broker 1 at path /brokers/ids/1 with a
ddress hostone.xxx.xx.net:11934. (kafka.utils.ZkUtils$)
[2014-02-11 15:20:55,380] INFO [Kafka Server 1], Connecting to ZK: HostOne
:9181, HostTwo:9181, HostThree:9181 (kafka.server.KafkaServer)
[2014-02-11 15:20:55,511] INFO Will not load MX4J, mx4j-tools.jar is not in the
classpath (kafka.utils.Mx4jLoader$)
[2014-02-11 15:20:55,520] INFO conflict in /controller data: 1 stored data: 3 (k
afka.utils.ZkUtils$)
[2014-02-11 15:20:55,538] INFO [Kafka Server 1], Started (kafka.server.KafkaServ
er)
[2014-02-11 15:20:58,015] INFO 1 successfully elected as leader (kafka.server.Zo
okeeperLeaderElector)
[2014-02-11 15:20:58,605] INFO Accepted socket connection from /HostThree:52420 
(org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:20:58,609] INFO Client attempting to establish new session at 
/HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:20:58,616] INFO Established session 0x144229beb980001 with 
negotiated timeout 1 for client /HostThree:52420 
(org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:21:01,064] INFO New leader is 1 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-02-11 15:21:36,375] INFO Accepted socket connection from 
/xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:21:36,378] INFO Client attempting to establish new session at 
/xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)

Regards,

Libo


Re: New Consumer API discussion

2014-02-11 Thread Pradeep Gollakota
Updated thoughts.

   1.

   subscribe(String topic, int... paritions) and unsubscribe(String topic,
   int... partitions) should be subscribe(TopicPartition...
topicPartitions)and unsubscribe(TopicPartition...
   topicPartitons)
2.

   Does it make sense to provide a convenience method to subscribe to
   topics at a particular offset directly? E.g.
subscribe(TopicPartitionOffset...
   offsets)
3.

   The javadoc makes no mention of what would happen if positions() is
   called with a TopicPartitionOffset to which the Consumer is not
   subscribed to.
4.

   The javadoc makes no mention of what would happen if positions() is
   called with two different offsets for a single TopicPartition
5. The javadoc shows lastCommittedOffsets() return type as
   List. This should either be Map or Map
   6. It seems like #4 can be avoided by using Map or Map as the argument type.
   7. To address #3, maybe we can return List that
   are invalid.



On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede wrote:

> Pradeep,
>
> To be clear, we want to get feedback on the APIs from the
> javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >since
> the wiki will be slightly behind on the APIs.
>
> 1. Regarding consistency, do you have specific feedback on which APIs
> should have different arguments/return types?
> 2. lastCommittedOffsets() does what you said in the javadoc.
>
> Thanks,
> Neha
>
>
> On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota  >wrote:
>
> > Hi Jay,
> >
> > I apologize for derailing the conversation about the consumer API. We
> > should start a new discussion about hierarchical topics, if we want to
> keep
> > talking about it. My final thought on the matter is that, hierarchical
> > topics is still an important feature to have in Kafka, because it gives
> us
> > flexibility to do namespace level access controls.
> >
> > Getting back to the topic of the Consumer API:
> >
> >1. Any thoughts on consistency for method arguments and return types?
> >2. lastCommittedOffsets() method returns a
> > Listwhere as the confluence page suggested a
> > Map >Long>. I would think that a Map is the more appropriate return type.
> >
> >
> >
> > On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps  wrote:
> >
> > > Hey Pradeep,
> > >
> > > That wiki is fairly old and it predated more flexible subscription
> > > mechanisms. In the high-level consumer you currently have wildcard
> > > subscription and in the new proposed interface you can actually
> subscribe
> > > based on any logic you want to create a "union" of streams. Personally
> I
> > > think this gives you everything you would want with a hierarchy and
> more
> > > actual flexibility (since you can define groupings however you want).
> > What
> > > do you think?
> > >
> > > -Jay
> > >
> > >
> > > On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <
> pradeep...@gmail.com
> > > >wrote:
> > >
> > > > WRT to hierarchical topics, I'm referring to
> > > > KAFKA-1175.
> > > > I would just like to think through the implications for the Consumer
> > API
> > > if
> > > > and when we do implement hierarchical topics. For example, in the
> > > > proposal<
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
> > > > >written
> > > > by Jay, he says that initially wildcard subscriptions are not going
> > > > to be supported. But does that mean that they will be supported in
> v2?
> > If
> > > > that's the case, that would change the semantics of the Consumer API.
> > > >
> > > > As to having classes for Topic, PartitionId, etc. it looks like I was
> > > > referring to the TopicPartition and TopicPartitionOffset classes (I
> > > didn't
> > > > realize these were already there). I was only looking at the
> confluence
> > > > page which shows List[(String, Int, Long)] instead of
> > > > List[TopicParitionOffset] (as is shown in the javadoc). However, I
> did
> > > > notice that we're not being consistent in the Java version. E.g. we
> > have
> > > > commit(TopicPartitionOffset... offsets) and
> > > > lastCommittedOffsets(TopicPartition... partitions) on the one hand.
> On
> > > the
> > > > other hand we have subscribe(String topic, int... partitions). I
> agree
> > > that
> > > > creating a class for TopicId today would probably not make too much
> > sense
> > > > today. But with hierarchical topics, I may change my mind. This is
> > > exactly
> > > > what was done in the HBase API in 0.96 when namespaces were added.
> 0.96
> > > > HBase API introduced a class called 'TableName' to represent the
> > > namespace
> > > > and table name.
> > > >
> > > >
> > > > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > > >wrote:
> > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > Mattijs -
> > > > >
> > > > > - Constructors link to
> > > > > http://kafka.apache.org/documentation.html#con

Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-11 Thread vinh
In that case, is there a way to detect that a consumer instance is no longer 
usable, so that we can recreate the instance on the fly again to have it 
reconnect?  Without having to restart our app?

Thanks,
-Vinh

On Feb 11, 2014, at 7:45 AM, Jun Rao  wrote:

> We do catch the exception. However, we don't know what to do with it.
> Retrying may not fix the problem. So, we just log it and let the thread die.
> 
> Thanks,
> 
> Jun
> 
> 
> On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole  wrote:
> 
>> Yes, there might be - we experience link resets every so often, and
>> definitely did today.
>> 
>> Assume it is this, are you surprised the thread went down? Perhaps we need
>> to catch this?
>> 
>> Philip
>> 
>>> On Feb 10, 2014, at 8:38 PM, Jun Rao  wrote:
>>> 
>>> This indicates that message checksum validation failed. Is there any
>> issue
>>> with the network?
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
 On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole 
>> wrote:
 
 Saw this thrown today, which brought down a Consumer thread -- we're
>> using
 Consumers built on the High-level consumer framework. What may have
 happened here? We are using a custom C++ Producer which does not do
 compression, and which hasn't changed in months, but this error is
 relatively new to us, and is occurring occasionally. We are running the
>> Sun
 JDK:
 
   java version "1.7.0_25"
   Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
   Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
 
 Restarting the Consumer clears it up, so the message on the Broker
>> itself
 does not appear to be problematic. We are running 3 Consumers, each of
 which has 48 ConsumerConnector objects. Our code explicitly calls
>> commit(),
 we do not auto-commit.
 
 Thanks,
 
 Philip
 
 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
 FetcherRunnable for premapped:2-29: fetched offset = 120758878080:
>> consumed
 offset = 120758878080
 kafka.message.InvalidMessageException: message is invalid, compression
 codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>> offset:
 120758878080
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
   at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
   at
>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
   at
 
 
>> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
   at
 
 
>> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
   at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
   at
 
 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
   at
 
 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
   at
 
 
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
   at scala.collection.immutable.List.foreach(List.scala:45)
   at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
 FetcherRunnable
 kafka.message.InvalidMessageException: message is invalid, compression
 codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>> offset:
 120758878080
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
   at
 
 
>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
   at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
   at
>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
   at
 
 
>> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
   at
 
 
>> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
   at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
   at
 
 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
   at
 
 
>> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
   at
 
 
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
   at scala.collection.immutable.List.foreach(List.scala:4

Re: Use cases for new java client

2014-02-11 Thread Neha Narkhede
Thanks for sharing your use case! In the new consumer APIs, we intend to
support custom offset management as well as custom partition assignment.
We have a thread discussing the new consumer API javadoc to collect
feedback from users. It will greatly help to have your feedback as well -
http://grokbase.com/t/kafka/users/142avhm32j/new-consumer-api-discussion

Thanks,
Neha


On Tue, Feb 11, 2014 at 11:25 AM, Pete Matern  wrote:

> Hi -
> Recently Mr. Kreps was kind enough to answer a bunch of questions from
> some of us here at Jive software. One subject that particularly caught our
> interest was the new java client, which sounds like it will super useful.
>
> We have some client code which sits on top of SimpleConsumer which we use
> for cases where we want to manage our own offsets, but also for cases where
> we don't care about persistent offsets at all, and just want to consume
> from the tip of a given partition at the point in time when the consuming
> process spins up. I've attached the source in case it might be useful to
> see what we are doing. Thank you, and I hope it's helpful.
>
> -pete


Re: New Consumer API discussion

2014-02-11 Thread Neha Narkhede
Pradeep,

To be clear, we want to get feedback on the APIs from the
javadocsince
the wiki will be slightly behind on the APIs.

1. Regarding consistency, do you have specific feedback on which APIs
should have different arguments/return types?
2. lastCommittedOffsets() does what you said in the javadoc.

Thanks,
Neha


On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota wrote:

> Hi Jay,
>
> I apologize for derailing the conversation about the consumer API. We
> should start a new discussion about hierarchical topics, if we want to keep
> talking about it. My final thought on the matter is that, hierarchical
> topics is still an important feature to have in Kafka, because it gives us
> flexibility to do namespace level access controls.
>
> Getting back to the topic of the Consumer API:
>
>1. Any thoughts on consistency for method arguments and return types?
>2. lastCommittedOffsets() method returns a
> Listwhere as the confluence page suggested a
> MapLong>. I would think that a Map is the more appropriate return type.
>
>
>
> On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps  wrote:
>
> > Hey Pradeep,
> >
> > That wiki is fairly old and it predated more flexible subscription
> > mechanisms. In the high-level consumer you currently have wildcard
> > subscription and in the new proposed interface you can actually subscribe
> > based on any logic you want to create a "union" of streams. Personally I
> > think this gives you everything you would want with a hierarchy and more
> > actual flexibility (since you can define groupings however you want).
> What
> > do you think?
> >
> > -Jay
> >
> >
> > On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota  > >wrote:
> >
> > > WRT to hierarchical topics, I'm referring to
> > > KAFKA-1175.
> > > I would just like to think through the implications for the Consumer
> API
> > if
> > > and when we do implement hierarchical topics. For example, in the
> > > proposal<
> > > https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
> > > >written
> > > by Jay, he says that initially wildcard subscriptions are not going
> > > to be supported. But does that mean that they will be supported in v2?
> If
> > > that's the case, that would change the semantics of the Consumer API.
> > >
> > > As to having classes for Topic, PartitionId, etc. it looks like I was
> > > referring to the TopicPartition and TopicPartitionOffset classes (I
> > didn't
> > > realize these were already there). I was only looking at the confluence
> > > page which shows List[(String, Int, Long)] instead of
> > > List[TopicParitionOffset] (as is shown in the javadoc). However, I did
> > > notice that we're not being consistent in the Java version. E.g. we
> have
> > > commit(TopicPartitionOffset... offsets) and
> > > lastCommittedOffsets(TopicPartition... partitions) on the one hand. On
> > the
> > > other hand we have subscribe(String topic, int... partitions). I agree
> > that
> > > creating a class for TopicId today would probably not make too much
> sense
> > > today. But with hierarchical topics, I may change my mind. This is
> > exactly
> > > what was done in the HBase API in 0.96 when namespaces were added. 0.96
> > > HBase API introduced a class called 'TableName' to represent the
> > namespace
> > > and table name.
> > >
> > >
> > > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > > >wrote:
> > >
> > > > Thanks for the feedback.
> > > >
> > > > Mattijs -
> > > >
> > > > - Constructors link to
> > > > http://kafka.apache.org/documentation.html#consumerconfigs for valid
> > > > configurations, which lists zookeeper.connect rather than
> > > > metadata.broker.list, the value for BROKER_LIST_CONFIG in
> > ConsumerConfig.
> > > > Fixed it to just point to ConsumerConfig for now until we finalize
> the
> > > new
> > > > configs
> > > > - Docs for poll(long) mention consumer.commit(true), which I can't
> find
> > > in
> > > > the Consumer docs. For a simple consumer setup, that call is
> something
> > > that
> > > > would make a lot of sense.
> > > > Missed changing the examples to use consumer.commit(true, offsets).
> The
> > > > suggestions by Jay would change it to commit(offsets) and
> > > > commitAsync(offsets), which will hopefully make it easier to
> understand
> > > > those commit APIs.
> > > > - Love the addition of MockConsumer, awesome for unittesting :)
> > > > I'm not quite satisfied with what it does as of right now, but we
> will
> > > > surely improve it as we start writing the consumer.
> > > >
> > > > Jay -
> > > >
> > > > 1. ConsumerRebalanceCallback
> > > > a. Makes sense. Renamed to onPartitionsRevoked
> > > > b. Ya, it will be good to make it forward compatible with Java 8
> > > > capabilities. We can change it to PartitionsAssignedCallback and
> > > >  Partitions

Use cases for new java client

2014-02-11 Thread Pete Matern
Hi -
Recently Mr. Kreps was kind enough to answer a bunch of questions from some of 
us here at Jive software. One subject that particularly caught our interest was 
the new java client, which sounds like it will super useful.

We have some client code which sits on top of SimpleConsumer which we use for 
cases where we want to manage our own offsets, but also for cases where we 
don't care about persistent offsets at all, and just want to consume from the 
tip of a given partition at the point in time when the consuming process spins 
up. I've attached the source in case it might be useful to see what we are 
doing. Thank you, and I hope it's helpful.

-pete

consumer_src.tar.gz
Description: consumer_src.tar.gz


Re: New Consumer API discussion

2014-02-11 Thread Pradeep Gollakota
Hi Jay,

I apologize for derailing the conversation about the consumer API. We
should start a new discussion about hierarchical topics, if we want to keep
talking about it. My final thought on the matter is that, hierarchical
topics is still an important feature to have in Kafka, because it gives us
flexibility to do namespace level access controls.

Getting back to the topic of the Consumer API:

   1. Any thoughts on consistency for method arguments and return types?
   2. lastCommittedOffsets() method returns a
Listwhere as the confluence page suggested a
Map. I would think that a Map is the more appropriate return type.



On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps  wrote:

> Hey Pradeep,
>
> That wiki is fairly old and it predated more flexible subscription
> mechanisms. In the high-level consumer you currently have wildcard
> subscription and in the new proposed interface you can actually subscribe
> based on any logic you want to create a "union" of streams. Personally I
> think this gives you everything you would want with a hierarchy and more
> actual flexibility (since you can define groupings however you want). What
> do you think?
>
> -Jay
>
>
> On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota  >wrote:
>
> > WRT to hierarchical topics, I'm referring to
> > KAFKA-1175.
> > I would just like to think through the implications for the Consumer API
> if
> > and when we do implement hierarchical topics. For example, in the
> > proposal<
> > https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
> > >written
> > by Jay, he says that initially wildcard subscriptions are not going
> > to be supported. But does that mean that they will be supported in v2? If
> > that's the case, that would change the semantics of the Consumer API.
> >
> > As to having classes for Topic, PartitionId, etc. it looks like I was
> > referring to the TopicPartition and TopicPartitionOffset classes (I
> didn't
> > realize these were already there). I was only looking at the confluence
> > page which shows List[(String, Int, Long)] instead of
> > List[TopicParitionOffset] (as is shown in the javadoc). However, I did
> > notice that we're not being consistent in the Java version. E.g. we have
> > commit(TopicPartitionOffset... offsets) and
> > lastCommittedOffsets(TopicPartition... partitions) on the one hand. On
> the
> > other hand we have subscribe(String topic, int... partitions). I agree
> that
> > creating a class for TopicId today would probably not make too much sense
> > today. But with hierarchical topics, I may change my mind. This is
> exactly
> > what was done in the HBase API in 0.96 when namespaces were added. 0.96
> > HBase API introduced a class called 'TableName' to represent the
> namespace
> > and table name.
> >
> >
> > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede  > >wrote:
> >
> > > Thanks for the feedback.
> > >
> > > Mattijs -
> > >
> > > - Constructors link to
> > > http://kafka.apache.org/documentation.html#consumerconfigs for valid
> > > configurations, which lists zookeeper.connect rather than
> > > metadata.broker.list, the value for BROKER_LIST_CONFIG in
> ConsumerConfig.
> > > Fixed it to just point to ConsumerConfig for now until we finalize the
> > new
> > > configs
> > > - Docs for poll(long) mention consumer.commit(true), which I can't find
> > in
> > > the Consumer docs. For a simple consumer setup, that call is something
> > that
> > > would make a lot of sense.
> > > Missed changing the examples to use consumer.commit(true, offsets). The
> > > suggestions by Jay would change it to commit(offsets) and
> > > commitAsync(offsets), which will hopefully make it easier to understand
> > > those commit APIs.
> > > - Love the addition of MockConsumer, awesome for unittesting :)
> > > I'm not quite satisfied with what it does as of right now, but we will
> > > surely improve it as we start writing the consumer.
> > >
> > > Jay -
> > >
> > > 1. ConsumerRebalanceCallback
> > > a. Makes sense. Renamed to onPartitionsRevoked
> > > b. Ya, it will be good to make it forward compatible with Java 8
> > > capabilities. We can change it to PartitionsAssignedCallback and
> > >  PartitionsRevokedCallback or RebalanceBeginCallback and
> > > RebalanceEndCallback?
> > > c. Ya, I thought about that but then didn't name it just
> > > RebalanceCallback since there could be a conflict with a controller
> side
> > > rebalance callback if/when we have one. However, you can argue that at
> > that
> > > time we can name it ControllerRebalanceCallback instead of polluting a
> > user
> > > facing API. So agree with you here.
> > > 2. Ya, that is a good idea. Changed to subscribe(String topic,
> > > int...partitions).
> > > 3. lastCommittedOffset() is not necessarily a local access since the
> > > consumer can potentially ask for the last committed offsets of
> partitions
> > > that the consumer does not consume and maintain the offsets 

Re: Description of jmx exposed metrics?

2014-02-11 Thread Tomas Nunez
Yes, but this counts how many messages went in that topic, right? This
should always be increasing and never decreasing, if I understand correctly.

I'm asking that because in my kafka configuration I have 18 topics, and in
all of them but one this metric is increasing. There is a topic where this
metric is going up and down like crazy, and I'm worried that something may
be wrong with it.

And strangely this happens in 4 of 5 servers. There is a server where this
topic is only increasing, never decreasing...



On Tue, Feb 11, 2014 at 6:49 PM, Neha Narkhede wrote:

> Yes. All jmx beans start from 0 on broker startup.
>
> Thanks,
> Neha
>
>
> On Tue, Feb 11, 2014 at 9:42 AM, Tomas Nunez  wrote:
>
> > Then I guess mi next question is:
> > The jmx metric "Kafka -> kafka.BrokerTopicStat. ->
> MessagesIn"
> > counts how many messages went in that topic since kafka was started, and
> it
> > will reset to 0 when I restart the service, right?
> >
> > Same thing for Kafka -> kafka.BrokerAllTopicStat -> MessagesIn, right?
> >
> > Thanks!
> >
> >
> > On Tue, Feb 11, 2014 at 5:35 PM, Tomas Nunez  wrote:
> >
> > > Yup... I read both of them, but I saw much more data in the jmx and I
> was
> > > trying to understant it to make the most of them :)
> > >
> > > But I'll settle knowing that is the useful data. Thank you both!
> > >
> > >
> > > On Tue, Feb 11, 2014 at 5:05 PM, Jun Rao  wrote:
> > >
> > >> These are the 0.8 jmx. The 0.7 one can be found in
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Tue, Feb 11, 2014 at 6:51 AM, Andrew Otto 
> > wrote:
> > >>
> > >> > Here tis!
> > >> >
> > >> > https://kafka.apache.org/documentation.html#monitoring
> > >> >
> > >> >
> > >> > On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:
> > >> >
> > >> > > Hi!
> > >> > >
> > >> > > Sorry if this question has already been answered, but I've search
> > the
> > >> > > archives, the project page and the wiki unsuccessfully.
> > >> > >
> > >> > > I'd like to know the meaning of the jmx exposed metrics. I can
> guess
> > >> > based
> > >> > > on the name but I'd like to be sure. Plus, some of them seem to be
> > >> reset
> > >> > > when I restart the server (or when a new file is created in the
> > topic
> > >> > > directory, I'm not sure), and some of them don't.
> > >> > >
> > >> > > Is there a doc anywhere with this info?
> > >> > >
> > >> > > PD: I'm using 0.7.2 and I know 0.8 changes some things, but I
> guess
> > >> the
> > >> > > basic stuff will remain the same...
> > >> > >
> > >> > > --
> > >> > >
> > >> > >
> > >> > > --
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >> >
> > >>
> > >
> > >
> >
> > --
> >
> >
> > --
> >
> >
> >
> >
>

-- 


--





Re: Description of jmx exposed metrics?

2014-02-11 Thread Neha Narkhede
Yes. All jmx beans start from 0 on broker startup.

Thanks,
Neha


On Tue, Feb 11, 2014 at 9:42 AM, Tomas Nunez  wrote:

> Then I guess mi next question is:
> The jmx metric "Kafka -> kafka.BrokerTopicStat. -> MessagesIn"
> counts how many messages went in that topic since kafka was started, and it
> will reset to 0 when I restart the service, right?
>
> Same thing for Kafka -> kafka.BrokerAllTopicStat -> MessagesIn, right?
>
> Thanks!
>
>
> On Tue, Feb 11, 2014 at 5:35 PM, Tomas Nunez  wrote:
>
> > Yup... I read both of them, but I saw much more data in the jmx and I was
> > trying to understant it to make the most of them :)
> >
> > But I'll settle knowing that is the useful data. Thank you both!
> >
> >
> > On Tue, Feb 11, 2014 at 5:05 PM, Jun Rao  wrote:
> >
> >> These are the 0.8 jmx. The 0.7 one can be found in
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Tue, Feb 11, 2014 at 6:51 AM, Andrew Otto 
> wrote:
> >>
> >> > Here tis!
> >> >
> >> > https://kafka.apache.org/documentation.html#monitoring
> >> >
> >> >
> >> > On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:
> >> >
> >> > > Hi!
> >> > >
> >> > > Sorry if this question has already been answered, but I've search
> the
> >> > > archives, the project page and the wiki unsuccessfully.
> >> > >
> >> > > I'd like to know the meaning of the jmx exposed metrics. I can guess
> >> > based
> >> > > on the name but I'd like to be sure. Plus, some of them seem to be
> >> reset
> >> > > when I restart the server (or when a new file is created in the
> topic
> >> > > directory, I'm not sure), and some of them don't.
> >> > >
> >> > > Is there a doc anywhere with this info?
> >> > >
> >> > > PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess
> >> the
> >> > > basic stuff will remain the same...
> >> > >
> >> > > --
> >> > >
> >> > >
> >> > > --
> >> > >
> >> > >
> >> > >
> >> >
> >> >
> >>
> >
> >
>
> --
>
>
> --
>
>
>
>


Re: Description of jmx exposed metrics?

2014-02-11 Thread Tomas Nunez
Then I guess mi next question is:
The jmx metric "Kafka -> kafka.BrokerTopicStat. -> MessagesIn"
counts how many messages went in that topic since kafka was started, and it
will reset to 0 when I restart the service, right?

Same thing for Kafka -> kafka.BrokerAllTopicStat -> MessagesIn, right?

Thanks!


On Tue, Feb 11, 2014 at 5:35 PM, Tomas Nunez  wrote:

> Yup... I read both of them, but I saw much more data in the jmx and I was
> trying to understant it to make the most of them :)
>
> But I'll settle knowing that is the useful data. Thank you both!
>
>
> On Tue, Feb 11, 2014 at 5:05 PM, Jun Rao  wrote:
>
>> These are the 0.8 jmx. The 0.7 one can be found in
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Tue, Feb 11, 2014 at 6:51 AM, Andrew Otto  wrote:
>>
>> > Here tis!
>> >
>> > https://kafka.apache.org/documentation.html#monitoring
>> >
>> >
>> > On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:
>> >
>> > > Hi!
>> > >
>> > > Sorry if this question has already been answered, but I've search the
>> > > archives, the project page and the wiki unsuccessfully.
>> > >
>> > > I'd like to know the meaning of the jmx exposed metrics. I can guess
>> > based
>> > > on the name but I'd like to be sure. Plus, some of them seem to be
>> reset
>> > > when I restart the server (or when a new file is created in the topic
>> > > directory, I'm not sure), and some of them don't.
>> > >
>> > > Is there a doc anywhere with this info?
>> > >
>> > > PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess
>> the
>> > > basic stuff will remain the same...
>> > >
>> > > --
>> > >
>> > >
>> > > --
>> > >
>> > >
>> > >
>> >
>> >
>>
>
>

-- 


--





Re: kafka consumer not consuming messages

2014-02-11 Thread Arjun Kota
fetch.wait.max.ms=1
fetch.min.bytes=128

My message size is much more than that.
On Feb 11, 2014 9:21 PM, "Jun Rao"  wrote:

> What's the fetch.wait.max.ms and fetch.min.bytes you used?
>
> Thanks,
>
> Jun
>
>
> On Tue, Feb 11, 2014 at 12:54 AM, Arjun  wrote:
>
> > With the same group id from the console consumer its working fine.
> >
> >
> > On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:
> >
> >> Arjun,
> >>
> >> Are you using the same group name for the console consumer and the java
> >> consumer?
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Feb 10, 2014 at 11:38 PM, Arjun  wrote:
> >>
> >>  Hi Jun,
> >>>
> >>> No its not that problem. I am not getting what the problem is can you
> >>> please help.
> >>>
> >>> thanks
> >>> Arjun Narasimha Kota
> >>>
> >>>
> >>> On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
> >>>
> >>>  Does
>  https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
>  Whydoesmyconsumernevergetanydata?
>  apply?
> 
>  Thanks,
> 
>  Jun
> 
> 
>  On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:
> 
>    Hi,
> 
> > I started using kafka some time back. I was experimenting with 0.8.
> My
> > problem is the kafka is unable to consume the messages. My
> > configuration
> > is kafka broker on the local host and zookeeper on the local host. I
> > have only one broker and one consumer at present.
> >
> > What have I done:
> >1) I used the java examples in the kafka src and pushed some
> 600
> > messages to the broker
> >2) I used the console consumer to check weather the messages
> are
> > there in the broker or not. Console consumer printed all 600 messages
> >3) Now i used the java Consumer code, and tried to get those
> > messages. This is not printing any messages. It just got stuck
> >
> > When was it working earlier:
> >-When i tried with three brokers and three consumers in the
> same
> > machine, with the same configuration it worked fine.
> >-I changed the properties accordingly when i tried to make it
> > work
> > with one broker and one consumer
> >
> > What does log say:
> >- attaching the logs even
> >
> > If some one points me where I am doing wrong it would be helpful.
> >
> > Thanks
> > Arjun Narasimha Kota
> >
> >
> >
> >
> >>
> >
>


Re: Description of jmx exposed metrics?

2014-02-11 Thread Tomas Nunez
Yup... I read both of them, but I saw much more data in the jmx and I was
trying to understant it to make the most of them :)

But I'll settle knowing that is the useful data. Thank you both!


On Tue, Feb 11, 2014 at 5:05 PM, Jun Rao  wrote:

> These are the 0.8 jmx. The 0.7 one can be found in
>
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
>
> Thanks,
>
> Jun
>
>
> On Tue, Feb 11, 2014 at 6:51 AM, Andrew Otto  wrote:
>
> > Here tis!
> >
> > https://kafka.apache.org/documentation.html#monitoring
> >
> >
> > On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:
> >
> > > Hi!
> > >
> > > Sorry if this question has already been answered, but I've search the
> > > archives, the project page and the wiki unsuccessfully.
> > >
> > > I'd like to know the meaning of the jmx exposed metrics. I can guess
> > based
> > > on the name but I'd like to be sure. Plus, some of them seem to be
> reset
> > > when I restart the server (or when a new file is created in the topic
> > > directory, I'm not sure), and some of them don't.
> > >
> > > Is there a doc anywhere with this info?
> > >
> > > PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess the
> > > basic stuff will remain the same...
> > >
> > > --
> > >
> > >
> > > --
> > >
> > >
> > >
> >
> >
>

-- 


--





Re: Description of jmx exposed metrics?

2014-02-11 Thread Jun Rao
These are the 0.8 jmx. The 0.7 one can be found in
https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring

Thanks,

Jun


On Tue, Feb 11, 2014 at 6:51 AM, Andrew Otto  wrote:

> Here tis!
>
> https://kafka.apache.org/documentation.html#monitoring
>
>
> On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:
>
> > Hi!
> >
> > Sorry if this question has already been answered, but I've search the
> > archives, the project page and the wiki unsuccessfully.
> >
> > I'd like to know the meaning of the jmx exposed metrics. I can guess
> based
> > on the name but I'd like to be sure. Plus, some of them seem to be reset
> > when I restart the server (or when a new file is created in the topic
> > directory, I'm not sure), and some of them don't.
> >
> > Is there a doc anywhere with this info?
> >
> > PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess the
> > basic stuff will remain the same...
> >
> > --
> >
> >
> > --
> >
> >
> >
>
>


Re: New Consumer API discussion

2014-02-11 Thread Jay Kreps
Hey Pradeep,

That wiki is fairly old and it predated more flexible subscription
mechanisms. In the high-level consumer you currently have wildcard
subscription and in the new proposed interface you can actually subscribe
based on any logic you want to create a "union" of streams. Personally I
think this gives you everything you would want with a hierarchy and more
actual flexibility (since you can define groupings however you want). What
do you think?

-Jay


On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota wrote:

> WRT to hierarchical topics, I'm referring to
> KAFKA-1175.
> I would just like to think through the implications for the Consumer API if
> and when we do implement hierarchical topics. For example, in the
> proposal<
> https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
> >written
> by Jay, he says that initially wildcard subscriptions are not going
> to be supported. But does that mean that they will be supported in v2? If
> that's the case, that would change the semantics of the Consumer API.
>
> As to having classes for Topic, PartitionId, etc. it looks like I was
> referring to the TopicPartition and TopicPartitionOffset classes (I didn't
> realize these were already there). I was only looking at the confluence
> page which shows List[(String, Int, Long)] instead of
> List[TopicParitionOffset] (as is shown in the javadoc). However, I did
> notice that we're not being consistent in the Java version. E.g. we have
> commit(TopicPartitionOffset... offsets) and
> lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
> other hand we have subscribe(String topic, int... partitions). I agree that
> creating a class for TopicId today would probably not make too much sense
> today. But with hierarchical topics, I may change my mind. This is exactly
> what was done in the HBase API in 0.96 when namespaces were added. 0.96
> HBase API introduced a class called 'TableName' to represent the namespace
> and table name.
>
>
> On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede  >wrote:
>
> > Thanks for the feedback.
> >
> > Mattijs -
> >
> > - Constructors link to
> > http://kafka.apache.org/documentation.html#consumerconfigs for valid
> > configurations, which lists zookeeper.connect rather than
> > metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
> > Fixed it to just point to ConsumerConfig for now until we finalize the
> new
> > configs
> > - Docs for poll(long) mention consumer.commit(true), which I can't find
> in
> > the Consumer docs. For a simple consumer setup, that call is something
> that
> > would make a lot of sense.
> > Missed changing the examples to use consumer.commit(true, offsets). The
> > suggestions by Jay would change it to commit(offsets) and
> > commitAsync(offsets), which will hopefully make it easier to understand
> > those commit APIs.
> > - Love the addition of MockConsumer, awesome for unittesting :)
> > I'm not quite satisfied with what it does as of right now, but we will
> > surely improve it as we start writing the consumer.
> >
> > Jay -
> >
> > 1. ConsumerRebalanceCallback
> > a. Makes sense. Renamed to onPartitionsRevoked
> > b. Ya, it will be good to make it forward compatible with Java 8
> > capabilities. We can change it to PartitionsAssignedCallback and
> >  PartitionsRevokedCallback or RebalanceBeginCallback and
> > RebalanceEndCallback?
> > c. Ya, I thought about that but then didn't name it just
> > RebalanceCallback since there could be a conflict with a controller side
> > rebalance callback if/when we have one. However, you can argue that at
> that
> > time we can name it ControllerRebalanceCallback instead of polluting a
> user
> > facing API. So agree with you here.
> > 2. Ya, that is a good idea. Changed to subscribe(String topic,
> > int...partitions).
> > 3. lastCommittedOffset() is not necessarily a local access since the
> > consumer can potentially ask for the last committed offsets of partitions
> > that the consumer does not consume and maintain the offsets for. That's
> the
> > reason it is batched right now.
> > 4. Yes, look at
> >
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
> > 5. Sure, but that is not part of the consumer API right? I think you're
> > suggesting looking at OffsetRequest to see if it would do that properly?
> > 6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
> > negative timeout will poll indefinitely?
> > 7. Good point. Changed to commit(...) and commitAsync(...)
> > 8. To commit the current position for all partitions owned by the
> consumer,
> > you can use commit(). If you don't use group management, then
> > commit(customListOfPartitions)
> > 9. Forgot to include unsubscribe. Done now.
> > 10. positions() can be called at any time and affects the next fetch on
> the
> > next 

Re: Monitoring producer failures on kafka 0.7 (using zookeeper)

2014-02-11 Thread Jun Rao
These are ZK warnings. It seems to be related to the SASL setting.

Thanks,

Jun


On Tue, Feb 11, 2014 at 2:46 AM, Laurent Thoulon <
laurent.thou...@ldmobile.net> wrote:

>
> Hi,
>
> I'm working with a kafka producer in async mode using zookeeper.
> From time to time, i observe network problems and i see the following
> error in my logs:
>
> [2014-02-11 11:21:57,848][INFO ][org.apache.zookeeper.ClientCnxn] Unable
> to read additional data from server sessionid 0x1441cc4a9c5001f, likely
> server has closed socket, closing socket connection and attempting reconnect
> [2014-02-11 11:21:57,949][INFO ][org.I0Itec.zkclient.ZkClient] zookeeper
> state changed (Disconnected)
> [2014-02-11 11:21:59,143][INFO ][org.apache.zookeeper.ClientCnxn] Opening
> socket connection to server obfuscated . Will not attempt to authenticate
> using SASL (unknown error)
> [2014-02-11 11:21:59,146][WARN ][org.apache.zookeeper.ClientCnxn] Session
> 0x1441cc4a9c5001f for server null, unexpected error, closing socket
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
>
> The problem is, my code isn't catching this error and i haven't found
> anyway to do so.
> I thought about using the CallbackHandler but it doesn't have any onError
> method.
>
> Is there anyway to trigger some process when connection fails ?
> What i would like to do is to put my trafic on hold when this happens and
> until the connection is back up so i'm sure each incoming request that
> succeeds is logged into kafka.
>
> FYI, I'm not using the synchronized producer for performance issues.
>
> Thanks for your help on this matter.
>
> Laurent
>


Re: Dropping messages ?

2014-02-11 Thread Jun Rao
Are you committing offsets manually? How do you realize that some messages
are lost? Do you log every message returned to Kafka consumer client? Is it
possible that a message is returned to the consumer, but is lost in the
application logic?

Thanks,

Jun


On Mon, Feb 10, 2014 at 10:23 PM, Kat Walker  wrote:

> Hi Jun/Guozhang
>
> We might have to retry our QA tests in its entirety. We simply cannot
> reset consumer offset as there is a lot of processing involved after
> consuming those messages. This might take almost a week. The Kafka message
> also contains `volatile` data which is fetched from a database and
> destroyed after consuming  that Kafka message.
>
> What is puzzling is that we have been running Kafka in Production for over
> 4 months now and we have never faced this issue. Our peak volume is <= 1000
> messages / second. On an average, it is less than 100.
>
> Our zookeeper version is 3.3.6. What I suspect is that we managed to `roll
> over` a few messages due to inconsistencies in zookeeper offsets. Recall
> that we only had only 1 Kafka broker. Once again, this is just a `wild`
> speculation.
>
> The topics were created before we started our tests.
>
> Has someone ever lost a Kafka message before ? Are there any guarantees ?
> We are okay with all kinds of delays provided, the messages arrive in order
> and are delivered regardless.
>
> Thanks
> Kat
>
> > Date: Mon, 10 Feb 2014 07:38:01 -0800
> > Subject: Re: Dropping messages ?
> > From: jun...@gmail.com
> > To: users@kafka.apache.org
> >
> > If you reset the consumer offset and try to consume those messages again,
> > do you see the same drop?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> >
> > > Hi
> > >
> > > We have been using Kafka(0.8) for the past few months with the
> following
> > > setup
> > > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> > >
> > > Yesterday, while running Stress tests in one of the QA machines , we
> > > observed that a few messages which were produced within a couple of
> > > milliseconds of each other did not reach the Kafka consumer. ie There
> was
> > > no trace of that message at the consumer end.
> > >
> > > We decided to check whether we had any errors at our side or there was
> a
> > > network issue. We did not find any issue. We then decided to check
> whether
> > > we can find that message in one of the Kafka partitions. The message
> was
> > > found in one of the topic partitions.
> > >
> > > We are not sure why Kafka did not notify any consumers about the
> message.
> > > Are there any special cases where Kafka silently drops a message ?
> > >
> > > We also found a delay in the notifications/watches triggered from
> > > zookeeper. We are not sure whether these are related ? It will be
> difficult
> > > to reproduce as the test probably took a few days to complete. But
> surely
> > > we did lose approximately 5% of the messages. We have logs of messages
> > > being produced at the producer side and corresponding entries in Kafka
> > > partitions logs. But nothing at the consumer side. The only repeating
> > > pattern was that the messages were probably produced within the same
> > > millisecond. So if you have a sequence of messages which was produced
> in
> > > the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably
> have
> > > M0,M1,M3 but not M2. This is puzzling as to how only message is
> dropped out
> > > of the given 4.
> > >
> > >
> > > We use the High Level Kafka Producer and Consumer. Both are single
> > > threaded(at our end).
> > >
> > > Does kafka need its own dedicated zookeeper ensemble ? We also use the
> > > same zookeeper ensemble as  our configuration service.
> > >
> > > Unfortunately, we did not have DEBUG messages at the server enabled
> during
> > > the setup. Although, NO error messages were observed during the same
> time
> > > period.
> > >
> > >
> > > Before we try running the same Tests again, can someone please shed
> more
> > > light as to the reasons why kafka dropped a few messages ?
> > >
> > > Kat
> > >
>
>


Re: Producer connection unsuccessful

2014-02-11 Thread Jun Rao
Can you telnet to that broker ipaddress and port from the producer host?
Seems like a network connection issue.

Thanks,

Jun


On Tue, Feb 11, 2014 at 1:56 AM, Harshal Bhutkar <
harshal_bhut...@persistent.co.in> wrote:

> I am running zookeeper and 2 brokers(port 9093 and 9094) on remote
> machine. I am able create topic using 'kafka-create-topic.sh'
> I am also to get topic list using 'kafka-list-topic.sh'
> But whenever I try to produce msg it gives me following error
> ERROR Producer connection to ipaddres:9093 unsuccessful
> (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
>
> DISCLAIMER
> ==
> This e-mail may contain privileged and confidential information which is
> the property of Persistent Systems Ltd. It is intended only for the use of
> the individual or entity to which it is addressed. If you are not the
> intended recipient, you are not authorized to read, retain, copy, print,
> distribute or use this message. If you have received this communication in
> error, please notify the sender and delete all copies of this message.
> Persistent Systems Ltd. does not accept any liability for virus infected
> mails.
>
>


Re: kafka consumer not consuming messages

2014-02-11 Thread Jun Rao
What's the fetch.wait.max.ms and fetch.min.bytes you used?

Thanks,

Jun


On Tue, Feb 11, 2014 at 12:54 AM, Arjun  wrote:

> With the same group id from the console consumer its working fine.
>
>
> On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:
>
>> Arjun,
>>
>> Are you using the same group name for the console consumer and the java
>> consumer?
>>
>> Guozhang
>>
>>
>> On Mon, Feb 10, 2014 at 11:38 PM, Arjun  wrote:
>>
>>  Hi Jun,
>>>
>>> No its not that problem. I am not getting what the problem is can you
>>> please help.
>>>
>>> thanks
>>> Arjun Narasimha Kota
>>>
>>>
>>> On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
>>>
>>>  Does
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
 Whydoesmyconsumernevergetanydata?
 apply?

 Thanks,

 Jun


 On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:

   Hi,

> I started using kafka some time back. I was experimenting with 0.8. My
> problem is the kafka is unable to consume the messages. My
> configuration
> is kafka broker on the local host and zookeeper on the local host. I
> have only one broker and one consumer at present.
>
> What have I done:
>1) I used the java examples in the kafka src and pushed some 600
> messages to the broker
>2) I used the console consumer to check weather the messages are
> there in the broker or not. Console consumer printed all 600 messages
>3) Now i used the java Consumer code, and tried to get those
> messages. This is not printing any messages. It just got stuck
>
> When was it working earlier:
>-When i tried with three brokers and three consumers in the same
> machine, with the same configuration it worked fine.
>-I changed the properties accordingly when i tried to make it
> work
> with one broker and one consumer
>
> What does log say:
>- attaching the logs even
>
> If some one points me where I am doing wrong it would be helpful.
>
> Thanks
> Arjun Narasimha Kota
>
>
>
>
>>
>


Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec

2014-02-11 Thread Jun Rao
We do catch the exception. However, we don't know what to do with it.
Retrying may not fix the problem. So, we just log it and let the thread die.

Thanks,

Jun


On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole  wrote:

> Yes, there might be - we experience link resets every so often, and
> definitely did today.
>
> Assume it is this, are you surprised the thread went down? Perhaps we need
> to catch this?
>
> Philip
>
> > On Feb 10, 2014, at 8:38 PM, Jun Rao  wrote:
> >
> > This indicates that message checksum validation failed. Is there any
> issue
> > with the network?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole 
> wrote:
> >>
> >> Saw this thrown today, which brought down a Consumer thread -- we're
> using
> >> Consumers built on the High-level consumer framework. What may have
> >> happened here? We are using a custom C++ Producer which does not do
> >> compression, and which hasn't changed in months, but this error is
> >> relatively new to us, and is occurring occasionally. We are running the
> Sun
> >> JDK:
> >>
> >>java version "1.7.0_25"
> >>Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
> >>Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
> >>
> >> Restarting the Consumer clears it up, so the message on the Broker
> itself
> >> does not appear to be problematic. We are running 3 Consumers, each of
> >> which has 48 ConsumerConnector objects. Our code explicitly calls
> commit(),
> >> we do not auto-commit.
> >>
> >> Thanks,
> >>
> >> Philip
> >>
> >> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
> >> FetcherRunnable for premapped:2-29: fetched offset = 120758878080:
> consumed
> >> offset = 120758878080
> >> kafka.message.InvalidMessageException: message is invalid, compression
> >> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
> offset:
> >> 120758878080
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >>at
> >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >>at
> >> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >>at
> >>
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >>at
> >>
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >>at
> >>
> >>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >>at scala.collection.immutable.List.foreach(List.scala:45)
> >>at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
> >> FetcherRunnable
> >> kafka.message.InvalidMessageException: message is invalid, compression
> >> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
> offset:
> >> 120758878080
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >>at
> >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >>at
> >>
> >>
> kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >>at
> >> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >>at
> >>
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >>at
> >>
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >>at
> >>
> >>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >>at scala.collection.immutable.List.foreach(List.scala:45)
> >>at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>
>


RE: How to compile with a newer version of zookeeper

2014-02-11 Thread Neha Narkhede
Yes. That is 3.3.4.

Thanks,
Neha
On Feb 11, 2014 6:59 AM, "Yu, Libo"  wrote:

> When I telnet to the zookeeper and type "status", this is what I got:
>
> Zookeeper version: 3.3.3-1203054, built on 11/17/2011 05:47 GMT
>
> Is that 3.3.4? So 0.8 final also uses 3.3.4, is that right? Thanks.
>
> Regards,
>
> Libo
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Monday, February 10, 2014 11:26 PM
> To: users@kafka.apache.org
> Subject: Re: How to compile with a newer version of zookeeper
>
> 0.8-beta already depends on zookeeper 3.3.4. Also, Kafka 0.8 final is
> better and more stable compared to 0.8-beta
>
> Thanks,
> Neha
>
>
> On Mon, Feb 10, 2014 at 6:19 PM, Libo Yu  wrote:
>
> > Hi team,
> >
> > We are using Kafka 0.8-beta1. The zookeeper in it is 3.3.3 (although
> > the version in the license file is 3.3.4).
> > I want to upgrade to a newer version. Any idea what I need to do in
> > order to compile broker with a newer version of zookeeper? Thanks.
> >
> > Libo
> >
>


RE: How to compile with a newer version of zookeeper

2014-02-11 Thread Yu, Libo
When I telnet to the zookeeper and type "status", this is what I got:

Zookeeper version: 3.3.3-1203054, built on 11/17/2011 05:47 GMT

Is that 3.3.4? So 0.8 final also uses 3.3.4, is that right? Thanks.

Regards,

Libo

-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Monday, February 10, 2014 11:26 PM
To: users@kafka.apache.org
Subject: Re: How to compile with a newer version of zookeeper

0.8-beta already depends on zookeeper 3.3.4. Also, Kafka 0.8 final is better 
and more stable compared to 0.8-beta

Thanks,
Neha


On Mon, Feb 10, 2014 at 6:19 PM, Libo Yu  wrote:

> Hi team,
>
> We are using Kafka 0.8-beta1. The zookeeper in it is 3.3.3 (although 
> the version in the license file is 3.3.4).
> I want to upgrade to a newer version. Any idea what I need to do in 
> order to compile broker with a newer version of zookeeper? Thanks.
>
> Libo
>


Re: Description of jmx exposed metrics?

2014-02-11 Thread Andrew Otto
Although that is not all of them, just recommended ones to pay attention to :/


On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:

> Hi!
> 
> Sorry if this question has already been answered, but I've search the
> archives, the project page and the wiki unsuccessfully.
> 
> I'd like to know the meaning of the jmx exposed metrics. I can guess based
> on the name but I'd like to be sure. Plus, some of them seem to be reset
> when I restart the server (or when a new file is created in the topic
> directory, I'm not sure), and some of them don't.
> 
> Is there a doc anywhere with this info?
> 
> PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess the
> basic stuff will remain the same...
> 
> -- 
> 
> 
> --
> 
> 
> 



Re: Description of jmx exposed metrics?

2014-02-11 Thread Andrew Otto
Here tis!

https://kafka.apache.org/documentation.html#monitoring


On Feb 11, 2014, at 6:50 AM, Tomas Nunez  wrote:

> Hi!
> 
> Sorry if this question has already been answered, but I've search the
> archives, the project page and the wiki unsuccessfully.
> 
> I'd like to know the meaning of the jmx exposed metrics. I can guess based
> on the name but I'd like to be sure. Plus, some of them seem to be reset
> when I restart the server (or when a new file is created in the topic
> directory, I'm not sure), and some of them don't.
> 
> Is there a doc anywhere with this info?
> 
> PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess the
> basic stuff will remain the same...
> 
> -- 
> 
> 
> --
> 
> 
> 



Description of jmx exposed metrics?

2014-02-11 Thread Tomas Nunez
Hi!

Sorry if this question has already been answered, but I've search the
archives, the project page and the wiki unsuccessfully.

I'd like to know the meaning of the jmx exposed metrics. I can guess based
on the name but I'd like to be sure. Plus, some of them seem to be reset
when I restart the server (or when a new file is created in the topic
directory, I'm not sure), and some of them don't.

Is there a doc anywhere with this info?

PD: I'm using 0.7.2 and I know 0.8 changes some things, but I guess the
basic stuff will remain the same...

-- 


--





Monitoring producer failures on kafka 0.7 (using zookeeper)

2014-02-11 Thread Laurent Thoulon

Hi, 

I'm working with a kafka producer in async mode using zookeeper. 
>From time to time, i observe network problems and i see the following error in 
>my logs: 

[2014-02-11 11:21:57,848][INFO ][org.apache.zookeeper.ClientCnxn] Unable to 
read additional data from server sessionid 0x1441cc4a9c5001f, likely server has 
closed socket, closing socket connection and attempting reconnect 
[2014-02-11 11:21:57,949][INFO ][org.I0Itec.zkclient.ZkClient] zookeeper state 
changed (Disconnected) 
[2014-02-11 11:21:59,143][INFO ][org.apache.zookeeper.ClientCnxn] Opening 
socket connection to server obfuscated . Will not attempt to authenticate using 
SASL (unknown error) 
[2014-02-11 11:21:59,146][WARN ][org.apache.zookeeper.ClientCnxn] Session 
0x1441cc4a9c5001f for server null, unexpected error, closing socket connection 
and attempting reconnect 
java.net.ConnectException: Connection refused 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
 
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068) 

The problem is, my code isn't catching this error and i haven't found anyway to 
do so. 
I thought about using the CallbackHandler but it doesn't have any onError 
method. 

Is there anyway to trigger some process when connection fails ? 
What i would like to do is to put my trafic on hold when this happens and until 
the connection is back up so i'm sure each incoming request that succeeds is 
logged into kafka. 

FYI, I'm not using the synchronized producer for performance issues. 

Thanks for your help on this matter. 

Laurent 


RE: Dropping messages ?

2014-02-11 Thread Kat Walker
Hi Guozhang

The dropped messages were found in one of the topic partitions as indicated in 
my first email. It is clear that the messages were persisted in Kafka logs.

Can you answer my other questions regarding Kafka's consistency and durability.

Thanks
Kat

> Date: Tue, 11 Feb 2014 00:24:27 -0800
> Subject: Re: Dropping messages ?
> From: wangg...@gmail.com
> To: users@kafka.apache.org
> 
> Can you verify if the missing messages exist in the broker or not by ways
> Jun have said, you can just create another consumer group which does no
> processing but just count messages.
> 
> Guozhang
> 
> 
> 
> On Mon, Feb 10, 2014 at 10:23 PM, Kat Walker  wrote:
> 
> > Hi Jun/Guozhang
> >
> > We might have to retry our QA tests in its entirety. We simply cannot
> > reset consumer offset as there is a lot of processing involved after
> > consuming those messages. This might take almost a week. The Kafka message
> > also contains `volatile` data which is fetched from a database and
> > destroyed after consuming  that Kafka message.
> >
> > What is puzzling is that we have been running Kafka in Production for over
> > 4 months now and we have never faced this issue. Our peak volume is <= 1000
> > messages / second. On an average, it is less than 100.
> >
> > Our zookeeper version is 3.3.6. What I suspect is that we managed to `roll
> > over` a few messages due to inconsistencies in zookeeper offsets. Recall
> > that we only had only 1 Kafka broker. Once again, this is just a `wild`
> > speculation.
> >
> > The topics were created before we started our tests.
> >
> > Has someone ever lost a Kafka message before ? Are there any guarantees ?
> > We are okay with all kinds of delays provided, the messages arrive in order
> > and are delivered regardless.
> >
> > Thanks
> > Kat
> >
> > > Date: Mon, 10 Feb 2014 07:38:01 -0800
> > > Subject: Re: Dropping messages ?
> > > From: jun...@gmail.com
> > > To: users@kafka.apache.org
> > >
> > > If you reset the consumer offset and try to consume those messages again,
> > > do you see the same drop?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> > >
> > > > Hi
> > > >
> > > > We have been using Kafka(0.8) for the past few months with the
> > following
> > > > setup
> > > > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> > > >
> > > > Yesterday, while running Stress tests in one of the QA machines , we
> > > > observed that a few messages which were produced within a couple of
> > > > milliseconds of each other did not reach the Kafka consumer. ie There
> > was
> > > > no trace of that message at the consumer end.
> > > >
> > > > We decided to check whether we had any errors at our side or there was
> > a
> > > > network issue. We did not find any issue. We then decided to check
> > whether
> > > > we can find that message in one of the Kafka partitions. The message
> > was
> > > > found in one of the topic partitions.
> > > >
> > > > We are not sure why Kafka did not notify any consumers about the
> > message.
> > > > Are there any special cases where Kafka silently drops a message ?
> > > >
> > > > We also found a delay in the notifications/watches triggered from
> > > > zookeeper. We are not sure whether these are related ? It will be
> > difficult
> > > > to reproduce as the test probably took a few days to complete. But
> > surely
> > > > we did lose approximately 5% of the messages. We have logs of messages
> > > > being produced at the producer side and corresponding entries in Kafka
> > > > partitions logs. But nothing at the consumer side. The only repeating
> > > > pattern was that the messages were probably produced within the same
> > > > millisecond. So if you have a sequence of messages which was produced
> > in
> > > > the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably
> > have
> > > > M0,M1,M3 but not M2. This is puzzling as to how only message is
> > dropped out
> > > > of the given 4.
> > > >
> > > >
> > > > We use the High Level Kafka Producer and Consumer. Both are single
> > > > threaded(at our end).
> > > >
> > > > Does kafka need its own dedicated zookeeper ensemble ? We also use the
> > > > same zookeeper ensemble as  our configuration service.
> > > >
> > > > Unfortunately, we did not have DEBUG messages at the server enabled
> > during
> > > > the setup. Although, NO error messages were observed during the same
> > time
> > > > period.
> > > >
> > > >
> > > > Before we try running the same Tests again, can someone please shed
> > more
> > > > light as to the reasons why kafka dropped a few messages ?
> > > >
> > > > Kat
> > > >
> >
> >
> 
> 
> 
> -- 
> -- Guozhang
  

Producer connection unsuccessful

2014-02-11 Thread Harshal Bhutkar
I am running zookeeper and 2 brokers(port 9093 and 9094) on remote machine. I 
am able create topic using 'kafka-create-topic.sh'
I am also to get topic list using 'kafka-list-topic.sh'
But whenever I try to produce msg it gives me following error
ERROR Producer connection to ipaddres:9093 unsuccessful 
(kafka.producer.SyncProducer)
java.net.ConnectException: Connection refused

DISCLAIMER
==
This e-mail may contain privileged and confidential information which is the 
property of Persistent Systems Ltd. It is intended only for the use of the 
individual or entity to which it is addressed. If you are not the intended 
recipient, you are not authorized to read, retain, copy, print, distribute or 
use this message. If you have received this communication in error, please 
notify the sender and delete all copies of this message. Persistent Systems 
Ltd. does not accept any liability for virus infected mails.



Re: Mirrormaker clients not balanced

2014-02-11 Thread Tomas Nunez
You are totally right. I just tried, and it works.

I thought about creating the dirs and "see what happens" but I was afraid I
could break anything :D

Thank you very, Jun!


On Tue, Feb 11, 2014 at 5:31 AM, Jun Rao  wrote:

> This is probably your issue:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-IamusingtheZK-basedproducerin0.7andIseedataonlyproducedonsomeofthebrokers,butnotall,why
> ?
>
> Thanks,
>
> Jun
>
>
> On Mon, Feb 10, 2014 at 4:04 PM, Tomas Nunez  wrote:
>
> > I don't see anything in kafabe03. In fact, I just restarted it, and I
> > saw in the logs a lot of other topics registering. First, lines like
> > this:
> > (...)
> > [2014-02-10 20:30:07,195] INFO Loading log 'topic2-0'
> > (kafka.log.LogManager)
> > [2014-02-10 20:30:07,201] INFO Loading the last segment
> > /var/kafka/topic2-0/000559732292.kafka in mutable mode,
> > recovery false (kafka.log.Log)
> > (...)
> > [2014-02-10 20:30:07,464] INFO Registering broker /brokers/ids/3
> > (kafka.server.KafkaZooKeeper)
> > [2014-02-10 20:30:07,486] INFO Registering broker /brokers/ids/3
> > succeeded with
> > id:3,creatorId:kafkabe03-1392064207464,host:kafkabe03,port:9092
> > (kafka.server.KafkaZooKeeper)
> > [2014-02-10 20:30:07,516] INFO Begin registering broker topic
> > /brokers/topics/topic2/3 with 1 partitions
> > (kafka.server.KafkaZooKeeper)
> > [2014-02-10 20:30:07,522] INFO End registering broker topic
> > /brokers/topics/topic2/3 (kafka.server.KafkaZooKeeper)
> > (...)
> >
> > But no trace of this particular topic, topic1.
> >
> > Any idea why this particular topic isn't registering? Where can I look?
> >
> > Thanks again
> > Tomàs
> >
> >
> > One of the be brokers (3) is not registered in ZK. Do you see ZK session
> > expiration (potentially due to GC) in that broker?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> >
> > On Mon, Feb 10, 2014 at 4:28 PM, Tomas Nunez  wrote:
> >
> > > Hi
> > >
> > > I'm new around here and I'm dealing with a problem, and reading the
> > > documentation I don't know where else to look.
> > >
> > > I have a cross-dc mirrormaker setup: Mirrormaker is consuming from 5
> > > frontend servers in each DC (10 in total) and 5 backend servers are
> > > consuming from mirrormaker. That's working for most of the topics, but
> > some
> > > of them are not being consumed from all backend servers.
> > >
> > > as an example, browsing with zkcli I find this:
> > >
> > > kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh
> -server
> > > zookeeperbe01:2181 ls /brokers/topics/topic1
> > > Connecting to zookeeperbe01:2181
> > >
> > > WATCHER::
> > >
> > > WatchedEvent state:SyncConnected type:None path:null
> > > [2, 1, 5, 4]
> > > kafka-0.7.2-incubating-src/bin$ /usr/share/zookeeper/bin/zkCli.sh
> -server
> > > zookeeperfe01:2181 ls /brokers/topics/topic1
> > > Connecting to zookeeperfe01:2181
> > >
> > > WATCHER::
> > >
> > > WatchedEvent state:SyncConnected type:None path:null
> > > [3, 2, 1, 5, 4]
> > >
> > > What can be wrong? Where can I look to get more info to troubleshoot
> > this?
> > > Any hint?
> > >
> > > Kafka version is 0.7.2, in mirromaker config I have "--num.streams 5
> > > --num.producers 5".  Zookeeper version is 3.3.5,
> > >
> > > Here you can see kafka-console-consumer.sh connecting to both fe and be
> > > servers:
> > > https://gist.github.com/pythiannunez/1623934cb538678f053e
> > >
> > > Thanks!
> > >
> > >
> >
> > --
> >
> >
> > --
> >
> >
> >
> >
>

-- 


--





Re: kafka consumer not consuming messages

2014-02-11 Thread Arjun

With the same group id from the console consumer its working fine.

On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:

Arjun,

Are you using the same group name for the console consumer and the java
consumer?

Guozhang


On Mon, Feb 10, 2014 at 11:38 PM, Arjun  wrote:


Hi Jun,

No its not that problem. I am not getting what the problem is can you
please help.

thanks
Arjun Narasimha Kota


On Monday 10 February 2014 09:10 PM, Jun Rao wrote:


Does
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
Whydoesmyconsumernevergetanydata?
apply?

Thanks,

Jun


On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:

  Hi,

I started using kafka some time back. I was experimenting with 0.8. My
problem is the kafka is unable to consume the messages. My configuration
is kafka broker on the local host and zookeeper on the local host. I
have only one broker and one consumer at present.

What have I done:
   1) I used the java examples in the kafka src and pushed some 600
messages to the broker
   2) I used the console consumer to check weather the messages are
there in the broker or not. Console consumer printed all 600 messages
   3) Now i used the java Consumer code, and tried to get those
messages. This is not printing any messages. It just got stuck

When was it working earlier:
   -When i tried with three brokers and three consumers in the same
machine, with the same configuration it worked fine.
   -I changed the properties accordingly when i tried to make it work
with one broker and one consumer

What does log say:
   - attaching the logs even

If some one points me where I am doing wrong it would be helpful.

Thanks
Arjun Narasimha Kota









Re: kafka consumer not consuming messages

2014-02-11 Thread Arjun

nope i will try that. thanks for suggesting

On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:

Arjun,

Are you using the same group name for the console consumer and the java
consumer?

Guozhang


On Mon, Feb 10, 2014 at 11:38 PM, Arjun  wrote:


Hi Jun,

No its not that problem. I am not getting what the problem is can you
please help.

thanks
Arjun Narasimha Kota


On Monday 10 February 2014 09:10 PM, Jun Rao wrote:


Does
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
Whydoesmyconsumernevergetanydata?
apply?

Thanks,

Jun


On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:

  Hi,

I started using kafka some time back. I was experimenting with 0.8. My
problem is the kafka is unable to consume the messages. My configuration
is kafka broker on the local host and zookeeper on the local host. I
have only one broker and one consumer at present.

What have I done:
   1) I used the java examples in the kafka src and pushed some 600
messages to the broker
   2) I used the console consumer to check weather the messages are
there in the broker or not. Console consumer printed all 600 messages
   3) Now i used the java Consumer code, and tried to get those
messages. This is not printing any messages. It just got stuck

When was it working earlier:
   -When i tried with three brokers and three consumers in the same
machine, with the same configuration it worked fine.
   -I changed the properties accordingly when i tried to make it work
with one broker and one consumer

What does log say:
   - attaching the logs even

If some one points me where I am doing wrong it would be helpful.

Thanks
Arjun Narasimha Kota









Re: kafka consumer not consuming messages

2014-02-11 Thread Guozhang Wang
Arjun,

Are you using the same group name for the console consumer and the java
consumer?

Guozhang


On Mon, Feb 10, 2014 at 11:38 PM, Arjun  wrote:

> Hi Jun,
>
> No its not that problem. I am not getting what the problem is can you
> please help.
>
> thanks
> Arjun Narasimha Kota
>
>
> On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
>
>> Does
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-
>> Whydoesmyconsumernevergetanydata?
>> apply?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Sun, Feb 9, 2014 at 10:27 PM, Arjun  wrote:
>>
>>  Hi,
>>>
>>> I started using kafka some time back. I was experimenting with 0.8. My
>>> problem is the kafka is unable to consume the messages. My configuration
>>> is kafka broker on the local host and zookeeper on the local host. I
>>> have only one broker and one consumer at present.
>>>
>>> What have I done:
>>>   1) I used the java examples in the kafka src and pushed some 600
>>> messages to the broker
>>>   2) I used the console consumer to check weather the messages are
>>> there in the broker or not. Console consumer printed all 600 messages
>>>   3) Now i used the java Consumer code, and tried to get those
>>> messages. This is not printing any messages. It just got stuck
>>>
>>> When was it working earlier:
>>>   -When i tried with three brokers and three consumers in the same
>>> machine, with the same configuration it worked fine.
>>>   -I changed the properties accordingly when i tried to make it work
>>> with one broker and one consumer
>>>
>>> What does log say:
>>>   - attaching the logs even
>>>
>>> If some one points me where I am doing wrong it would be helpful.
>>>
>>> Thanks
>>> Arjun Narasimha Kota
>>>
>>>
>>>
>


-- 
-- Guozhang


Re: Dropping messages ?

2014-02-11 Thread Guozhang Wang
Can you verify if the missing messages exist in the broker or not by ways
Jun have said, you can just create another consumer group which does no
processing but just count messages.

Guozhang



On Mon, Feb 10, 2014 at 10:23 PM, Kat Walker  wrote:

> Hi Jun/Guozhang
>
> We might have to retry our QA tests in its entirety. We simply cannot
> reset consumer offset as there is a lot of processing involved after
> consuming those messages. This might take almost a week. The Kafka message
> also contains `volatile` data which is fetched from a database and
> destroyed after consuming  that Kafka message.
>
> What is puzzling is that we have been running Kafka in Production for over
> 4 months now and we have never faced this issue. Our peak volume is <= 1000
> messages / second. On an average, it is less than 100.
>
> Our zookeeper version is 3.3.6. What I suspect is that we managed to `roll
> over` a few messages due to inconsistencies in zookeeper offsets. Recall
> that we only had only 1 Kafka broker. Once again, this is just a `wild`
> speculation.
>
> The topics were created before we started our tests.
>
> Has someone ever lost a Kafka message before ? Are there any guarantees ?
> We are okay with all kinds of delays provided, the messages arrive in order
> and are delivered regardless.
>
> Thanks
> Kat
>
> > Date: Mon, 10 Feb 2014 07:38:01 -0800
> > Subject: Re: Dropping messages ?
> > From: jun...@gmail.com
> > To: users@kafka.apache.org
> >
> > If you reset the consumer offset and try to consume those messages again,
> > do you see the same drop?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Feb 10, 2014 at 1:21 AM, A A  wrote:
> >
> > > Hi
> > >
> > > We have been using Kafka(0.8) for the past few months with the
> following
> > > setup
> > > Kafka Broker - 1Zookeepers Ensemble - 3Partitions per topic - 3
> > >
> > > Yesterday, while running Stress tests in one of the QA machines , we
> > > observed that a few messages which were produced within a couple of
> > > milliseconds of each other did not reach the Kafka consumer. ie There
> was
> > > no trace of that message at the consumer end.
> > >
> > > We decided to check whether we had any errors at our side or there was
> a
> > > network issue. We did not find any issue. We then decided to check
> whether
> > > we can find that message in one of the Kafka partitions. The message
> was
> > > found in one of the topic partitions.
> > >
> > > We are not sure why Kafka did not notify any consumers about the
> message.
> > > Are there any special cases where Kafka silently drops a message ?
> > >
> > > We also found a delay in the notifications/watches triggered from
> > > zookeeper. We are not sure whether these are related ? It will be
> difficult
> > > to reproduce as the test probably took a few days to complete. But
> surely
> > > we did lose approximately 5% of the messages. We have logs of messages
> > > being produced at the producer side and corresponding entries in Kafka
> > > partitions logs. But nothing at the consumer side. The only repeating
> > > pattern was that the messages were probably produced within the same
> > > millisecond. So if you have a sequence of messages which was produced
> in
> > > the same millisecond like M0, M1, M2, M3 ie 4 messages. We probably
> have
> > > M0,M1,M3 but not M2. This is puzzling as to how only message is
> dropped out
> > > of the given 4.
> > >
> > >
> > > We use the High Level Kafka Producer and Consumer. Both are single
> > > threaded(at our end).
> > >
> > > Does kafka need its own dedicated zookeeper ensemble ? We also use the
> > > same zookeeper ensemble as  our configuration service.
> > >
> > > Unfortunately, we did not have DEBUG messages at the server enabled
> during
> > > the setup. Although, NO error messages were observed during the same
> time
> > > period.
> > >
> > >
> > > Before we try running the same Tests again, can someone please shed
> more
> > > light as to the reasons why kafka dropped a few messages ?
> > >
> > > Kat
> > >
>
>



-- 
-- Guozhang