RE: ConsumerRebalanceFailedException

2014-02-28 Thread Yu, Libo
e the reason for the failed 
rebalance attempts.

Thanks,
Neha


On Tue, Feb 25, 2014 at 12:01 PM, Yu, Libo  wrote:

> Hi all,
>
> I tried to reproduce this exception. In case one, when no broker was 
> running, I launched all consumers and got this exception. In case two, 
> while the consumers and brokers were running, I shut down all brokers 
> one by one and did not see this exception. I wonder why in case two 
> this exception did not occur. Thanks.
>
>
> Regards,
>
> Libo
>
>


ConsumerRebalanceFailedException

2014-02-25 Thread Yu, Libo
Hi all,

I tried to reproduce this exception. In case one, when no broker was running, I 
launched all consumers and
got this exception. In case two, while the consumers and brokers were running, 
I shut down all brokers one by
one and did not see this exception. I wonder why in case two this exception did 
not occur. Thanks.


Regards,

Libo



RE: broker offline

2014-02-21 Thread Yu, Libo
In our case, two brokers were offline. When the first broker went offline,
that triggered a rebalance. When the second broker went offline while the 
consumers were still in the middle of that rebalance, what should we expect? 
Is the second rebalance request queued? 

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, February 14, 2014 7:33 PM
To: users@kafka.apache.org
Subject: Re: broker offline

Hello Libo,

When ZK resumes from a soft failure, like a GC pause, it will mark the ephemeral 
nodes as session timed out, and the brokers will try to re-register upon 
receiving the session timeout. You can reproduce this issue by pausing the ZK 
process with a signal (e.g. SIGSTOP).

Guozhang


On Fri, Feb 14, 2014 at 12:15 PM, Yu, Libo  wrote:

> Hi team,
>
> We have three brokers on our production cluster. I noticed two of them 
> somehow went offline and then re-registered with zookeeper and got back 
> online. It seems the problem was caused by some zookeeper issue. So I 
> want to know the possible causes of the issue. If I want to reproduce 
> the issue, is there any way to do it? Thanks.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


broker offline

2014-02-14 Thread Yu, Libo
Hi team,

We have three brokers on our production cluster. I noticed two of them somehow
went offline and then re-registered with zookeeper and got back online. It seems
the problem was caused by some zookeeper issue. So I want to know the possible
causes of the issue. If I want to reproduce the issue, is there any way to do
it? Thanks.

Regards,

Libo



RE: some brokers cannot register themselves with zookeeper

2014-02-12 Thread Yu, Libo
Thanks, Guozhang. We are using 0.8-beta1.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Tuesday, February 11, 2014 6:40 PM
To: users@kafka.apache.org
Subject: Re: some brokers cannot register themselves with zookeeper

Hello Libo,

Which Kafka version are you using? Pre-0.8.1 there is a bug that can cause a 
registration path to be deleted:

https://issues.apache.org/jira/browse/KAFKA-992

This has been fixed in 0.8.1.

Guozhang


On Tue, Feb 11, 2014 at 1:16 PM, Yu, Libo  wrote:

> Hi team,
>
> This is an issue that has frustrated me for quite some time. One of our 
> clusters has three hosts. In my startup script, three zookeeper 
> processes are brought up first followed by three kafka processes. The 
> problem we have is that after three kafka processes are up, only one 
> broker has been registered in zookeeper (In this case, host three). If 
> I manually kill the kafka processes on host one and host two and 
> restart them, they can register themselves with zookeeper 
> successfully. I've attached logs from host one.
> The log indicated
> broker 1 was registered at /brokers/ids. When I checked zookeeper, I 
> found only broker 3 was registered. It seems there is a race 
> condition.
>
>
> [2014-02-11 15:20:55,266] INFO Session establishment complete on server cfgtps1q-phys/HostOne:9181, sessionid = 0x144229beb98, negotiated timeout = 10000 (org.apache.zookeeper.ClientCnxn)
> [2014-02-11 15:20:55,268] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> [2014-02-11 15:20:55,378] INFO /brokers/ids/1 exists with value { "host":"cfgtps1q-phys.nam.nsroot.net", "jmx_port":, "port":11934, "version":1 } during connection loss; this is ok (kafka.utils.ZkUtils$)
> [2014-02-11 15:20:55,379] INFO Registered broker 1 at path /brokers/ids/1 with address hostone.xxx.xx.net:11934. (kafka.utils.ZkUtils$)
> [2014-02-11 15:20:55,380] INFO [Kafka Server 1], Connecting to ZK: HostOne:9181, HostTwo:9181, HostThree:9181 (kafka.server.KafkaServer)
> [2014-02-11 15:20:55,511] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
> [2014-02-11 15:20:55,520] INFO conflict in /controller data: 1 stored data: 3 (kafka.utils.ZkUtils$)
> [2014-02-11 15:20:55,538] INFO [Kafka Server 1], Started (kafka.server.KafkaServer)
> [2014-02-11 15:20:58,015] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
> [2014-02-11 15:20:58,605] INFO Accepted socket connection from /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:20:58,609] INFO Client attempting to establish new session at /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:20:58,616] INFO Established session 0x144229beb980001 with negotiated timeout 1 for client /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:21:01,064] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2014-02-11 15:21:36,375] INFO Accepted socket connection from /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
> [2014-02-11 15:21:36,378] INFO Client attempting to establish new session at /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
>
> Regards,
>
> Libo
>



--
-- Guozhang


some brokers cannot register themselves with zookeeper

2014-02-11 Thread Yu, Libo
Hi team,

This is an issue that has frustrated me for quite some time. One of our clusters 
has
three hosts. In my startup script, three zookeeper processes are brought up 
first followed
by three kafka processes. The problem we have is that after three kafka 
processes are up,
only one broker has been registered in zookeeper (In this case, host three). If 
I manually
kill the kafka processes on host one and host two and restart them, they can 
register
themselves with zookeeper successfully. I've attached logs from host one. The 
log indicated
broker 1 was registered at /brokers/ids. When I checked zookeeper, I found only 
broker 3
was registered. It seems there is a race condition.


[2014-02-11 15:20:55,266] INFO Session establishment complete on server cfgtps1q-phys/HostOne:9181, sessionid = 0x144229beb98, negotiated timeout = 10000 (org.apache.zookeeper.ClientCnxn)
[2014-02-11 15:20:55,268] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-02-11 15:20:55,378] INFO /brokers/ids/1 exists with value { "host":"cfgtps1q-phys.nam.nsroot.net", "jmx_port":, "port":11934, "version":1 } during connection loss; this is ok (kafka.utils.ZkUtils$)
[2014-02-11 15:20:55,379] INFO Registered broker 1 at path /brokers/ids/1 with address hostone.xxx.xx.net:11934. (kafka.utils.ZkUtils$)
[2014-02-11 15:20:55,380] INFO [Kafka Server 1], Connecting to ZK: HostOne:9181, HostTwo:9181, HostThree:9181 (kafka.server.KafkaServer)
[2014-02-11 15:20:55,511] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-02-11 15:20:55,520] INFO conflict in /controller data: 1 stored data: 3 (kafka.utils.ZkUtils$)
[2014-02-11 15:20:55,538] INFO [Kafka Server 1], Started (kafka.server.KafkaServer)
[2014-02-11 15:20:58,015] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-02-11 15:20:58,605] INFO Accepted socket connection from /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:20:58,609] INFO Client attempting to establish new session at /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:20:58,616] INFO Established session 0x144229beb980001 with negotiated timeout 1 for client /HostThree:52420 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:21:01,064] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-02-11 15:21:36,375] INFO Accepted socket connection from /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-02-11 15:21:36,378] INFO Client attempting to establish new session at /xx.xx.xxx.xx:54709 (org.apache.zookeeper.server.NIOServerCnxn)

Regards,

Libo


RE: How to compile with a newer version of zookeeper

2014-02-11 Thread Yu, Libo
When I telnet to the zookeeper and type "status", this is what I got:

Zookeeper version: 3.3.3-1203054, built on 11/17/2011 05:47 GMT

Is that 3.3.4? So 0.8 final also uses 3.3.4, is that right? Thanks.

Regards,

Libo

-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Monday, February 10, 2014 11:26 PM
To: users@kafka.apache.org
Subject: Re: How to compile with a newer version of zookeeper

0.8-beta already depends on zookeeper 3.3.4. Also, Kafka 0.8 final is better 
and more stable compared to 0.8-beta

Thanks,
Neha


On Mon, Feb 10, 2014 at 6:19 PM, Libo Yu  wrote:

> Hi team,
>
> We are using Kafka 0.8-beta1. The zookeeper in it is 3.3.3 (although 
> the version in the license file is 3.3.4).
> I want to upgrade to a newer version. Any idea what I need to do in 
> order to compile broker with a newer version of zookeeper? Thanks.
>
> Libo
>


RE: Consumer's behavior when brokers are temporarily not available.

2014-02-07 Thread Yu, Libo
Thanks, Neha. We are planning to migrate to 0.8.1.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Friday, February 07, 2014 10:26 AM
To: users@kafka.apache.org
Subject: RE: Consumer's behavior when brokers are temporarily not available.

Yes bouncing the process will allow you to consume again. Also would you mind 
giving 0.8 final a try? It is much more stable compared to 0.8 beta.

Thanks,
Neha
On Feb 7, 2014 6:49 AM, "Yu, Libo"  wrote:

> We are using 0.8 beta1. Our zookeeper had some issue which in turn 
> triggered consumer rebalance.
> In this case, after maximum number of retries, the rebalance failed, 
> should we bounce our process in order to consume again?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Friday, February 07, 2014 12:03 AM
> To: users@kafka.apache.org
> Subject: Re: Consumer's behavior when brokers are temporarily not 
> available.
>
> Which version are you using? In 0.8, broker failure won't trigger 
> consumer rebalances. Only changes in #partitions and consumers will 
> trigger rebalances.
>
> Thanks,
>
> Jun
>
>
> On Thu, Feb 6, 2014 at 10:46 AM, Yu, Libo  wrote:
>
> > While the broker is not available (caused by zookeeper issue), the 
> > rebalance will fail. Should rebalance succeed in this case? Thanks.
> >
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: Thursday, February 06, 2014 12:49 PM
> > To: users@kafka.apache.org
> > Subject: Re: Consumer's behavior when brokers are temporarily not 
> > available.
> >
> > Neha is right, though it is a little interesting that consumers did 
> > not achieve a consensus after four retries have been exhausted, for 
> > just an event of broker failure.
> >
> > Could you check your consumer log searching for "begin rebalance" 
> > and check what caused these rebalances to fail?
> >
> > Guozhang
> >
> >
> > On Thu, Feb 6, 2014 at 9:12 AM, Neha Narkhede 
> >  > >wrote:
> >
> > > The consumer only retries "rebalance.max.retries" times. Once it 
> > > runs out of the retries, it needs to be restarted to consume again.
> > >
> > >
> > >
> > >
> > > On Thu, Feb 6, 2014 at 9:05 AM, Yu, Libo  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > This is what we experienced recently:
> > > > Some zookeeper's issue made the broker unavailable for a short 
> > > > period of time.
> > > > On the consumer side, this triggered rebalance and the rebalance 
> > > > failed after four tries.
> > > > So what should we expect while the broker is not up? Should the 
> > > > consumer keep trying to rebalance or wait for the brokers to be 
> > > > brought back?  Thanks.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


RE: Consumer's behavior when brokers are temporarily not available.

2014-02-07 Thread Yu, Libo
We are using 0.8 beta1. Our zookeeper had some issue which in turn triggered 
consumer rebalance.
In this case, after maximum number of retries, the rebalance failed, should we 
bounce our process
in order to consume again?

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, February 07, 2014 12:03 AM
To: users@kafka.apache.org
Subject: Re: Consumer's behavior when brokers are temporarily not available.

Which version are you using? In 0.8, broker failure won't trigger consumer 
rebalances. Only changes in #partitions and consumers will trigger rebalances.

Thanks,

Jun


On Thu, Feb 6, 2014 at 10:46 AM, Yu, Libo  wrote:

> While the broker is not available (caused by zookeeper issue), the 
> rebalance will fail. Should rebalance succeed in this case? Thanks.
>
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Thursday, February 06, 2014 12:49 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer's behavior when brokers are temporarily not 
> available.
>
> Neha is right, though it is a little interesting that consumers did 
> not achieve a consensus after four retries have been exhausted, for just 
> an event of broker failure.
>
> Could you check your consumer log searching for "begin rebalance" and 
> check what caused these rebalances to fail?
>
> Guozhang
>
>
> On Thu, Feb 6, 2014 at 9:12 AM, Neha Narkhede  >wrote:
>
> > The consumer only retries "rebalance.max.retries" times. Once it 
> > runs out of the retries, it needs to be restarted to consume again.
> >
> >
> >
> >
> > On Thu, Feb 6, 2014 at 9:05 AM, Yu, Libo  wrote:
> >
> > > Hi folks,
> > >
> > > This is what we experienced recently:
> > > Some zookeeper's issue made the broker unavailable for a short 
> > > period of time.
> > > On the consumer side, this triggered rebalance and the rebalance 
> > > failed after four tries.
> > > So what should we expect while the broker is not up? Should the 
> > > consumer keep trying to rebalance or wait for the brokers to be 
> > > brought back?  Thanks.
> > >
> > >
> > >
> > >
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


RE: Consumer's behavior when brokers are temporarily not available.

2014-02-06 Thread Yu, Libo
"Restarted", do you mean restart the process/recreate a consumer from scratch 
in order to consume again?

This is quite different from the answer I got here previously. I have more 
questions.
Is "rebalance.backoff.ms" the time interval between consecutive retries 
before "rebalance.max.retries" is reached?

This is what I was told previously.
After "rebalance.max.retries" number of retries, the consumer would back off 
for "rebalance.backoff.ms" before retrying 
again. Please clarify. Thanks.
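To make sure I understand, here is the retry loop as I picture it, as a small Python sketch (illustrative only, not Kafka's actual implementation; the function and parameter names are mine):

```python
import time

def synced_rebalance(attempt_rebalance, max_retries=4, backoff_ms=2000,
                     sleep=time.sleep):
    # Illustrative model: try a rebalance up to rebalance.max.retries
    # times, sleeping rebalance.backoff.ms between consecutive attempts;
    # once the retries run out, give up (the consumer then needs a restart).
    for attempt in range(1, max_retries + 1):
        if attempt_rebalance():
            return attempt                      # number of attempts used
        if attempt < max_retries:
            sleep(backoff_ms / 1000.0)
    raise RuntimeError("can't rebalance after %d retries" % max_retries)

# Demo: a fake rebalance that only succeeds on its third attempt.
calls = {"n": 0}
def fake_rebalance():
    calls["n"] += 1
    return calls["n"] >= 3

print(synced_rebalance(fake_rebalance, sleep=lambda s: None))  # → 3
```

If that model is right, "rebalance.backoff.ms" is the interval between consecutive retries, and the exception only surfaces after "rebalance.max.retries" failed attempts.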

Regards,

Libo

-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, February 06, 2014 12:12 PM
To: users@kafka.apache.org
Subject: Re: Consumer's behavior when brokers are temporarily not available.

The consumer only retries "rebalance.max.retries" times. Once it runs out of 
the retries, it needs to be restarted to consume again.




On Thu, Feb 6, 2014 at 9:05 AM, Yu, Libo  wrote:

> Hi folks,
>
> This is what we experienced recently:
> Some zookeeper's issue made the broker unavailable for a short period of time.
> On the consumer side, this triggered rebalance and the rebalance failed 
> after four tries.
> So what should we expect while the broker is not up? Should the consumer 
> keep trying to rebalance or wait for the brokers to be brought back?  Thanks.
>
>
>
>
>
> Regards,
>
> Libo
>
>


RE: Consumer's behavior when brokers are temporarily not available.

2014-02-06 Thread Yu, Libo
While the broker is not available (caused by zookeeper issue), the rebalance 
will fail. Should 
rebalance succeed in this case? Thanks.


Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Thursday, February 06, 2014 12:49 PM
To: users@kafka.apache.org
Subject: Re: Consumer's behavior when brokers are temporarily not available.

Neha is right, though it is a little interesting that consumers did not achieve 
a consensus after four retries have been exhausted, for just an event of 
broker failure.

Could you check your consumer log searching for "begin rebalance" and check 
what caused these rebalances to fail?

Guozhang


On Thu, Feb 6, 2014 at 9:12 AM, Neha Narkhede wrote:

> The consumer only retries "rebalance.max.retries" times. Once it runs 
> out of the retries, it needs to be restarted to consume again.
>
>
>
>
> On Thu, Feb 6, 2014 at 9:05 AM, Yu, Libo  wrote:
>
> > Hi folks,
> >
> > This is what we experienced recently:
> > Some zookeeper's issue made the broker unavailable for a short period 
> > of time.
> > On the consumer side, this triggered rebalance and the rebalance failed 
> > after four tries.
> > So what should we expect while the broker is not up? Should the consumer 
> > keep trying to rebalance or wait for the brokers to be brought back?  Thanks.
> >
> >
> >
> >
> >
> > Regards,
> >
> > Libo
> >
> >
>



--
-- Guozhang


Consumer's behavior when brokers are temporarily not available.

2014-02-06 Thread Yu, Libo
Hi folks,

This is what we experienced recently:
Some zookeeper's issue made the broker unavailable for a short period of time.
On the consumer side, this triggered rebalance and the rebalance failed after
four tries.
So what should we expect while the broker is not up? Should the consumer keep
trying to rebalance or wait for the brokers to be brought back?  Thanks.





Regards,

Libo



num.partitions

2014-01-23 Thread Yu, Libo
Hi team,

I believe num.partitions is for automatic topic creation. Is that right?
The default number of partitions for kafka-create-topic.sh is 1. So
will num.partitions impact kafka-create-topic.sh? Thanks.

Regards,

Libo



RE: purging a topic

2014-01-15 Thread Yu, Libo
Thanks, Jun. I checked ImportZkOffsets. It is not very handy and cannot automate
all steps for purging purposes.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, January 14, 2014 11:43 PM
To: users@kafka.apache.org
Subject: Re: purging a topic

You would need to first stop the consumer, update the offset in ZK and then 
restart the consumer. Also, have you looked at the tool ImportZkOffsets?

Thanks,

Jun


On Tue, Jan 14, 2014 at 12:38 PM, Yu, Libo  wrote:

> Hi folks,
>
> I am writing a tool to "purge" the pending topics for a user. Assume 
> the user has never consumed this topic previously. If I create all the 
> nodes on the path /consumers/[myuser]/offsets/[mytopic]/[partition] 
> and put the maximum available offset to the node, is that enough to 
> let the consumer ignore the current pending messages?
>
> Regards,
>
> Libo
>
>


purging a topic

2014-01-14 Thread Yu, Libo
Hi folks,

I am writing a tool to "purge" the pending topics for a user. Assume the user 
has never
consumed this topic previously. If I create all the nodes on the path
/consumers/[myuser]/offsets/[mytopic]/[partition] and put the maximum
available offset to the node, is that enough to let the consumer ignore the 
current pending
messages?
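Roughly, the tool would do something like this (a Python sketch against a kazoo-style client interface; the group/topic names, offsets, and the in-memory FakeZk stand-in are made up for illustration, and the consumer would have to be stopped while the offsets are written):

```python
def offset_path(group, topic, partition):
    # Znode path where the high-level consumer stores its offset.
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def purge_topic(zk, group, topic, max_offsets):
    # Write each partition's maximum available offset so the consumer
    # skips everything currently pending. `zk` is any object exposing
    # ensure_path(path) and set(path, bytes), e.g. a kazoo KazooClient.
    for partition, offset in max_offsets.items():
        path = offset_path(group, topic, partition)
        zk.ensure_path(path)
        zk.set(path, str(offset).encode("utf-8"))

# Demo against an in-memory stand-in for the ZooKeeper client.
class FakeZk:
    def __init__(self):
        self.data = {}
    def ensure_path(self, path):
        self.data.setdefault(path, b"")
    def set(self, path, value):
        self.data[path] = value

zk = FakeZk()
purge_topic(zk, "mygroup", "mytopic", {0: 1000, 1: 850})
print(zk.data["/consumers/mygroup/offsets/mytopic/0"])  # → b'1000'
```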

Regards,

Libo



RE: ConsumerRebalanceFailedException

2013-12-30 Thread Yu, Libo
Hi Jun,

zookeeper.session.timeout.ms is used in a broker's configuration and manages 
brokers' registration with zk. 
Does it apply to the consumer as well? Thanks.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Monday, December 30, 2013 11:13 AM
To: users@kafka.apache.org
Subject: Re: ConsumerRebalanceFailedException

If the consumer is not shut down properly, it will take 
zookeeper.session.timeout.ms before the consumer is deregistered from ZK.
If you restart the consumer before that, rebalances may fail.

Make sure that you call connector.shutdown() when you shut down the consumer.

Thanks,

Jun


On Mon, Dec 30, 2013 at 1:58 AM, Hanish Bansal < 
hanish.bansal.agar...@gmail.com> wrote:

> Hi All,
>
> I am getting consumer rebalance failed exception if I restart my 
> consumer within 1-3 seconds.
>
> Exception trace is:
>
> Caused by: kafka.common.ConsumerRebalanceFailedException:
> indexConsumerGroup1_IMPETUS-I0027C-1388416992091-ac0d82d7 can't 
> rebalance after 4 retries
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
> at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
> at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
>
>
> Does this exception depend on any of the below properties:
> zookeeper.session.timeout.ms 6000
> zookeeper.connection.timeout.ms 6000
>
> If I kill the consumer and start it again after 5-6 sec, then it 
> starts working properly without throwing any exception.
>
> If I start the consumer immediately after killing it, then 
> ConsumerRebalanceFailedException occurs.
>
> Please help !!
>
> --
> *Thanks & Regards*
> *Hanish Bansal*
>


RE: upgrade from beta1 to 0.81

2013-12-20 Thread Yu, Libo
Thanks for sharing your experience. That kind of manual cleanup may not be 
feasible if there are many topics.

Regards,

Libo


-Original Message-
From: Drew Goya [mailto:d...@gradientx.com] 
Sent: Friday, December 20, 2013 12:36 AM
To: users@kafka.apache.org
Subject: Re: upgrade from beta1 to 0.81

We migrated from 0.8.0 to 0.8.1 last week.  We have a 15 broker cluster so it 
took a while to roll through them one by one.  Once I finished I was finally 
able to complete a partition reassignment.  I also had to do some manual 
cleanup, but Neha says it will be fixed soon:

https://issues.apache.org/jira/browse/KAFKA-1074

Until then, if you have done any partition reassignment you will have to watch 
your brokers as they come up.  They may fail and you will have to go delete the 
empty partition directories.


On Thu, Dec 19, 2013 at 11:07 AM, Guozhang Wang  wrote:

> 0.8.1 is working in stable at LinkedIn now.
>
> Guozhang
>
>
> On Thu, Dec 19, 2013 at 10:52 AM, Yu, Libo  wrote:
>
> > I also want to know how stable 0.81 will be, compared with 0.8 
> > or 0.8-beta1.
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Jason Rosenberg [mailto:j...@squareup.com]
> > Sent: Thursday, December 19, 2013 12:54 PM
> > To: users@kafka.apache.org
> > Subject: Re: upgrade from beta1 to 0.81
> >
> > How stable is 0.8.1, will there be a 'release' of this soon, or are 
> > there still significant open issues?
> >
> > Thanks,
> >
> > Jason
> >
> >
> > On Thu, Dec 19, 2013 at 12:17 PM, Guozhang Wang 
> > wrote:
> >
> > > Libo, yes the upgrade from 0.8 to 0.8.1 can be done in place.
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Dec 19, 2013 at 8:57 AM, Yu, Libo  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > As the tools in 0.8 are not stable and we don't want to take the 
> > > > risk, we want to skip 0.8 and upgrade from beta1 to 0.81 directly.
> > > > So my question is whether we can do an in-place upgrade and let 
> > > > 0.81 use beta1's zk and kf data. Assume that we will disable log 
> > > > compaction. Thanks.
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


RE: upgrade from beta1 to 0.81

2013-12-19 Thread Yu, Libo
I also want to know how stable 0.81 will be, compared with 0.8 or 0.8-beta1.

Regards,

Libo


-Original Message-
From: Jason Rosenberg [mailto:j...@squareup.com] 
Sent: Thursday, December 19, 2013 12:54 PM
To: users@kafka.apache.org
Subject: Re: upgrade from beta1 to 0.81

How stable is 0.8.1, will there be a 'release' of this soon, or are there still 
significant open issues?

Thanks,

Jason


On Thu, Dec 19, 2013 at 12:17 PM, Guozhang Wang  wrote:

> Libo, yes the upgrade from 0.8 to 0.8.1 can be done in place.
>
> Guozhang
>
>
> On Thu, Dec 19, 2013 at 8:57 AM, Yu, Libo  wrote:
>
> > Hi folks,
> >
> > As the tools in 0.8 are not stable and we don't want to take the 
> > risk, we want to skip 0.8 and upgrade from beta1 to 0.81 directly. 
> > So my question is whether we can do an in-place upgrade and let 0.81 
> > use beta1's zk and kf data. Assume that we will disable log 
> > compaction. Thanks.
> >
> > Regards,
> >
> > Libo
> >
> >
>
>
> --
> -- Guozhang
>


RE: upgrade from beta1 to 0.81

2013-12-19 Thread Yu, Libo
Hi Guozhang,

What I'd like to know is whether the upgrade from 0.8-beta1 to 0.81 can be done 
in place.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Thursday, December 19, 2013 12:18 PM
To: users@kafka.apache.org
Subject: Re: upgrade from beta1 to 0.81

Libo, yes the upgrade from 0.8 to 0.8.1 can be done in place.

Guozhang


On Thu, Dec 19, 2013 at 8:57 AM, Yu, Libo  wrote:

> Hi folks,
>
> As the tools in 0.8 are not stable and we don't want to take the risk, 
> we want to skip 0.8 and upgrade from beta1 to 0.81 directly. So my 
> question is whether we can do an in-place upgrade and let 0.81 use 
> beta1's zk and kf data. Assume that we will disable log compaction. 
> Thanks.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


upgrade from beta1 to 0.81

2013-12-19 Thread Yu, Libo
Hi folks,

As the tools in 0.8 are not stable and we don't want to take the risk, we want
to skip 0.8 and upgrade from beta1 to 0.81 directly. So my question is whether
we can do an in-place upgrade and let 0.81 use beta1's zk and kf data. Assume
that we will disable log compaction. Thanks.

Regards,

Libo



RE: a consumer question

2013-12-18 Thread Yu, Libo
Thanks for confirming that, Guozhang.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Wednesday, December 18, 2013 4:34 PM
To: users@kafka.apache.org
Subject: Re: a consumer question

Jun is right. Just checked the code. If you set consumer.timeout.ms to 0 then 
if there is no message a ConsumerTimeoutException will be thrown right away.
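In other words, hasNext() with consumer.timeout.ms behaves like a blocking-queue read with a timeout. A toy Python model of just that semantics (an illustrative model only, not Kafka's code; ConsumerTimeout and IteratorSketch are invented names):

```python
import queue

class ConsumerTimeout(Exception):
    """Stand-in for Kafka's ConsumerTimeoutException."""

class IteratorSketch:
    # consumer.timeout.ms = -1 -> block forever;
    # 0  -> raise immediately when no message is buffered;
    # >0 -> wait that many milliseconds first.
    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.channel = queue.Queue()

    def has_next(self):
        try:
            if self.timeout_ms < 0:
                self.peeked = self.channel.get()
            else:
                self.peeked = self.channel.get(timeout=self.timeout_ms / 1000.0)
            return True
        except queue.Empty:
            raise ConsumerTimeout("no message after %d ms" % self.timeout_ms)

it = IteratorSketch(timeout_ms=0)
it.channel.put(b"msg")
print(it.has_next())        # → True: a message was buffered
try:
    it.has_next()           # queue is empty now: raises right away
except ConsumerTimeout:
    print("timed out")      # → timed out
```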


On Tue, Dec 17, 2013 at 9:08 PM, Jun Rao  wrote:

> Actually, hasNext() only returns false when the consumer connector is 
> shutdown. Typically, you either set consumer.timeout.ms to -1 or a 
> value larger than 0. If it's set to 0, my guess is that it throws a 
> timeout exception immediately if there is no more message.
>
> Thanks,
>
> Jun
>
>
> On Tue, Dec 17, 2013 at 4:57 PM, Guozhang Wang  wrote:
>
> > If there are no more messages, hasNext will return false instead of
> > throwing an exception.
> >
> > Guozhang
> >
> >
> > On Tue, Dec 17, 2013 at 11:53 AM, Yu, Libo  wrote:
> >
> > > Sorry, a typo. Correct my question. When consumer.timeout.ms is set 
> > > to 0, if there is no message available, hasNext() throws a timeout 
> > > exception; otherwise it returns true.
> > > Is that the right behavior?
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Jun Rao [mailto:jun...@gmail.com]
> > > Sent: Tuesday, December 17, 2013 12:40 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: a consumer question
> > >
> > > If there is a message, hasNext() returns true, not throwing an
> exception.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Dec 16, 2013 at 11:29 AM, Yu, Libo  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > For this parameters, if consumer.timeout.ms is set to 0, 
> > > > whenever I call ConsumerIterator's hasNext(), if there is a 
> > > > message available, a timeout exception will be thrown. Is my 
> > > > understanding correct?
> Thanks.
> > > >
> > > > consumer.timeout.ms
> > > >
> > > > -1
> > > >
> > > > Throw a timeout exception to the consumer if no message is 
> > > > available for consumption after the specified interval
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



--
-- Guozhang


RE: a consumer question

2013-12-18 Thread Yu, Libo
Thanks, Jun. That is also my guess:) If the exception is caught, I can easily
convert hasNext() from blocking to nonblocking.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Wednesday, December 18, 2013 12:09 AM
To: users@kafka.apache.org
Subject: Re: a consumer question

Actually, hasNext() only returns false when the consumer connector is shutdown. 
Typically, you either set consumer.timeout.ms to -1 or a value larger than 0. 
If it's set to 0, my guess is that it throws a timeout exception immediately if 
there is no more message.

Thanks,

Jun


On Tue, Dec 17, 2013 at 4:57 PM, Guozhang Wang  wrote:

> If there are no more messages, hasNext will return false instead of 
> throwing an exception.
>
> Guozhang
>
>
> On Tue, Dec 17, 2013 at 11:53 AM, Yu, Libo  wrote:
>
> > Sorry, a typo. Correct my question. When consumer.timeout.ms is set 
> > to 0, if there is no message available, hasNext() throws a timeout 
> > exception; otherwise it returns true.
> > Is that the right behavior?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Jun Rao [mailto:jun...@gmail.com]
> > Sent: Tuesday, December 17, 2013 12:40 AM
> > To: users@kafka.apache.org
> > Subject: Re: a consumer question
> >
> > If there is a message, hasNext() returns true, not throwing an exception.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Dec 16, 2013 at 11:29 AM, Yu, Libo  wrote:
> >
> > > Hi folks,
> > >
> > > For this parameters, if consumer.timeout.ms is set to 0, whenever 
> > > I call ConsumerIterator's hasNext(), if there is a message 
> > > available, a timeout exception will be thrown. Is my understanding 
> > > correct? Thanks.
> > >
> > > consumer.timeout.ms
> > >
> > > -1
> > >
> > > Throw a timeout exception to the consumer if no message is 
> > > available for consumption after the specified interval
> > >
> > >
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


RE: a consumer question

2013-12-17 Thread Yu, Libo
Sorry, a typo. Correct my question. When consumer.timeout.ms is set to 0, if
there is no message available, hasNext() throws a timeout exception; otherwise
it returns true.
Is that the right behavior?

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, December 17, 2013 12:40 AM
To: users@kafka.apache.org
Subject: Re: a consumer question

If there is a message, hasNext() returns true, not throwing an exception.

Thanks,

Jun


On Mon, Dec 16, 2013 at 11:29 AM, Yu, Libo  wrote:

> Hi folks,
>
> For this parameters, if consumer.timeout.ms is set to 0, whenever I 
> call ConsumerIterator's hasNext(), if there is a message available, a 
> timeout exception will be thrown. Is my understanding correct? Thanks.
>
> consumer.timeout.ms
>
> -1
>
> Throw a timeout exception to the consumer if no message is available 
> for consumption after the specified interval
>
>
>
> Regards,
>
> Libo
>
>


a consumer question

2013-12-16 Thread Yu, Libo
Hi folks,

For this parameter, if consumer.timeout.ms is set to 0, whenever I call 
ConsumerIterator's hasNext(),
if there is a message available, a timeout exception will be thrown. Is my 
understanding correct? Thanks.

consumer.timeout.ms

-1

Throw a timeout exception to the consumer if no message is available for 
consumption after the specified interval



Regards,

Libo
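The behavior Jun confirms in the replies above can be modeled with a plain blocking queue. This is only an illustrative sketch (the class and method names below are made up, not Kafka's actual ConsumerIterator): hasNext() waits up to consumer.timeout.ms for a message, returns true as soon as one is available, and throws only when the wait expires with nothing to consume; -1 means block forever.

```python
import queue

class ConsumerTimeoutError(Exception):
    """Models Kafka's ConsumerTimeoutException."""

class IteratorModel:
    """Toy stand-in for ConsumerIterator; has_next() mirrors hasNext()."""
    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms     # models consumer.timeout.ms
        self.messages = queue.Queue()    # stands in for the consumer's fetch queue
        self._peeked = None

    def has_next(self):
        if self._peeked is not None:
            return True
        try:
            if self.timeout_ms < 0:      # -1: block until a message arrives
                self._peeked = self.messages.get()
            else:                        # >= 0: wait at most timeout_ms, then give up
                self._peeked = self.messages.get(timeout=self.timeout_ms / 1000.0)
            return True
        except queue.Empty:
            raise ConsumerTimeoutError() # only raised when NO message arrived in time

    def next(self):
        msg, self._peeked = self._peeked, None
        return msg

it = IteratorModel(timeout_ms=0)
it.messages.put("m1")
print(it.has_next())   # a message is queued -> True, no exception
print(it.next())       # -> m1
try:
    it.has_next()      # queue now empty, timeout 0 -> raises immediately
except ConsumerTimeoutError:
    print("timeout")
```

So with timeout 0 the exception fires only on an empty queue, which is the corrected understanding from this thread.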



RE: cluster expansion

2013-12-16 Thread Yu, Libo
For the 1st question, I don't quite get the answer.
Say I have brokers 1,2,3,4,5,6, and topic A has three partitions on brokers 1,2,3.
If I add another 3 partitions to A, will they occupy 4, 5, 6?

One more question. Does the partition addition tool work only with zookeeper?
Can I keep the zk cluster running and shut down the kafka processes before I run
the partition addition tool? I am asking because we try to avoid any message loss
when adding a partition in a production environment. Thanks.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Monday, December 16, 2013 12:13 PM
To: users@kafka.apache.org
Subject: Re: cluster expansion

If we have six brokers, and a topic has three partitions on broker 1, 2, 3.
Now if I add another three partitions. Will they be evenly distributed to
1,2,3 or 4,5,6?

Evenly distributed on all six brokers.

If I use reassignment tool in 0.81 with 0.8 broker, will that work and get 
around the bugs?

Your broker also needs to be on 0.8.1 for it to work correctly.



On Mon, Dec 16, 2013 at 9:06 AM, Yu, Libo  wrote:

> If we have six brokers, and a topic has three partitions on broker 1, 2, 3.
> Now if I add another three partitions. Will they be evenly distributed 
> to
> 1,2,3 or 4,5,6?
>
> One more question.
> If I use reassignment tool in 0.81 with 0.8 broker, will that work and 
> get around the bugs?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Monday, December 16, 2013 10:26 AM
> To: users@kafka.apache.org
> Subject: RE: cluster expansion
>
> They will be evenly distributed across the nodes in the cluster.
>
> Thanks,
> Neha
> On Dec 16, 2013 6:42 AM, "Yu, Libo"  wrote:
>
> > Assume we have three brokers and a topic already has three partitions.
> > Now if I add another three partitions to this topic. Where will the 
> > three partitions be located after running the script?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Jun Rao [mailto:jun...@gmail.com]
> > Sent: Sunday, December 15, 2013 11:43 PM
> > To: users@kafka.apache.org
> > Subject: Re: cluster expansion
> >
> > Yes, kafka-add-partitions.sh should be stable in 0.8.0.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Dec 14, 2013 at 1:22 AM, Robert Turner 
> wrote:
> >
> > > Is the kafka-add-partitions.sh tool stable in 0.8.0?
> > >
> > >
> > > On 13 December 2013 19:21, Neha Narkhede 
> > wrote:
> > >
> > > > Partition movement is not an automatic operation in Kafka yet. 
> > > > You need
> > > to
> > > > use the partition reassignment tool -
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> > > > .
> > > >
> > > >
> > > > Also, that feature is stable in 0.8.1.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner 
> > > > 
> > > wrote:
> > > >
> > > > > No the 6 partitions for each topic will remain on the original
> > brokers.
> > > > You
> > > > > could either reassign some partitions from all topics to the 
> > > > > new
> > > brokers
> > > > or
> > > > > you could add partitions to the new brokers for each topic. In
> > > > > 0.8.0
> > > > there
> > > > > is now an add-partitions tool.
> > > > >
> > > > > Cheers
> > > > >Rob Turner.
> > > > >
> > > > >
> > > > > On 13 December 2013 14:42, Yu, Libo  wrote:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > There are three brokers running 0.8-beta1 in our cluster
> currently.
> > > > > Assume
> > > > > > all the topics have six partitions.
> > > > > > I am going to add another three brokers to the cluster and 
> > > > > > upgrade
> > > all
> > > > of
> > > > > > them to 0.8. My question is after the cluster is up, will 
> > > > > > the partition be evenly distributed to all brokers? Thanks.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Libo
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Cheers
> > > > >Rob.
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Cheers
> > >Rob.
> > >
> >
>


RE: cluster expansion

2013-12-16 Thread Yu, Libo
If we have six brokers, and a topic has three partitions on broker 1, 2, 3.
Now if I add another three partitions. Will they be evenly distributed to
1,2,3 or 4,5,6?

One more question.
If I use reassignment tool in 0.81 with 0.8 broker, will that work and get 
around the bugs? 

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Monday, December 16, 2013 10:26 AM
To: users@kafka.apache.org
Subject: RE: cluster expansion

They will be evenly distributed across the nodes in the cluster.

Thanks,
Neha
On Dec 16, 2013 6:42 AM, "Yu, Libo"  wrote:

> Assume we have three brokers and a topic already has three partitions.
> Now if I add another three partitions to this topic. Where will the 
> three partitions be located after running the script?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Sunday, December 15, 2013 11:43 PM
> To: users@kafka.apache.org
> Subject: Re: cluster expansion
>
> Yes, kafka-add-partitions.sh should be stable in 0.8.0.
>
> Thanks,
>
> Jun
>
>
> On Sat, Dec 14, 2013 at 1:22 AM, Robert Turner  wrote:
>
> > Is the kafka-add-partitions.sh tool stable in 0.8.0?
> >
> >
> > On 13 December 2013 19:21, Neha Narkhede 
> wrote:
> >
> > > Partition movement is not an automatic operation in Kafka yet. You 
> > > need
> > to
> > > use the partition reassignment tool -
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> > > .
> > >
> > >
> > > Also, that feature is stable in 0.8.1.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner 
> > > 
> > wrote:
> > >
> > > > No the 6 partitions for each topic will remain on the original
> brokers.
> > > You
> > > > could either reassign some partitions from all topics to the new
> > brokers
> > > or
> > > > you could add partitions to the new brokers for each topic. In
> > > > 0.8.0
> > > there
> > > > is now an add-partitions tool.
> > > >
> > > > Cheers
> > > >Rob Turner.
> > > >
> > > >
> > > > On 13 December 2013 14:42, Yu, Libo  wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > There are three brokers running 0.8-beta1 in our cluster currently.
> > > > Assume
> > > > > all the topics have six partitions.
> > > > > I am going to add another three brokers to the cluster and 
> > > > > upgrade
> > all
> > > of
> > > > > them to 0.8. My question is after the cluster is up, will the 
> > > > > partition be evenly distributed to all brokers? Thanks.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Libo
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Cheers
> > > >Rob.
> > > >
> > >
> >
> >
> >
> > --
> > Cheers
> >Rob.
> >
>


RE: cluster expansion

2013-12-16 Thread Yu, Libo
Assume we have three brokers and a topic already has three partitions.
Now if I add another three partitions to this topic. Where will the three 
partitions be located after running the script?

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Sunday, December 15, 2013 11:43 PM
To: users@kafka.apache.org
Subject: Re: cluster expansion

Yes, kafka-add-partitions.sh should be stable in 0.8.0.

Thanks,

Jun


On Sat, Dec 14, 2013 at 1:22 AM, Robert Turner  wrote:

> Is the kafka-add-partitions.sh tool stable in 0.8.0?
>
>
> On 13 December 2013 19:21, Neha Narkhede  wrote:
>
> > Partition movement is not an automatic operation in Kafka yet. You 
> > need
> to
> > use the partition reassignment tool -
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> > .
> >
> >
> > Also, that feature is stable in 0.8.1.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner 
> wrote:
> >
> > > No the 6 partitions for each topic will remain on the original brokers.
> > You
> > > could either reassign some partitions from all topics to the new
> brokers
> > or
> > > you could add partitions to the new brokers for each topic. In 
> > > 0.8.0
> > there
> > > is now an add-partitions tool.
> > >
> > > Cheers
> > >Rob Turner.
> > >
> > >
> > > On 13 December 2013 14:42, Yu, Libo  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > There are three brokers running 0.8-beta1 in our cluster currently.
> > > Assume
> > > > all the topics have six partitions.
> > > > I am going to add another three brokers to the cluster and 
> > > > upgrade
> all
> > of
> > > > them to 0.8. My question is after the cluster is up, will the 
> > > > partition be evenly distributed to all brokers? Thanks.
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > >
> > >
> > > --
> > > Cheers
> > >Rob.
> > >
> >
>
>
>
> --
> Cheers
>Rob.
>


cluster expansion

2013-12-13 Thread Yu, Libo
Hi folks,

There are three brokers running 0.8-beta1 in our cluster currently. Assume all 
the topics have six partitions.
I am going to add another three brokers to the cluster and upgrade all of them 
to 0.8. My question is after
the cluster is up, will the partitions be evenly distributed to all brokers? 
Thanks.

Regards,

Libo
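For the distribution question answered in the replies above, the spirit of Kafka's placement is a round-robin over the full broker list. A simplified sketch assuming a fixed starting broker (the real AdminUtils.assignReplicasToBrokers also randomizes the start and staggers follower replicas):

```python
def assign_partitions(brokers, num_partitions, start=0):
    """Round-robin each new partition onto the broker list, wrapping around."""
    return {p: brokers[(start + p) % len(brokers)] for p in range(num_partitions)}

# Six brokers, six partitions: one partition lands on each broker.
brokers = [1, 2, 3, 4, 5, 6]
print(assign_partitions(brokers, 6))          # {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6}

# Partitions added later continue round the FULL broker list, so they are
# spread over all brokers rather than pinned to the brokers already used.
print(assign_partitions(brokers, 3, start=3))  # {0: 4, 1: 5, 2: 6}
```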



RE: error from adding a partition

2013-12-10 Thread Yu, Libo
I figured out the cause. After compiling 0.8, kafka_2.8.0-0.8.0-beta1 jars were
somehow generated in core/target/scala-2.8.0/ as well, and those duplicate jars
caused the error.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, December 10, 2013 10:56 AM
To: users@kafka.apache.org
Subject: Re: error from adding a partition

Tried this on the 0.8.0 release and it works for me. Could you make sure there 
are no duplicated kafka jars?

Thanks,

Jun


On Tue, Dec 10, 2013 at 7:08 AM, Yu, Libo  wrote:

> Hi folks,
>
> I got this error when I tried to test the partition addition tool.
> bin/kafka-add-partitions.sh --partition 1 --topic libotesttopic 
> --zookeeper xx.xxx.xxx.xx: adding partitions failed because of 
> kafka.admin.AdminUtils$.assignReplicasToBrokers(Lscala/collection/Seq;
> )Lscala/collection/Map;
> java.lang.NoSuchMethodError:
> kafka.admin.AdminUtils$.assignReplicasToBrokers(Lscala/collection/Seq;)Lscala/collection/Map;
> at
> kafka.admin.AddPartitionsCommand$.addPartitions(AddPartitionsCommand.scala:90)
> at
> kafka.admin.AddPartitionsCommand$.main(AddPartitionsCommand.scala:68)
> at
> kafka.admin.AddPartitionsCommand.main(AddPartitionsCommand.scala)
>
> Did I miss anything here? Thanks.
>
> Regards,
>
> Libo
>
>


error from adding a partition

2013-12-10 Thread Yu, Libo
Hi folks,

I got this error when I tried to test the partition addition tool.
bin/kafka-add-partitions.sh --partition 1 --topic libotesttopic --zookeeper 
xx.xxx.xxx.xx:
adding partitions failed because of kafka.admin.AdminUtils$.assignReplicasToBrokers(Lscala/collection/Seq;)Lscala/collection/Map;
java.lang.NoSuchMethodError: kafka.admin.AdminUtils$.assignReplicasToBrokers(Lscala/collection/Seq;)Lscala/collection/Map;
at kafka.admin.AddPartitionsCommand$.addPartitions(AddPartitionsCommand.scala:90)
at kafka.admin.AddPartitionsCommand$.main(AddPartitionsCommand.scala:68)
at kafka.admin.AddPartitionsCommand.main(AddPartitionsCommand.scala)

Did I miss anything here? Thanks.

Regards,

Libo
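Since the NoSuchMethodError above turned out to be a stale beta jar sitting next to the release jar, a quick sanity check is to group the kafka jars by Scala build and flag any build that appears with more than one version. This helper is hypothetical (not a Kafka tool), and the file names are only examples:

```python
import re
from collections import defaultdict

def find_duplicate_artifacts(jar_names):
    """Group kafka jar file names by Scala build; report builds with >1 version."""
    by_artifact = defaultdict(set)
    for name in jar_names:
        m = re.match(r"(kafka_[\d.]+)-(.+)\.jar$", name)
        if m:
            by_artifact[m.group(1)].add(m.group(2))  # build -> set of versions
    return {a: sorted(v) for a, v in by_artifact.items() if len(v) > 1}

# The situation from this thread: a beta jar left over next to the release jar.
jars = ["kafka_2.8.0-0.8.0.jar", "kafka_2.8.0-0.8.0-beta1.jar", "zkclient-0.3.jar"]
print(find_duplicate_artifacts(jars))  # {'kafka_2.8.0': ['0.8.0', '0.8.0-beta1']}
```

Anything reported here is a candidate for removal from the classpath before rerunning the admin tools.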



RE: set new retention size on the fly

2013-12-04 Thread Yu, Libo
Great. Thanks, Jun.

Regards,

Libo
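Jun's explanation quoted below (per-topic config persisted only in ZooKeeper, with the local server property file untouched) amounts to a two-level lookup: a topic-level override wins, otherwise the broker default applies. A toy model of that lookup, with made-up values:

```python
# Broker-level defaults (server.properties) vs per-topic overrides (ZooKeeper).
broker_defaults = {"retention.bytes": -1, "retention.ms": 7 * 24 * 3600 * 1000}
zk_topic_overrides = {"mytopic": {"retention.bytes": 1073741824}}

def effective_config(topic, key):
    """A per-topic override from ZK wins; otherwise the broker default applies."""
    return zk_topic_overrides.get(topic, {}).get(key, broker_defaults[key])

print(effective_config("mytopic", "retention.bytes"))    # ZK override: 1073741824
print(effective_config("othertopic", "retention.bytes")) # broker default: -1
```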

-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Wednesday, December 04, 2013 12:46 PM
To: users@kafka.apache.org
Subject: Re: set new retention size on the fly

In 0.8.1, per topic config will only be persisted in ZK, not in the local 
server property file.

To change a topic config, you would do
  kafka-topics.sh --alter --config

To change # partitons in a topic, you would do
  kafka-topics.sh --alter --partitions

Our tentative future release schedule is documented in 
https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

Thanks,

Jun


On Wed, Dec 4, 2013 at 8:15 AM, Yu, Libo  wrote:

> Thanks for the clarification. I am just curious about how this works out.
> If we can change the retention size with "kafka-topics.sh --alter", 
> will the new retention size be updated in server.properties?
> If there is any documentation, that will be great.  Another question 
> is whether you have any release plan for 0.8.1. Thanks.
>
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Tuesday, December 03, 2013 8:57 PM
> To: users@kafka.apache.org
> Subject: Re: set new retention size on the fly
>
> Just to clarify. Even though # partition is a config property of a 
> topic, it has to be done through the add partition admin command. 
> Other topic level config changes can be made through "kafka-topics.sh 
> --alter" in 0.8.1 (trunk).
>
> Thanks,
>
> Jun
>
>
> On Tue, Dec 3, 2013 at 3:56 PM, Jay Kreps  wrote:
>
> > 0.8 includes the partition increasing tool. 0.8.1 makes all 
> > per-topic configuration dynamic and updatable via a command line tool.
> >
> > -Jay
> >
> >
> > On Tue, Dec 3, 2013 at 1:23 PM, Yu, Libo  wrote:
> >
> > > Hi Neha,
> > >
> > > "0.8.1 includes the ability to dynamically change per topic configs."
> > > Do you mean number of partitions or retention size?
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > > Sent: Tuesday, December 03, 2013 1:25 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: set new retention size on the fly
> > >
> > > For adding partitions, look at
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
> > > 0.8.1 includes the ability to dynamically change per topic configs.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Tue, Dec 3, 2013 at 10:21 AM, Yu, Libo  wrote:
> > >
> > > > Hi folks,
> > > >
> > > > For 0.8, it is possible to add a partition dynamically. Is it 
> > > > possible to increase the retention size on the fly? This feature 
> > > > will be very useful for operation. I know rolling start can pick 
> > > > up the change but it takes too much effort. Thanks.
> > > >
> > > > Libo
> > > >
> > > >
> > >
> >
>


RE: set new retention size on the fly

2013-12-04 Thread Yu, Libo
Thanks for the clarification. I am just curious about how this works out.
If we can change the retention size with "kafka-topics.sh --alter",
will the new retention size be updated in server.properties?
If there is any documentation, that will be great. Another question
is whether you have any release plan for 0.8.1. Thanks.


Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, December 03, 2013 8:57 PM
To: users@kafka.apache.org
Subject: Re: set new retention size on the fly

Just to clarify. Even though # partition is a config property of a topic, it 
has to be done through the add partition admin command. Other topic level 
config changes can be made through "kafka-topics.sh --alter" in 0.8.1 (trunk).

Thanks,

Jun


On Tue, Dec 3, 2013 at 3:56 PM, Jay Kreps  wrote:

> 0.8 includes the partition increasing tool. 0.8.1 makes all per-topic 
> configuration dynamic and updatable via a command line tool.
>
> -Jay
>
>
> On Tue, Dec 3, 2013 at 1:23 PM, Yu, Libo  wrote:
>
> > Hi Neha,
> >
> > "0.8.1 includes the ability to dynamically change per topic configs."
> > Do you mean number of partitions or retention size?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > Sent: Tuesday, December 03, 2013 1:25 PM
> > To: users@kafka.apache.org
> > Subject: Re: set new retention size on the fly
> >
> > For adding partitions, look at
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
> > 0.8.1 includes the ability to dynamically change per topic configs.
> >
> > Thanks,
> > Neha
> >
> >
> > On Tue, Dec 3, 2013 at 10:21 AM, Yu, Libo  wrote:
> >
> > > Hi folks,
> > >
> > > For 0.8, it is possible to add a partition dynamically. Is it 
> > > possible to increase the retention size on the fly? This feature 
> > > will be very useful for operation. I know rolling start can pick 
> > > up the change but it takes too much effort. Thanks.
> > >
> > > Libo
> > >
> > >
> >
>


RE: set new retention size on the fly

2013-12-03 Thread Yu, Libo
Hi Neha,

"0.8.1 includes the ability to dynamically change per topic configs."
Do you mean number of partitions or retention size?

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Tuesday, December 03, 2013 1:25 PM
To: users@kafka.apache.org
Subject: Re: set new retention size on the fly

For adding partitions, look at
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
0.8.1 includes the ability to dynamically change per topic configs.

Thanks,
Neha


On Tue, Dec 3, 2013 at 10:21 AM, Yu, Libo  wrote:

> Hi folks,
>
> For 0.8, it is possible to add a partition dynamically. Is it possible 
> to increase the retention size on the fly? This feature will be very 
> useful for operation. I know rolling start can pick up the change but 
> it takes too much effort. Thanks.
>
> Libo
>
>


set new retention size on the fly

2013-12-03 Thread Yu, Libo
Hi folks,

For 0.8, it is possible to add a partition dynamically. Is it possible to
increase the retention size on the fly? This feature will be very useful
for operations. I know a rolling restart can pick up the change but it takes
too much effort. Thanks.

Libo
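Assuming the 0.8.1 kafka-topics.sh --alter syntax described in the replies above (the retention.bytes key and the exact flag spellings should be verified against the release), the invocation can be assembled like this:

```python
def alter_topic_cmd(zookeeper, topic, **configs):
    """Assemble a kafka-topics.sh --alter invocation as an argv list."""
    cmd = ["bin/kafka-topics.sh", "--alter", "--zookeeper", zookeeper, "--topic", topic]
    for key, value in sorted(configs.items()):
        cmd += ["--config", f"{key}={value}"]   # one --config pair per topic property
    return cmd

# Example: set a 1 GB per-partition retention size on topic "mytopic".
print(" ".join(alter_topic_cmd("zk1:2181", "mytopic", **{"retention.bytes": 1073741824})))
```

Since the change is written to ZooKeeper rather than server.properties, no broker restart is involved, which is exactly the on-the-fly behavior asked about here.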



RE: ConsumerRebalanceFailedException

2013-12-02 Thread Yu, Libo
Thanks for your insights, Jun. That is really helpful. I forgot to mention the
cause of the issue in my previous email. We have three brokers. I noticed from the
log that all three brokers re-registered themselves with zk. That means all of them
were somehow offline for a short time and then automatically came back online, which
caused the rebalance failure. While all the brokers were offline, I assume a consumer
will constantly retry to establish the connection again. How long is the interval
between the retries? Is it max.fetch.wait + socket.timeout.ms?
Thanks.

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Monday, December 02, 2013 11:55 AM
To: users@kafka.apache.org
Subject: Re: ConsumerRebalanceFailedException

Is the failure on the last rebalance? If so, some partitions will not have any 
consumers. A common reason for rebalance failure is that there is conflict in 
owning partitions among different consumers in the same group.
Increasing the # retries and the amount of backoff time between retries should 
help. Our default setting should be good enough if there are not too many 
topics being subscribed and the ZK latency is normal.

Thanks,

Jun


On Mon, Dec 2, 2013 at 6:57 AM, Yu, Libo  wrote:

> Actually, I saw this line in the log : can't rebalance after 4 retries.
> What should I expect in this case? All consumers threads failed or 
> only some of them?
> If I increase the number of retries or delay between retries, will 
> that help?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Friday, November 29, 2013 8:50 PM
> To: users@kafka.apache.org
> Subject: Re: ConsumerRebalanceFailedException
>
> Transient rebalance failures are ok. However, it's important that the 
> last rebalance in a sequence succeeds. Otherwise, some of the 
> partitions will not be consumed by any consumers.
>
> Thanks,
>
> Jun
>
>
> On Fri, Nov 29, 2013 at 10:44 AM, Yu, Libo  wrote:
>
> > You are right, Joe. I checked our brokers' log. We have three brokers.
> > All of them failed to connect to zk at some point.
> > So they were offline and later reregistered themselves with the zk. 
> > I don't know how many rebalance should be triggered in that case. 
> > There is only one exception found in consumer's log. My question is 
> > whether users need to do anything to handle 
> > ConsumerRebalanceFailedException.
> >
> > This is from consumer log:
> >
> > [28/11/13 16:38:56:056 PM EST] 102 ERROR consumer.ZookeeperConsumerConnector: [xx ], error during syncedRebalance
> > kafka.common.ConsumerRebalanceFailedException: x can't rebalance after 4 retries
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Joe Stein [mailto:joe.st...@stealth.ly]
> > Sent: Friday, November 29, 2013 11:57 AM
> > To: users@kafka.apache.org
> > Subject: Re: ConsumerRebalanceFailedException
> >
> > What is the full stack trace?  if you see "can't rebalance after 4
> retries"
> > then likely the problem is the broker is down or not available
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > /
> >
> >
> > On Fri, Nov 29, 2013 at 11:31 AM, Yu, Libo  wrote:
> >
> > > We found our consumer stopped working after this exception occurred.
> > > Can the consumer recover from such an exception?
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Florin Trofin [mailto:ftro...@adobe.com]
> > > Sent: Tuesday, July 16, 2013 4:20 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: ConsumerRebalanceFailedException
> > >
> > > Yes, I think these are two separate issues.
> > >
> > > F.
> > >
> > > On 7/16/13 11:32 AM, "Joel Koshy"  wrote:
> > >
> > > >From a user's perspective, ConsumerRebalanceException is a bit 

RE: upgrade from 0.8-beta1 to 0.8

2013-12-02 Thread Yu, Libo
Thanks for confirming that, Jun.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, November 29, 2013 8:28 PM
To: users@kafka.apache.org
Subject: Re: upgrade from 0.8-beta1 to 0.8

You should be able to do an in-place upgrade.

Thanks,

Jun


On Fri, Nov 29, 2013 at 6:35 AM, Yu, Libo  wrote:

> Hi team,
>
> Currently we are using 0.8-beta1. We plan to upgrade to 0.8. My 
> concern is whether we need to purge all existing kafka and zookeeper 
> data on the hard drive for this upgrade. In other words, can 0.8 use 
> 0.8-beta1 kafka and zookeeper data on the hard drive? Thanks.
>
> Regards,
>
> Libo
>
>


RE: ConsumerRebalanceFailedException

2013-12-02 Thread Yu, Libo
Actually, I saw this line in the log: can't rebalance after 4 retries.
What should I expect in this case? Did all consumer threads fail, or only some of
them?
If I increase the number of retries or delay between retries, will that help?

Regards,

Libo
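The retry behavior behind "can't rebalance after 4 retries" is a bounded loop with a backoff between attempts, governed by rebalance.max.retries and rebalance.backoff.ms. A generic sketch of that pattern, not Kafka's actual ZookeeperConsumerConnector code (the default values used here are illustrative):

```python
import time

class RebalanceFailedError(Exception):
    """Models ConsumerRebalanceFailedException: every retry was exhausted."""

def synced_rebalance(attempt_fn, max_retries=4, backoff_ms=2000):
    """Retry attempt_fn with a fixed backoff, as the consumer retries a rebalance."""
    for attempt in range(max_retries):
        if attempt_fn():
            return attempt + 1                 # number of attempts it took
        if attempt < max_retries - 1:
            time.sleep(backoff_ms / 1000.0)    # wait before the next attempt
    raise RebalanceFailedError(f"can't rebalance after {max_retries} retries")

# Succeeds on the third try: two conflicts, then partition ownership settles.
outcomes = iter([False, False, True])
print(synced_rebalance(lambda: next(outcomes), max_retries=4, backoff_ms=0))  # -> 3
```

Raising max_retries or backoff_ms widens the window in which transient broker or ZK flakiness can resolve before the exception is thrown.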


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, November 29, 2013 8:50 PM
To: users@kafka.apache.org
Subject: Re: ConsumerRebalanceFailedException

Transient rebalance failures are ok. However, it's important that the last 
rebalance in a sequence succeeds. Otherwise, some of the partitions will not be 
consumed by any consumers.

Thanks,

Jun


On Fri, Nov 29, 2013 at 10:44 AM, Yu, Libo  wrote:

> You are right, Joe. I checked our brokers' log. We have three brokers. 
> All of them failed to connect to zk at some point.
> So they were offline and later reregistered themselves with the zk. I 
> don't know how many rebalance should be triggered in that case. There 
> is only one exception found in consumer's log. My question is whether 
> users need to do anything to handle ConsumerRebalanceFailedException.
>
> This is from consumer log:
>
> [28/11/13 16:38:56:056 PM EST] 102 ERROR consumer.ZookeeperConsumerConnector: [xx ], error during syncedRebalance
> kafka.common.ConsumerRebalanceFailedException: x can't rebalance after 4 retries
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Joe Stein [mailto:joe.st...@stealth.ly]
> Sent: Friday, November 29, 2013 11:57 AM
> To: users@kafka.apache.org
> Subject: Re: ConsumerRebalanceFailedException
>
> What is the full stack trace?  if you see "can't rebalance after 4 retries"
> then likely the problem is the broker is down or not available
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> /
>
>
> On Fri, Nov 29, 2013 at 11:31 AM, Yu, Libo  wrote:
>
> > We found our consumer stopped working after this exception occurred.
> > Can the consumer recover from such an exception?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Florin Trofin [mailto:ftro...@adobe.com]
> > Sent: Tuesday, July 16, 2013 4:20 PM
> > To: users@kafka.apache.org
> > Subject: Re: ConsumerRebalanceFailedException
> >
> > Yes, I think these are two separate issues.
> >
> > F.
> >
> > On 7/16/13 11:32 AM, "Joel Koshy"  wrote:
> >
> > >From a user's perspective, ConsumerRebalanceException is a bit 
> > >cryptic -I think the other thread was to provide a more informative 
> > >message and also be able to recover when a broker does come up 
> > >(fixed in KAFKA-969).
> > >
> > >Thanks,
> > >
> > >Joel
> > >
> > >On Tue, Jul 16, 2013 at 11:04 AM, Vaibhav Puranik 
> > >
> > >wrote:
> > >> Thank you Joel.
> > >>
> > >> In a different but related thread, somebody is asking to rename 
> > >> the exception as NoBrokerAvailableExcption. But given the 
> > >> description above, the exception seems to be named appropriately.
> > >>
> > >> Regards,
> > >> Vaibhav
> > >>
> > >>
> > >> On Tue, Jul 16, 2013 at 12:05 AM, Joel Koshy 
> > >>
> > >>wrote:
> > >>
> > >>> Yes - rebalance => consumers trying to coordinate through ZK.
> > >>> Rebalances can happen when one or more of the following happen:
> > >>> - a consumed topic partition appears or disappears - i.e., if a 
> > >>> broker comes or goes.
> > >>> - a consumer instance in the group comes or goes "goes" could 
> > >>> also be triggered by session expirations in zookeeper - 
> > >>> typically caused by client-side GC or flaky connections to zookeeper.
> > >>>
> > >>> On Mon, Jul 15, 2013 at 10:15 AM, Vaibhav Puranik 
> > >>> 
> > >>> wrote:
> > >>> > Hi all,
> > >>> >
> > >>> > We have a small

RE: ConsumerRebalanceFailedException

2013-11-29 Thread Yu, Libo
You are right, Joe. I checked our brokers' logs. We have three brokers. All of
them failed to connect to zk at some point, so they were offline and later
re-registered themselves with zk. I don't know how many rebalances should be
triggered in that case. There is only one exception found in the consumer's log.
My question is whether users need to do anything to handle
ConsumerRebalanceFailedException.

This is from consumer log:

[28/11/13 16:38:56:056 PM EST] 102 ERROR consumer.ZookeeperConsumerConnector: [xx], error during syncedRebalance
kafka.common.ConsumerRebalanceFailedException: x can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)

Regards,

Libo


-Original Message-
From: Joe Stein [mailto:joe.st...@stealth.ly] 
Sent: Friday, November 29, 2013 11:57 AM
To: users@kafka.apache.org
Subject: Re: ConsumerRebalanceFailedException

What is the full stack trace? If you see "can't rebalance after 4 retries",
then likely the problem is that the broker is down or not available.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
****/


On Fri, Nov 29, 2013 at 11:31 AM, Yu, Libo  wrote:

> We found our consumer stopped working after this exception occurred.
> Can the consumer recover from such an exception?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Florin Trofin [mailto:ftro...@adobe.com]
> Sent: Tuesday, July 16, 2013 4:20 PM
> To: users@kafka.apache.org
> Subject: Re: ConsumerRebalanceFailedException
>
> Yes, I think these are two separate issues.
>
> F.
>
> On 7/16/13 11:32 AM, "Joel Koshy"  wrote:
>
> >From a user's perspective, ConsumerRebalanceException is a bit 
> >cryptic -I think the other thread was to provide a more informative 
> >message and also be able to recover when a broker does come up (fixed 
> >in KAFKA-969).
> >
> >Thanks,
> >
> >Joel
> >
> >On Tue, Jul 16, 2013 at 11:04 AM, Vaibhav Puranik 
> >
> >wrote:
> >> Thank you Joel.
> >>
> >> In a different but related thread, somebody is asking to rename the 
> >> exception as NoBrokerAvailableExcption. But given the description 
> >> above, the exception seems to be named appropriately.
> >>
> >> Regards,
> >> Vaibhav
> >>
> >>
> >> On Tue, Jul 16, 2013 at 12:05 AM, Joel Koshy 
> >>wrote:
> >>
> >>> Yes - rebalance => consumers trying to coordinate through ZK.
> >>> Rebalances can happen when one or more of the following happen:
> >>> - a consumed topic partition appears or disappears - i.e., if a 
> >>> broker comes or goes.
> >>> - a consumer instance in the group comes or goes "goes" could also 
> >>> be triggered by session expirations in zookeeper - typically 
> >>> caused by client-side GC or flaky connections to zookeeper.
> >>>
> >>> On Mon, Jul 15, 2013 at 10:15 AM, Vaibhav Puranik 
> >>> 
> >>> wrote:
> >>> > Hi all,
> >>> >
> >>> > We have a small Kafka cluster (0.7.1 - 3 nodes) in EC2. The load 
> >>> > is
> >>>about
> >>> > 200 million events per day, each being few kilobytes. We have a
> >>>single
> >>> node
> >>> > zookeeper.
> >>> >
> >>> > Yesterday suddenly our Kafka clients started throwing the 
> >>> > following
> >>> > exception:
> >>> > java.lang.RuntimeException: kafka.common.ConsumerRebalanceFailedException:
> >>> > CONSUMER_GROUP_NAME_ip-00-00-00-00.ec2.internal-1373821190828-5f78e9af
> >>> > can't rebalance after 4 retries
> >>> > at com.gumgum.kafka.consumer.KafkaTemplate.executeWithBatch(KafkaTemplate.java:59)
> >>> > at com.gumgum.storm.fileupload.GenericKafkaSpout.nextTuple(GenericKafkaSpout.java:73)
> >>> > at backtype.storm.daemon.executor$fn__3968$fn__4009$fn__4010.invoke(executor.clj:433)

RE: ConsumerRebalanceFailedException

2013-11-29 Thread Yu, Libo
We found our consumer stopped working after this exception occurred.
Can the consumer recover from such an exception?

Regards,

Libo


-Original Message-
From: Florin Trofin [mailto:ftro...@adobe.com] 
Sent: Tuesday, July 16, 2013 4:20 PM
To: users@kafka.apache.org
Subject: Re: ConsumerRebalanceFailedException

Yes, I think these are two separate issues.

F.

On 7/16/13 11:32 AM, "Joel Koshy"  wrote:

>From a user's perspective, ConsumerRebalanceException is a bit cryptic 
>-I think the other thread was to provide a more informative message and 
>also be able to recover when a broker does come up (fixed in 
>KAFKA-969).
>
>Thanks,
>
>Joel
>
>On Tue, Jul 16, 2013 at 11:04 AM, Vaibhav Puranik 
>wrote:
>> Thank you Joel.
>>
>> In a different but related thread, somebody is asking to rename the 
>> exception as NoBrokerAvailableExcption. But given the description 
>> above, the exception seems to be named appropriately.
>>
>> Regards,
>> Vaibhav
>>
>>
>> On Tue, Jul 16, 2013 at 12:05 AM, Joel Koshy 
>>wrote:
>>
>>> Yes - rebalance => consumers trying to coordinate through ZK.
>>> Rebalances can happen when one or more of the following happen:
>>> - a consumed topic partition appears or disappears - i.e., if a 
>>> broker comes or goes.
>>> - a consumer instance in the group comes or goes. "Goes" could also 
>>> be triggered by session expirations in zookeeper - typically caused 
>>> by client-side GC or flaky connections to zookeeper.
>>>
>>> On Mon, Jul 15, 2013 at 10:15 AM, Vaibhav Puranik 
>>> 
>>> wrote:
>>> > Hi all,
>>> >
>>> > We have a small Kafka cluster (0.7.1 - 3 nodes) in EC2. The load 
>>> > is
>>>about
>>> > 200 million events per day, each being few kilobytes. We have a
>>>single
>>> node
>>> > zookeeper.
>>> >
>>> > Yesterday suddenly our Kafka clients started throwing the 
>>> > following
>>> > exception:
>>> > java.lang.RuntimeException:
>>> kafka.common.ConsumerRebalanceFailedException:
>>> > 
>>>CONSUMER_GROUP_NAME_ip-00-00-00-00.ec2.internal-1373821190828-5f78e9af
>>> > can't rebalance after 4 retries
>>> > at
>>> >
>>> 
>>>com.gumgum.kafka.consumer.KafkaTemplate.executeWithBatch(KafkaTemplate.java:59)
>>> > at
>>> >
>>> 
>>>com.gumgum.storm.fileupload.GenericKafkaSpout.nextTuple(GenericKafkaSpout.java:73)
>>> > at
>>> >
>>> 
>>>backtype.storm.daemon.executor$fn__3968$fn__4009$fn__4010.invoke(executor.clj:433)
>>> > at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>>> >
>>> > None of the Kafka clients (ConsumerConenctor class) would start. 
>>> > They
>>> would
>>> > fail with the exception.
>>> >
>>> > We tried restarting the clilents, restarting the zookeeper as well.
>>>But
>>> > finally it all started working when we restarted all of our kafka
>>> brokers.
>>> > We didn't lose any data because producers (going directly to the
>>>brokers
>>> > through a load balancer) were working fine.
>>> >
>>> > I tried googling this issue and looks like lot of people have 
>>> > faced
>>>it,
>>> but
>>> > couldn't get anything concrete.
>>> >
>>> > Given this, I have two questions:
>>> >
>>> > It will be nice if you can tell me why this can happen or point me
>>>to a
>>> > link where I can understand it better. What does Consumer 
>>> > Rebalancing
>>> mean?
>>> > Does that mean consumers are trying to coordinate amongst 
>>> > themselves
>>> using
>>> > Zookeeper?
>>> >
>>> > On a separate note, are there any JMX parameters I need to be
>>>monitoring
>>> to
>>> > make sure that my kafka cluster is healthy? How can I keep watch 
>>> > on
>>>my
>>> > kafka cluster?
>>> >
>>> > Regards,
>>> > Vaibhav Puranik
>>> > GumGum
>>>



upgrade from 0.8-beta1 to 0.8

2013-11-29 Thread Yu, Libo
Hi team,

Currently we are using 0.8-beta1. We plan to upgrade to 0.8. My concern
is whether we need to purge all existing kafka and zookeeper data on the
hard drive for this upgrade. In other words, can 0.8 use 0.8-beta1 kafka
and zookeeper data on the hard drive? Thanks.

Regards,

Libo



RE: addition of a partition

2013-11-29 Thread Yu, Libo
Thanks, Jun. It is good to know that. Currently, auto.offset.reset  is set to 
largest for all our consumers.
It seems we have to change it to smallest.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, November 29, 2013 1:17 AM
To: users@kafka.apache.org
Subject: Re: addition of a partition

The consumer should be able to pick up the new partitions automatically. If 
auto.offset.reset is set to largest, the consumer may miss some data in the new 
partitions. To avoid data loss, you have to set auto.offset.reset to smallest.
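As a sketch of what Jun describes, the consumer-side setting would look like this 
(property name and values as used in the 0.8-era consumer config; "smallest" makes 
the consumer start from the earliest available offset in partitions it has no 
committed offset for, which is what newly added partitions are):

```properties
# Consumer config sketch: read newly added partitions from their
# earliest offset instead of skipping ahead to the log tail.
auto.offset.reset=smallest
```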

Thanks,

Jun


On Thu, Nov 28, 2013 at 12:28 PM, Yu, Libo  wrote:

> Hi team,
>
> I am reading this link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool and this JIRA 
> https://issues.apache.org/jira/i#browse/KAFKA-1030. I have a couple of 
> questions.
>
> After adding a partition by using the tool, should the consumer 
> processes be restarted? If auto.offset.reset is set to the largest for 
> my consumer processes, will it be changed during partition addition? 
> And is there chance of message loss? Thanks.
>
>
>
> Regards,
>
> Libo
>
>


RE: about compiling broker in 0.8 branch

2013-11-29 Thread Yu, Libo
Thanks for confirming that, Joe.

Regards,

Libo


-Original Message-
From: Joe Stein [mailto:joe.st...@stealth.ly] 
Sent: Thursday, November 28, 2013 8:23 PM
To: users@kafka.apache.org
Subject: Re: about compiling broker in 0.8 branch

The official binary release of the broker is done with Scala 2.8.0. I believe that 
also still has the largest installed base.

You can use any Scala version your code needs of producer/consumer libraries 
supported with that broker or other languages.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
/


On Thu, Nov 28, 2013 at 4:24 PM, Yu, Libo  wrote:

> Hi team,
>
> For the current 0.8 branch, is it recommended to compile it with Scala 
> 2.10?
> I remember someone said previously it is best to compile the broker 
> with Scala 2.8.0. Thanks.
>
> Regards,
>
> Libo
>
>


about compiling broker in 0.8 branch

2013-11-28 Thread Yu, Libo
Hi team,

For the current 0.8 branch, is it recommended to compile it with Scala 2.10?
I remember someone said previously it is best to compile the broker with
Scala 2.8.0. Thanks.

Regards,

Libo



addition of a partition

2013-11-28 Thread Yu, Libo
Hi team,

I am reading this link:
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionTool
and this JIRA https://issues.apache.org/jira/i#browse/KAFKA-1030. I have a 
couple of questions.

After adding a partition by using the tool, should the consumer processes be 
restarted? If auto.offset.reset
is set to the largest for my consumer processes, will be it changed during 
partition addition? And is there
chance of message loss? Thanks.



Regards,

Libo



RE: retention size

2013-11-22 Thread Yu, Libo
I did a restart and the issue was gone. It could be that we changed the 
retention size and did not restart
the brokers to pick up the change. Thanks for your help.
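For reference, a hedged sketch of the broker-side setting involved: in 0.8 the 
per-partition size limit is log.retention.bytes in server.properties, and (as this 
thread found) a change to it takes effect only after the broker is restarted:

```properties
# Broker config sketch: cap each partition's log at ~3 GB.
# 0.8 brokers read this only at startup, so restart after changing it.
log.retention.bytes=3221225472
```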

Regards,

Libo


-Original Message-
From: Yu, Libo [ICG-IT] 
Sent: Friday, November 22, 2013 10:53 AM
To: 'users@kafka.apache.org'
Subject: RE: retention size

I am using 0.8-beta1. I used only one producer and compression was not turned 
on.
The topic was newly created. The default retention size applied, so each of the 
three partitions is 3 GB. I checked the log files for all three partitions and all 
of them are 1.4 GB.

Regards,

Libo


-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com]
Sent: Thursday, November 21, 2013 9:50 PM
To: users@kafka.apache.org
Subject: Re: retention size

Are you on 0.8 branch or trunk? Also, what are the sizes for the other 
partitions? Also how many producers do you have? It could be because the 
producer sticks to a partition for the metadata refresh period - so if your 
test run isn't long enough some partitions may be more loaded than the others.

On Thu, Nov 21, 2013 at 06:28:39PM +, Yu, Libo  wrote:
> Hi team,
> 
> We have 3 brokers in a cluster. The replication factor is 2. I set the 
> default retention size to 3G bytes. I published 12G data to a topic, 
> which is enough to fully load all partitions. I assume on each broker 
> the partition size should be 3G. However, it is only 1.4G for one partition. 
> Is this a known issue? Thanks.
> 
> Regards,
> 
> Libo
> 



RE: retention size

2013-11-22 Thread Yu, Libo
I am using 0.8-beta1. I used only one producer and compression was not turned 
on.
The topic was newly created. The default retention size applied, so each of the 
three partitions
is 3 GB. I checked the log files for all three partitions and all of them are 1.4 GB.

Regards,

Libo


-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Thursday, November 21, 2013 9:50 PM
To: users@kafka.apache.org
Subject: Re: retention size

Are you on 0.8 branch or trunk? Also, what are the sizes for the other 
partitions? Also how many producers do you have? It could be because the 
producer sticks to a partition for the metadata refresh period - so if your 
test run isn't long enough some partitions may be more loaded than the others.

On Thu, Nov 21, 2013 at 06:28:39PM +0000, Yu, Libo  wrote:
> Hi team,
> 
> We have 3 brokers in a cluster. The replication factor is 2. I set the 
> default retention size to 3G bytes. I published 12G data to a topic, 
> which is enough to fully load all partitions. I assume on each broker 
> the partition size should be 3G. However, it is only 1.4G for one partition. 
> Is this a known issue? Thanks.
> 
> Regards,
> 
> Libo
> 



retention size

2013-11-21 Thread Yu, Libo
Hi team,

We have 3 brokers in a cluster. The replication factor is 2. I set the default 
retention size to
3G bytes. I published 12G data to a topic, which is enough to fully load all 
partitions. I assume
on each broker the partition size should be 3G. However, it is only 1.4G for 
one partition. Is
this a known issue? Thanks.

Regards,

Libo



NodeExists Keeper exception

2013-11-15 Thread Yu, Libo
Hi team,

Still this is from beta1. I notice this exception occurred frequently in our
broker logs.
[2013-11-14 21:09:58,714] INFO Got user-level KeeperException when processing 
sessionid:0x24250e816b000df type:create cxid:0x26b zxid:0xfffe txntype:unknown 
reqpath:n/a Error Path:/consumers/myuser/owners/mytopic/2 
Error:KeeperErrorCode = NodeExists for /consumers/myuser/owners/mytopic/2 
(org.apache.zookeeper.server.PrepRequestProcessor)

/consumers/myuser/owners/mytopic/2 is not an ephemeral node. Is it a zookeeper 
bug?


Regards,

Libo



RE: broker exception

2013-11-14 Thread Yu, Libo
We are using the default one in 0.8-beta1. It should be 3.3.3. 
According to the comment in the Jira, it is more like a zookeeper
configuration issue. I will make a change to our configuration
and keep monitoring it. Thanks.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Thursday, November 14, 2013 12:27 PM
To: users@kafka.apache.org
Subject: Re: broker exception

Are you using ZK 3.3.4? This seems to be caused by a bug in 3.3.3 and 3.3.0.

https://issues.apache.org/jira/browse/ZOOKEEPER-1115

Thanks,

Jun


On Thu, Nov 14, 2013 at 5:25 AM, Yu, Libo  wrote:

> Hi team,
>
> This exception occurs regularly on our brokers. When it occurs, a 
> broker will lose its leader role but remain in the ISR. And running 
> the preferred-leader-election script may rebalance the leadership, but 
> in some cases it does not help.
>
> [2013-11-14 08:04:40,001] INFO Processed session termination for
> sessionid: 0x34250e8160b0360 (org.apache.zookeeper.server.PrepRequestProcessor)
> [2013-11-14 08:04:40,001] INFO Processed session termination for
> sessionid: 0x24250e816b00341 (org.apache.zookeeper.server.PrepRequestProcessor)
> [2013-11-14 08:04:41,909] ERROR Unexpected exception causing shutdown 
> while sock still open (org.apache.zookeeper.server.quorum.LearnerHandler)
> java.io.EOFException
> at java.io.DataInputStream.readInt(DataInputStream.java:392)
> at
> org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
> at
> org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:84)
> at
> org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:108)
> at
> org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:413)
> [2013-11-14 08:04:41,917] WARN *** GOODBYE /xxx.xxx.xx.xx:59352
>  (org.apache.zookeeper.server.quorum.LearnerHandler)
> [2013-11-14 08:04:41,926] INFO Got user-level KeeperException when 
> processing sessionid:0x24250e816b00348 type:setData cxid:0x7e4 
> zxid:0xfffe txntype:unknown
> reqpath:/consumers/myusergroup/offsets/mytopic/1 Error Path:null 
> Error:KeeperErrorCode = Session expired
> (org.apache.zookeeper.server.PrepRequestProcessor)
> [2013-11-14 08:04:42,000] INFO Expiring session 0x24242764c0b, 
> timeout of 1ms exceeded 
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2013-11-14 08:04:42,001] INFO Processed session termination for
> sessionid: 0x24242764c0b
> (org.apache.zookeeper.server.PrepRequestProcessor)
> [2013-11-14 08:04:42,002] INFO Closed socket connection for client
> /xxx.xxx.xx.xx:41654 which had sessionid 0x24242764c0b
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2013-11-14 08:04:42,064] INFO Got user-level KeeperException when 
> processing sessionid:0x14250e81604 type:create cxid:0x11 
> zxid:0xfffe txntype:unknown reqpath:n/a Error 
> Path:/controller Error:KeeperErrorCode = NodeExists for /controller
> (org.apache.zookeeper.server.PrepRequestProcessor)
> [2013-11-14 08:04:42,065] INFO conflict in /controller data: 3 stored
> data: 2 (kafka.utils.ZkUtils$)
> [2013-11-14 08:04:42,067] INFO New leader is 2
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
>
>
>
> Regards,
>
> Libo
>
>


RE: Leader not local

2013-11-14 Thread Yu, Libo
Unfortunately, we don't have access to the producer log. But it seems very obvious 
to me that the producer
failed to find the correct leader, as the warning messages occurred 
repeatedly over a 
pretty long time span (more than 10 minutes).

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Thursday, November 14, 2013 11:48 AM
To: users@kafka.apache.org
Subject: Re: Leader not local

Hi Libo,

Yes, the producer should start looking for the correct leader after just one send 
failure. Could you check if there are other exceptions in the producer log?

Guozhang


On Thu, Nov 14, 2013 at 8:23 AM, Yu, Libo  wrote:

> Hi team,
>
> I saw this line within a long time span in our logs for the same topic 
> and partition.
> [2013-11-14 11:13:41,647] WARN [KafkaApi-1] Produce request with 
> correlation id 529240 from client  on partition [mytopic,2] failed due 
> to Leader not local for partition [mytopic,2] on broker 1
> (kafka.server.KafkaApis)
>
> Why can't the producer detect that the broker it is publishing to is not 
> the leader in this case? I believe it should detect this and publish 
> to the leader broker.
>
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


Leader not local

2013-11-14 Thread Yu, Libo
Hi team,

I saw this line within a long time span in our logs for the same topic and 
partition.
[2013-11-14 11:13:41,647] WARN [KafkaApi-1] Produce request with correlation id 
529240 from client  on partition [mytopic,2] failed due to Leader not local for 
partition [mytopic,2] on broker 1 (kafka.server.KafkaApis)

Why can't the producer detect that the broker it is publishing to is not the 
leader in this case? I believe it should detect this and publish to the leader 
broker.


Regards,

Libo



will this cause message loss?

2013-11-14 Thread Yu, Libo
Hi team,

We are using beta1. I am going to delete all topics and create them with more 
partitions.
But I don't want to lose any messages.

Assume the consumers are online all the time for the following steps. The 
consumer's
auto.offset.reset is set to largest.

1 stop publishing to the brokers.
2 wait until all N messages on the brokers have been consumed
3 delete all topics. That includes deleting the log files and 
/brokers/topics/mytopic and
/consumers/myusergroup/owners/mytopic and /consumers/myusergroup/offsets/mytopic
4 recreate all the topics with more partitions.
5 start the brokers
6 resume publishing to the brokers

When a consumer tries to get the next message from a newly created topic, will the 
OffsetOutOfRange
exception reset the offset to 0 in this case?
Thanks,

Libo



broker exception

2013-11-14 Thread Yu, Libo
Hi team,

This exception occurs regularly on our brokers. When it occurs, a broker will 
lose its leader role but remain in the ISR. And running the
preferred-leader-election script may rebalance the leadership, but in some cases 
it does not help.

[2013-11-14 08:04:40,001] INFO Processed session termination for sessionid: 
0x34250e8160b0360 (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-11-14 08:04:40,001] INFO Processed session termination for sessionid: 
0x24250e816b00341 (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-11-14 08:04:41,909] ERROR Unexpected exception causing shutdown while 
sock still open (org.apache.zookeeper.server.quorum.LearnerHandler)
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:63)
at 
org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:84)
at 
org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:108)
at 
org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:413)
[2013-11-14 08:04:41,917] WARN *** GOODBYE /xxx.xxx.xx.xx:59352  
(org.apache.zookeeper.server.quorum.LearnerHandler)
[2013-11-14 08:04:41,926] INFO Got user-level KeeperException when processing 
sessionid:0x24250e816b00348 type:setData cxid:0x7e4 zxid:0xfffe 
txntype:unknown reqpath:/consumers/myusergroup/offsets/mytopic/1 Error 
Path:null Error:KeeperErrorCode = Session expired 
(org.apache.zookeeper.server.PrepRequestProcessor)
[2013-11-14 08:04:42,000] INFO Expiring session 0x24242764c0b, timeout of 
1ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2013-11-14 08:04:42,001] INFO Processed session termination for sessionid: 
0x24242764c0b (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-11-14 08:04:42,002] INFO Closed socket connection for client 
/xxx.xxx.xx.xx:41654 which had sessionid 0x24242764c0b 
(org.apache.zookeeper.server.NIOServerCnxn)
[2013-11-14 08:04:42,064] INFO Got user-level KeeperException when processing 
sessionid:0x14250e81604 type:create cxid:0x11 zxid:0xfffe 
txntype:unknown reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
NodeExists for /controller (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-11-14 08:04:42,065] INFO conflict in /controller data: 3 stored data: 2 
(kafka.utils.ZkUtils$)
[2013-11-14 08:04:42,067] INFO New leader is 2 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)



Regards,

Libo



RE: Purgatory

2013-11-08 Thread Yu, Libo
I read it and tried to understand it. It would be great to add a summary
at the beginning about what it is and how it may impact a user.

Regards,

Libo


-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Friday, November 08, 2013 2:01 AM
To: users@kafka.apache.org
Subject: Re: Purgatory

Excellent - thanks for putting that together! Will review it more carefully 
tomorrow and suggest some minor edits if required.

On Thu, Nov 07, 2013 at 10:45:40PM -0500, Marc Labbe wrote:
> I've just added a page for purgatory, feel free to comment/modify at will.
> I hope I didn't misinterpret too much of the code.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Request+Purgatory+(0.8)
> 
> I added a few questions of my own.
> 
> 
> On Fri, Nov 1, 2013 at 9:43 PM, Joe Stein  wrote:
> 
> > To edit the Wiki you need to send an ICLA 
> > http://www.apache.org/licenses/#clas to Apache and then once that is 
> > done an email to priv...@kafka.apache.org (or to me and I will copy 
> > private) with your Wiki username and that you sent the ICLA to Apache.
> >
> > Then, I can add you to edit the Wiki.
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
> >
> > On Fri, Nov 1, 2013 at 9:08 PM, Marc Labbe  wrote:
> >
> > > Hi Joel,
> > >
> > > I used to have edit to the wiki, I made a few additions to it a 
> > > while ago but it's seem I don't have it anymore. It might have 
> > > been lost in the confluence update. I would be glad to add what I 
> > > have written if I get it back. Otherwise, feel free to paste my 
> > > words in one of the pages, I don't intend on asking for copyrights for 
> > > this :).
> > >
> > > marc
> > >
> > >
> > > On Fri, Nov 1, 2013 at 4:32 PM, Joel Koshy  wrote:
> > >
> > > > Marc, thanks for writing that up. I think it is worth adding 
> > > > some details on the request-purgatory on a wiki (Jay had started 
> > > > a wiki page for kafka internals [1] a while ago, but we have not 
> > > > had time to add much to it since.) Your write-up could be 
> > > > reviewed and added there. Do you have edit permissions on the wiki?
> > > >
> > > > As for the purge interval config - yes the documentation can be 
> > > > improved a bit. It's one of those "internal" configs that 
> > > > generally don't need to be modified by users. The reason we 
> > > > added that was as
> > > > follows:
> > > > - We found that for low-volume topics, replica fetch requests 
> > > > were getting expired but sitting around in purgatory
> > > > - This was because we were expiring them from the delay queue 
> > > > (used to track when requests should expire), but they were still 
> > > > sitting in the watcherFor map - i.e., they would get purged when 
> > > > the next producer request to that topic/partition arrived, but 
> > > > for low volume topics this could be a long time (or never in the 
> > > > worst case) and we would eventually run into an OOME.
> > > > - So we needed to periodically go through the entire watcherFor 
> > > > map and explicitly remove those requests that had expired.
> > > > - More details on this are in KAFKA-664.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > [1] 
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
> > > >
> > > > On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe  wrote:
> > > > > Guozhang,
> > > > >
> > > > > I have to agree with Priya the doc isn't very clear. Although 
> > > > > the configuration is documented, it is simply rewording the 
> > > > > name of the
> > > > config,
> > > > > which isn't particularly useful if you want more information 
> > > > > about
> > what
> > > > the
> > > > > purgatory is. I searched the whole wiki and doc and could not 
> > > > > find
> > > > anything
> > > > > very useful as opposed looking a the code. In this case, 
> > > > > kafka.server.KafkaApis and kafka.server.RequestPurgatory will 
> > > > > be your friends.
> > > > >
> > > > > I'll try to add to Joe's answer here, mostly just reporting 
> > > > > what's available in the Scala doc from the project. I am doing 
> > > > > this to
> > > > understand
> > > > > the mechanics myself btw.
> > > > >
> > > > > As Joe said, messages are not dropped by the purgatory but 
> > > > > simply
> > > removed
> > > > > from the purgatory when they are satisfied. Satisfaction 
> > > > > conditions
> > are
> > > > > different for both fetch and produce requests and this is 
> > > > > implemented
> > > in
> > > > > their respective DelayedRequest implementation (DelayedFetch 
> > > > > and DelayedProduce).
> > > > >
> > > > > Requests purgatories are defined as follow in the code:
> > > > >  - ProducerRequestPurgatory: A holding pen for produce 
> > > > > requests
> > waiting
> > > > to
> > > > > 

RE: add partition tool in 0.8

2013-11-08 Thread Yu, Libo
Thanks for your reply, Joel.

Regards,

Libo


-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Thursday, November 07, 2013 5:00 PM
To: users@kafka.apache.org
Subject: Re: add partition tool in 0.8

> 
> kafka-add-partitions.sh is in 0.8 but not in 0.8-beta1. Therefore we 
> cannot use this tool with 0.8-beta1. If I download latest 0.8 and 
> compile it, can I use its kafka-add-partitions.sh to add partitions for the 
> topics that already exist in our 0.8-beta1 kafka? Thanks.

Unfortunately, no - since there were changes in the controller
(broker) to support this. So you will need to upgrade to 0.8 before you can do 
this.

Thanks,

Joel



add partition tool in 0.8

2013-11-07 Thread Yu, Libo
Hi team,

Here is what I want to do:
We are using 0.8-beta1 currently. We already have some topics and want to add 
partitions
for them.

kafka-add-partitions.sh is in 0.8 but not in 0.8-beta1. Therefore we cannot use 
this tool with
0.8-beta1. If I download latest 0.8 and compile it, can I use its 
kafka-add-partitions.sh to add
partitions for the topics that already exist in our 0.8-beta1 kafka? Thanks.

Libo


Regards,

Libo



RE: question about default key

2013-10-24 Thread Yu, Libo
Got it. Thanks.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, October 24, 2013 10:09 AM
To: users@kafka.apache.org
Subject: Re: question about default key

The default key is null.

Thanks,
Neha
On Oct 24, 2013 6:47 AM, "Yu, Libo"  wrote:

> Hi team,
>
> If I don't specify a key when publishing a message, a default key will 
> be generated.
> In this case, how long is the default key and will the consumer get 
> this default key?
>
> Thanks.
>
> Libo
>
>


question about default key

2013-10-24 Thread Yu, Libo
Hi team,

If I don't specify a key when publishing a message, a default key will be 
generated.
In this case, how long is the default key and will the consumer get this 
default key?

Thanks.

Libo



KeyedMessage question

2013-10-19 Thread Yu, Libo
Hi team,

For the message type in KeyedMessage, I can use String or byte[].
Is there any difference in terms of the actual data transferred?



Regards,

Libo



key partitioner

2013-10-19 Thread Yu, Libo
Hi team,

According to the documentation, the default partitioner hashes the key string and 
assigns the message
to a partition. Could you give a brief introduction to the hash algorithm? If a 
long timestamp (in hex format)
is used as key, will the messages be distributed evenly to all partitions? 
Assume the timestamp increases
on a one millisecond basis.
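As far as I can tell from the 0.8 source, the default partitioner computes 
Utils.abs(key.hashCode) % numPartitions over the key string. The sketch below 
reimplements Java's String.hashCode in Python (an illustration, not Kafka code) to 
check how millisecond-timestamp keys in hex would spread over 8 partitions; the 
base timestamp is an arbitrary made-up value:

```python
from collections import Counter

def java_string_hash(s):
    # Java's String.hashCode: h = 31*h + c for each char, with 32-bit overflow
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h

def partition_for(key, num_partitions):
    # Kafka 0.8's DefaultPartitioner: Utils.abs(key.hashCode) % numPartitions,
    # where Utils.abs masks off the sign bit (n & 0x7fffffff)
    return (java_string_hash(key) & 0x7FFFFFFF) % num_partitions

# Millisecond timestamps rendered in hex, one key per millisecond
keys = [format(1_382_620_000_000 + i, "x") for i in range(800)]
counts = Counter(partition_for(k, 8) for k in keys)
print(sorted(counts.items()))  # roughly ~100 messages per partition
```

Running this suggests consecutive-millisecond hex keys distribute close to evenly, 
because consecutive keys mostly differ by one in the last hex digit.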

Regards,

Libo



RE: producer API thread safety

2013-10-04 Thread Yu, Libo
Great. Thanks.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, October 04, 2013 12:27 PM
To: users@kafka.apache.org
Subject: Re: producer API thread safety

The send() is thread safe, so the short answer would be yes.
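A minimal illustration of the pattern in plain Python (a stand-in object, not the 
Kafka client API): one producer instance shared by several threads, safe because 
its internal buffer is itself thread-safe, the same shape as calling a thread-safe 
send() from a thread pool:

```python
import threading
import queue

class SketchProducer:
    """Stand-in for a thread-safe producer; not the Kafka API."""
    def __init__(self):
        self._buffer = queue.Queue()  # queue.Queue is internally locked

    def send(self, topic, message):
        self._buffer.put((topic, message))

producer = SketchProducer()  # one instance shared by all threads

def worker(worker_id):
    for j in range(100):
        producer.send("mytopic", f"msg-{worker_id}-{j}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(producer._buffer.qsize())  # 400: nothing lost despite concurrent senders
```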


On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo  wrote:

> Hi team,
>
> Is it possible to use a single producer with more than one thread? I 
> am not sure if its send() is thread safe.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


producer API thread safety

2013-10-04 Thread Yu, Libo
Hi team,

Is it possible to use a single producer with more than one thread? I am not 
sure
if its send() is thread safe.

Regards,

Libo



RE: is it possible to commit offsets on a per stream basis?

2013-10-04 Thread Yu, Libo
This will improve efficiency on the client side greatly. And multiple threads 
don't have to synchronize
before committing offsets. Thanks, Jason.

Regards,

Libo
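Neha's connection-count rule quoted below is simple enough to sanity-check with a 
couple of lines (a back-of-the-envelope sketch, not an API):

```python
def consumer_connections(num_connectors, num_brokers):
    # Each high-level consumer connector fetches from every broker,
    # so connections grow linearly with the number of connectors.
    return num_connectors * num_brokers

print(consumer_connections(10, 3))  # 30 connections for 10 connectors, 3 brokers
```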


-Original Message-
From: Jason Rosenberg [mailto:j...@squareup.com] 
Sent: Thursday, October 03, 2013 4:13 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

I added a comment/suggestion to:
https://issues.apache.org/jira/browse/KAFKA-966

Basically to expose an API for marking an offset for commit, such that the 
auto-commit would only commit offsets up to the last message 'markedForCommit', 
and not the last 'consumed' offset, which may or may not have succeeded.  This 
way, consumer code can just call 'markForCommit()'
after successfully processing each message.

Does that make sense?
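A sketch of the proposed idea (the markForCommit() API is hypothetical, not part of 
the 0.8 consumer): the auto-commit timer would persist only offsets explicitly 
marked as fully processed, instead of the last consumed offset:

```python
import threading

class MarkedOffsetTracker:
    """Sketch of the proposed markForCommit() behavior (hypothetical API)."""
    def __init__(self):
        self._lock = threading.Lock()
        self._marked = {}  # (topic, partition) -> last safely processed offset

    def mark_for_commit(self, topic, partition, offset):
        # Called by consumer code after each message is processed successfully
        with self._lock:
            self._marked[(topic, partition)] = offset

    def offsets_to_commit(self):
        # What the auto-commit timer would persist, instead of "last consumed"
        with self._lock:
            return dict(self._marked)

t = MarkedOffsetTracker()
t.mark_for_commit("mytopic", 0, 41)
t.mark_for_commit("mytopic", 0, 42)
print(t.offsets_to_commit())  # {('mytopic', 0): 42}
```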


On Mon, Sep 9, 2013 at 5:21 PM, Yu, Libo  wrote:

> Thanks, Neha. That number of connections formula is very helpful.
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Monday, September 09, 2013 12:17 PM
> To: users@kafka.apache.org
> Subject: Re: is it possible to commit offsets on a per stream basis?
>
> Memory might become an issue if all the connectors are part of the 
> same process. But this is easily solvable by distributing the 
> connectors over several machines.
> Number of connections would be (# of connectors) * (# of brokers) and 
> will proportionately increase with the # of connectors.
>
> Thanks,
> Neha
>
>
> On Mon, Sep 9, 2013 at 9:08 AM, Yu, Libo  wrote:
>
> > If one connector is used for a single stream, when there are many 
> > topics/streams, will that cause any performance issue, e.g. too many 
> > connections or too much memory or big latency?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > Sent: Sunday, September 08, 2013 12:46 PM
> > To: users@kafka.apache.org
> > Subject: Re: is it possible to commit offsets on a per stream basis?
> >
> > That should be fine too.
> >
> >
> >
> >
> > On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg 
> wrote:
> >
> > > To be clear, it looks like I forgot to add to my question, that I 
> > > am asking about creating multiple connectors, within the same 
> > > consumer process (as I realize I can obviously have multiple 
> > > connectors running on multiple hosts, etc.).  But I'm guessing 
> > > that should be
> fine too?
> > >
> > > Jason
> > >
> > >
> > >
> > >
> > > On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede 
> > >  > > >wrote:
> > >
> > > > >> Can I create multiple connectors, and have each use the same 
> > > > >> Regex
> > > > for the TopicFilter?  Will each connector share the set of 
> > > > available topics?  Is this safe to do?
> > > >
> > > > >> Or is it necessary to create mutually non-intersecting 
> > > > >> regex's for
> > > each
> > > > connector?
> > > >
> > > > As long as each of those consumer connectors share the same 
> > > > group id,
> > > Kafka
> > > > consumer rebalancing should automatically re-distribute the 
> > > > topic/partitions amongst the consumer connectors/streams evenly.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg 
> > > > 
> > > wrote:
> > > >
> > > > > Will this work if we are using a TopicFilter, that can map to 
> > > > > multiple topics.  Can I create multiple connectors, and have 
> > > > > each use the same
> > > > Regex
> > > > > for the TopicFilter?  Will each connector share the set of 
> > > > > available topics?  Is this safe to do?
> > > > >
> > > > > Or is it necessary to create mutually non-intersecting regex's 
> > > > > for each connector?
> > > > >
> > > > > It seems I have a similar issue.  I have been using auto 
> > > > > commit mode,
> > > but
> > > > > it doesn't guarantee that all messages committed have been 
> > > > > successfully processed (seems a change to the connector itself 
> > > > > might expose a way to
> > > > use
>

RE: Strategies for improving Consumer throughput

2013-10-02 Thread Yu, Libo
You can use a thread pool to write to HBase, and create another pool of 
consumer threads, or add more
consumer processes. The bottleneck is writing to HBase in this case.
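A hedged sketch of that shape in plain Python (write_batch is a hypothetical 
stand-in for the HBase write, and the consumed messages are modeled as a queue): 
batching amortizes the round trip, and a writer pool keeps several writes in 
flight so the consumer threads are not blocked on the sink:

```python
from concurrent.futures import ThreadPoolExecutor
import queue

consumed = queue.Queue()
for i in range(1000):
    consumed.put(f"itinerary-{i}")  # stand-in for messages read off Kafka streams

def write_batch(batch):
    # Hypothetical HBase write; returns how many rows it stored.
    return len(batch)

futures = []
with ThreadPoolExecutor(max_workers=8) as pool:  # writer pool, decoupled from consumers
    while not consumed.empty():  # only this thread drains, so the check is safe here
        batch = []
        while len(batch) < 100 and not consumed.empty():
            batch.append(consumed.get())
        futures.append(pool.submit(write_batch, batch))
written = sum(f.result() for f in futures)
print(written)  # 1000
```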
Regards,

Libo


-Original Message-
From: Graeme Wallace [mailto:graeme.wall...@farecompare.com] 
Sent: Wednesday, October 02, 2013 4:36 PM
To: users
Subject: Re: Strategies for improving Consumer throughput

Yes, definitely consumers are behind - we can see from examining the offsets


On Wed, Oct 2, 2013 at 1:59 PM, Joe Stein  wrote:

> Are you sure the consumers are behind? could the pause be because the 
> stream is empty and producing messages is what is behind the consumption?
>
> What if you shut off your consumers for 5 minutes and then start them 
> again do the consumers behave the same way?
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
>
> On Wed, Oct 2, 2013 at 3:54 PM, Graeme Wallace < 
> graeme.wall...@farecompare.com> wrote:
>
> > Hi All,
> >
> > We've got processes that produce many millions of itineraries per minute.
> > We would like to get them into HBase (so we can query for chunks of 
> > them
> > later) - so our idea was to write each itinerary as a message into 
> > Kafka
> -
> > so that not only can we have consumers that write to HBase, but also
> other
> > consumers that may provide some sort of real-time monitoring service 
> > and also an archive service.
> >
> > Problem is - we don't really know enough about how best to do this 
> > effectively with Kafka, so that the producers can run flat out and 
> > the consumers can run flat out too. We've tried having one topic, 
> > with
> multiple
> > partitions to match the spindles on our broker h/w (12 on each) - 
> > and setting up a thread per partition on the consumer side.
> >
> > At the moment, our particular problem is that the consumers just 
> > can't
> keep
> > up. We can see from logging that the consumer threads seem to run in 
> > bursts, then a pause (as yet we don't know what the pause is - dont 
> > think its GC). Anyways, does what we are doing with one topic and 
> > multiple partitions sound correct ? Or do we need to change ? Any 
> > tricks to speed
> up
> > consumption ? (we've tried changing the fetch size - doesnt help much).
> Am
> > i correct in assuming we can have one thread per partition for
> consumption
> > ?
> >
> > Thanks in advance,
> >
> > Graeme
> >
> > --
> > Graeme Wallace
> > CTO
> > FareCompare.com
> > O: 972 588 1414
> > M: 214 681 9018
> >
>



--
Graeme Wallace
CTO
FareCompare.com
O: 972 588 1414
M: 214 681 9018
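The pattern Libo suggests above — a pool of consumer threads handing work to a separate pool of HBase writers so slow writes do not stall consumption — can be sketched as below. This is a minimal illustration only: Kafka streams and the HBase table are stubbed out with a queue and a list, and all counts are made up.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_STREAMS = 4          # one consumer thread per simulated Kafka stream
MESSAGES_PER_STREAM = 100

def consume_stream(source, writer_pool, sink):
    """Drain one simulated stream, submitting each message to the writer pool."""
    while True:
        msg = source.get()
        if msg is None:              # sentinel: stream exhausted
            break
        writer_pool.submit(sink.append, msg)   # stand-in for an HBase put

def run():
    source = queue.Queue()
    for i in range(NUM_STREAMS * MESSAGES_PER_STREAM):
        source.put(i)
    for _ in range(NUM_STREAMS):     # one sentinel per consumer thread
        source.put(None)

    sink = []                        # stands in for the HBase table
    with ThreadPoolExecutor(max_workers=8) as writer_pool:
        consumers = [threading.Thread(target=consume_stream,
                                      args=(source, writer_pool, sink))
                     for _ in range(NUM_STREAMS)]
        for t in consumers:
            t.start()
        for t in consumers:
            t.join()
    # exiting the `with` block waits for all pending writes to finish
    return sink

rows = run()
print(len(rows))   # 400: every consumed message was written
```

Decoupling the two pools this way means consumption throughput is bounded by the writer pool's queue rather than by each individual HBase write.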


bandwidth usage issue

2013-10-01 Thread Yu, Libo
Hi team,

Here is a use case: assume each host in a Kafka cluster has a gigabit network 
adapter, the incoming traffic is 0.8 Gbps, and at one point all of that traffic 
goes to one host. The remaining bandwidth is then not enough for the followers 
to replicate messages from this leader.

To make sure no broker is dropped out of the ISR, the intra-cluster bandwidth 
must be much larger than the incoming/outgoing bandwidth; otherwise, use 
replication factor 1.
Is my understanding correct? Thanks.


Regards,

Libo
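The worst-case arithmetic behind the question above can be made explicit. This sketch assumes, as the question does, that replication traffic competes with incoming traffic for the same budget; real NICs are full duplex, so this is a pessimistic model, and the replication factors below are illustrative.

```python
def leader_replication_gbps(incoming_gbps, replication_factor):
    """Extra bandwidth the leader must ship to followers: one copy per follower."""
    return incoming_gbps * (replication_factor - 1)

NIC_GBPS = 1.0
incoming = 0.8                      # all traffic lands on one leader

# With replication factor 2, followers need another 0.8 Gbps from the
# leader, far more than the 0.2 Gbps headroom left on a shared 1 Gbps link.
needed = leader_replication_gbps(incoming, replication_factor=2)
headroom = NIC_GBPS - incoming
print(needed > headroom)   # True: followers fall behind and drop out of ISR
```

Under this model the cluster either needs intra-cluster links much faster than the ingest rate, or replication factor 1, which matches the conclusion in the message above.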



RE: [jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x

2013-09-17 Thread Yu, Libo
Hi team,

Is it safe to apply the 0.8 patch to 0.8 beta1?

Regards,

Libo

-Original Message-
From: Joe Stein [mailto:crypt...@gmail.com] 
Sent: Friday, September 13, 2013 4:10 PM
To: d...@kafka.apache.org; users@kafka.apache.org
Subject: Re: [jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds 
while maintaining compatibility with 2.8.x

Thanks Chris for the patches and Neha for reviewing and committing them!!!

It is great we now have support for Scala 2.10 in Kafka trunk and also 0.8 
branch and without losing any existing support for anything else.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


On Fri, Sep 13, 2013 at 3:57 PM, Neha Narkhede (JIRA) wrote:

>
>  [
> https://issues.apache.org/jira/browse/KAFKA-1046?page=com.atlassian.ji
> ra.plugin.system.issuetabpanels:all-tabpanel]
>
> Neha Narkhede updated KAFKA-1046:
> -
>
> Resolution: Fixed
> Status: Resolved  (was: Patch Available)
>
> Thanks for the patches. Checked in your patch to trunk
>
> > Added support for Scala 2.10 builds while maintaining compatibility 
> > with
> 2.8.x
> >
> --
> 
> >
> > Key: KAFKA-1046
> > URL: https://issues.apache.org/jira/browse/KAFKA-1046
> > Project: Kafka
> >  Issue Type: Improvement
> >Affects Versions: 0.8
> >Reporter: Christopher Freeman
> >Assignee: Christopher Freeman
> > Fix For: 0.8
> >
> > Attachments: kafka_2_10_refactor_0.8.patch,
> kafka_2_10_refactor.patch, Screen Shot 2013-09-09 at 9.34.09 AM.png
> >
> >
> > I refactored the project such that it will compile against Scala 2.10.1.
>
> --
> This message is automatically generated by JIRA.
> If you think it was sent incorrectly, please contact your JIRA 
> administrators For more information on JIRA, see: 
> http://www.atlassian.com/software/jira
>


RE: [jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x

2013-09-17 Thread Yu, Libo
Answer my own question:
When I tried to apply the patch to 0.8 beta1, I got many errors and had to skip 
it.

Regards,

Libo


-Original Message-
From: Yu, Libo [ICG-IT] 
Sent: Tuesday, September 17, 2013 3:33 PM
To: 'users@kafka.apache.org'; 'd...@kafka.apache.org'
Subject: RE: [jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds 
while maintaining compatibility with 2.8.x

Hi team,

Is it safe to apply the 0.8 patch to 0.8 beta1?

Regards,

Libo

-Original Message-
From: Joe Stein [mailto:crypt...@gmail.com]
Sent: Friday, September 13, 2013 4:10 PM
To: d...@kafka.apache.org; users@kafka.apache.org
Subject: Re: [jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds 
while maintaining compatibility with 2.8.x

Thanks Chris for the patches and Neha for reviewing and committing them!!!

It is great we now have support for Scala 2.10 in Kafka trunk and also 0.8 
branch and without losing any existing support for anything else.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
/


On Fri, Sep 13, 2013 at 3:57 PM, Neha Narkhede (JIRA) wrote:

>
>  [
> https://issues.apache.org/jira/browse/KAFKA-1046?page=com.atlassian.ji
> ra.plugin.system.issuetabpanels:all-tabpanel]
>
> Neha Narkhede updated KAFKA-1046:
> -
>
> Resolution: Fixed
> Status: Resolved  (was: Patch Available)
>
> Thanks for the patches. Checked in your patch to trunk
>
> > Added support for Scala 2.10 builds while maintaining compatibility 
> > with
> 2.8.x
> >
> --
> 
> >
> > Key: KAFKA-1046
> > URL: https://issues.apache.org/jira/browse/KAFKA-1046
> > Project: Kafka
> >  Issue Type: Improvement
> >Affects Versions: 0.8
> >Reporter: Christopher Freeman
> >Assignee: Christopher Freeman
> > Fix For: 0.8
> >
> > Attachments: kafka_2_10_refactor_0.8.patch,
> kafka_2_10_refactor.patch, Screen Shot 2013-09-09 at 9.34.09 AM.png
> >
> >
> > I refactored the project such that it will compile against Scala 2.10.1.
>
> --
> This message is automatically generated by JIRA.
> If you think it was sent incorrectly, please contact your JIRA 
> administrators For more information on JIRA, see:
> http://www.atlassian.com/software/jira
>


RE: monitoring followers' lag

2013-09-11 Thread Yu, Libo
Hi Joel,

I cannot find such a value, and neither can my teammate.
There is no ReplicaFetcherManager under kafka.server,
only a ReplicaManager, and there is no MaxLag
under it.

Regards,

Libo


-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Tuesday, September 10, 2013 8:48 PM
To: users@kafka.apache.org
Subject: Re: monitoring followers' lag

It should be "kafka.server":type="ReplicaFetcherManager",name="Replica-MaxLag"
- can you confirm and mind updating the wiki if this is the case?

Thanks,

Joel

On Tue, Sep 10, 2013 at 10:38 AM, Yu, Libo  wrote:
> I have run JConsole. But I cannot find 
> "kafka.server":name="([-.\w]+)-MaxLag", type="ReplicaFetcherManager"
>
> I am running 0.8 beta1. It seems ReplicaFetcherManager has not been 
> added to this version.
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Tuesday, September 10, 2013 11:01 AM
> To: users@kafka.apache.org
> Subject: Re: monitoring followers' lag
>
> Have you looked at the updated docs in 
> http://kafka.apache.org/documentation.html#monitoring ?
>
> Thanks,
>
> Jun
>
>
> On Tue, Sep 10, 2013 at 7:59 AM, Yu, Libo  wrote:
>
>> Hi team,
>>
>> I wonder if anybody can give detailed instructions on how to monitor 
>> the followers' lag by using JMX. Thanks.
>>
>> Regards,
>>
>> Libo
>>
>>
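The two MBean names in the thread above differ in both attribute order and the name attribute itself, which is likely why a literal search in JConsole for the wiki's pattern turns up nothing. The check below is purely illustrative; it compares the wiki pattern as quoted in the thread against the name Joel gives.

```python
import re

# Pattern as quoted from the wiki vs. the bean name Joel reports.
wiki_pattern = r'"kafka\.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"'
actual_bean = '"kafka.server":type="ReplicaFetcherManager",name="Replica-MaxLag"'

print(re.search(wiki_pattern, actual_bean) is None)   # True: the wiki regex misses it

# Matching the attributes individually is more robust than matching the whole
# string, since JMX ObjectName key properties are not order-significant.
def is_max_lag_bean(name):
    return ('"kafka.server"' in name
            and 'type="ReplicaFetcherManager"' in name
            and re.search(r'name="[-.\w]+-MaxLag"', name) is not None)

print(is_max_lag_bean(actual_bean))   # True
```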


monitoring followers' lag

2013-09-10 Thread Yu, Libo
Hi team,

I wonder if anybody can give detailed instructions on how to monitor
the followers' lag by using JMX. Thanks.

Regards,

Libo



RE: monitoring followers' lag

2013-09-10 Thread Yu, Libo
I have run JConsole. But I cannot find 
"kafka.server":name="([-.\w]+)-MaxLag", type="ReplicaFetcherManager"

I am running 0.8 beta1. It seems ReplicaFetcherManager has not been 
added to this version.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, September 10, 2013 11:01 AM
To: users@kafka.apache.org
Subject: Re: monitoring followers' lag

Have you looked at the updated docs in
http://kafka.apache.org/documentation.html#monitoring ?

Thanks,

Jun


On Tue, Sep 10, 2013 at 7:59 AM, Yu, Libo  wrote:

> Hi team,
>
> I wonder if anybody can give detailed instructions on how to monitor 
> the followers' lag by using JMX. Thanks.
>
> Regards,
>
> Libo
>
>


RE: implicit default minimum retention size per partition is 4GB.

2013-09-10 Thread Yu, Libo
In a stress test, 100K 1 MB messages (100 GB in total) were published (our 
bandwidth is limited). 
As our retention size is 3 GB, which is smaller than the implied default minimum 
retention size 
(4 GB), we noticed 20K messages were missing.  After increasing 
"num.replica.fetchers" to 2,
there was no more message loss. 
So how many messages are covered by one high watermark? And is that 
configurable? I went through 
the broker configuration page and found nothing.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, September 10, 2013 11:00 AM
To: users@kafka.apache.org
Subject: Re: implicit default minimum retention size per partition is 4GB.

Monitoring the lag in bytes makes sense. The only difficulty is currently, the 
high watermark in the leader is represented in logical message offset, not the 
byte offset. For now, you will have to do the bytes to messages translation 
yourself.

As for setting replica.lag.max.messages, you can observe the max lag in the 
follower and set replica.lag.max.messages to be a bit larger than that. I am 
curious to know the observed max lag in your use case.

Thanks,

Jun


On Tue, Sep 10, 2013 at 6:46 AM, Yu, Libo  wrote:

> Hi team,
>
> For default broker configuration, replica.lag.max.messages is 4000 and 
> message.max.bytes is 1Mb.
> In the extreme case, the follower(s) could lag by 4000 messages. The 
> leader must save at least
> 4000 messages to allow follower(s) to catch up. So the minimum 
> retention size is 4000Mb=4Gb.
> It is better to add this to the documentation.
>
> In our case, message.max.bytes is much larger than 1Mb and 
> replica.lag.max.messages is larger than 4000. This implicit  minimum 
> retention size is hundreds of Gb and we have hundreds of partitions on 
> each broker. We feel we have to use a disk array to run Kafka.
>
> Because the topics have different maximum message size, it makes more 
> sense to use the size gap between the leader and follower(s), e.g., 
> say the follower(s) can only lag behind the leader by 2Gb.
> This makes it easier to control the behavior the brokers and save disk 
> space.
>
> Regards,
>
> Libo
>
>


RE: monitoring followers' lag

2013-09-10 Thread Yu, Libo
Thanks, Jun. I will try it out.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, September 10, 2013 11:01 AM
To: users@kafka.apache.org
Subject: Re: monitoring followers' lag

Have you looked at the updated docs in
http://kafka.apache.org/documentation.html#monitoring ?

Thanks,

Jun


On Tue, Sep 10, 2013 at 7:59 AM, Yu, Libo  wrote:

> Hi team,
>
> I wonder if anybody can give detailed instructions on how to monitor 
> the followers' lag by using JMX. Thanks.
>
> Regards,
>
> Libo
>
>


implicit default minimum retention size per partition is 4GB.

2013-09-10 Thread Yu, Libo
Hi team,

For default broker configuration, replica.lag.max.messages is 4000 and 
message.max.bytes is 1Mb.
In the extreme case, the follower(s) could lag by 4000 messages. The leader 
must save at least
4000 messages to allow the follower(s) to catch up, so the minimum retention 
size is 4000 x 1 MB = 4 GB.
It would be good to add this to the documentation.

In our case, message.max.bytes is much larger than 1 MB and 
replica.lag.max.messages is larger than
4000. This implicit minimum retention size is hundreds of GB, and we have 
hundreds of partitions on
each broker, so we feel we have to use a disk array to run Kafka.

Because the topics have different maximum message sizes, it makes more sense to 
bound the size gap
between the leader and the follower(s), e.g., allow the follower(s) to lag 
behind the leader by at most 2 GB.
This makes it easier to control the behavior of the brokers and to save disk space.

Regards,

Libo
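The arithmetic in the message above reduces to a single multiplication; the sketch below spells it out. The default figures come from the thread, while the "larger" configuration is an invented example of how quickly the implied minimum grows.

```python
MB = 1024 * 1024
GB = 1024 * MB

def min_retention_bytes(replica_lag_max_messages, message_max_bytes):
    """Smallest per-partition retention that still lets a maximally lagged
    follower catch up, per the reasoning in the message above."""
    return replica_lag_max_messages * message_max_bytes

# Defaults from the thread: 4000 messages x 1 MB, roughly 3.9 GB per partition.
default_min = min_retention_bytes(4000, 1 * MB)
print(default_min >= 3 * GB)   # True: a 3 GB retention limit is below the floor

# Hypothetical larger settings: 20000-message lag with 10 MB messages implies
# a floor of roughly 195 GB per partition.
big_min = min_retention_bytes(20000, 10 * MB)
print(big_min > 100 * GB)      # True
```

This also explains the message loss observed in the 3 GB stress test earlier in the thread: retention was below the implied floor until the fetcher count was raised.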



RE: is it possible to commit offsets on a per stream basis?

2013-09-09 Thread Yu, Libo
Thanks, Neha. That number of connections formula is very helpful.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Monday, September 09, 2013 12:17 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

Memory might become an issue if all the connectors are part of the same 
process. But this is easily solvable by distributing the connectors over 
several machines.
Number of connections would be (# of connectors) * (# of brokers) and will 
proportionately increase with the # of connectors.

Thanks,
Neha


On Mon, Sep 9, 2013 at 9:08 AM, Yu, Libo  wrote:

> If one connector is used for a single stream, when there are many 
> topics/streams, will that cause any performance issue, e.g. too many 
> connections or too much memory or big latency?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Sunday, September 08, 2013 12:46 PM
> To: users@kafka.apache.org
> Subject: Re: is it possible to commit offsets on a per stream basis?
>
> That should be fine too.
>
>
>
>
> On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg  wrote:
>
> > To be clear, it looks like I forgot to add to my question, that I am 
> > asking about creating multiple connectors, within the same consumer 
> > process (as I realize I can obviously have multiple connectors 
> > running on multiple hosts, etc.).  But I'm guessing that should be fine too?
> >
> > Jason
> >
> >
> >
> >
> > On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede 
> >  > >wrote:
> >
> > > >> Can I create multiple connectors, and have each use the same 
> > > >> Regex
> > > for the TopicFilter?  Will each connector share the set of 
> > > available topics?  Is this safe to do?
> > >
> > > >> Or is it necessary to create mutually non-intersecting regex's 
> > > >> for
> > each
> > > connector?
> > >
> > > As long as each of those consumer connectors share the same group 
> > > id,
> > Kafka
> > > consumer rebalancing should automatically re-distribute the 
> > > topic/partitions amongst the consumer connectors/streams evenly.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg 
> > wrote:
> > >
> > > > Will this work if we are using a TopicFilter, that can map to 
> > > > multiple topics.  Can I create multiple connectors, and have 
> > > > each use the same
> > > Regex
> > > > for the TopicFilter?  Will each connector share the set of 
> > > > available topics?  Is this safe to do?
> > > >
> > > > Or is it necessary to create mutually non-intersecting regex's 
> > > > for each connector?
> > > >
> > > > It seems I have a similar issue.  I have been using auto commit 
> > > > mode,
> > but
> > > > it doesn't guarantee that all messages committed have been 
> > > > successfully processed (seems a change to the connector itself 
> > > > might expose a way to
> > > use
> > > > auto offset commit, and have it never commit a message until it 
> > > > is processed).  But that would be a change to the 
> > > > ZookeeperConsumerConnector... Essentially, it would be great if 
> > > > after processing each message, we could mark the message as 
> > > > 'processed', and
> > > thus
> > > > use that status as the max offset to commit when the auto offset 
> > > > commit background thread wakes up each time.
> > > >
> > > > Jason
> > > >
> > > >
> > > > On Thu, Aug 29, 2013 at 11:58 AM, Yu, Libo  wrote:
> > > >
> > > > > Thanks, Neha. That is a great answer.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Libo
> > > > >
> > > > >
> > > > > -Original Message-----
> > > > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > > > > Sent: Thursday, August 29, 2013 1:55 PM
> > > > > To: users@kafka.apache.org
> > > > > Subject: Re: is it possible to commit offsets on a per stream
> basis?
> > > > >
> > > > > 1 We can create multiple connectors. From each connector 
> > > > > create only
> > > one
> >
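Neha's connection-count formula in the reply above is a simple product, but it is worth seeing the numbers it produces. The connector and broker counts below are invented for illustration.

```python
def consumer_connections(num_connectors, num_brokers):
    """Per the formula in the thread: connections grow as
    (# of connectors) * (# of brokers)."""
    return num_connectors * num_brokers

# e.g. 50 single-stream connectors against a 3-broker cluster:
print(consumer_connections(50, 3))    # 150 connections

# Doubling the connectors doubles the connection count proportionately.
print(consumer_connections(100, 3))   # 300 connections
```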

RE: is it possible to commit offsets on a per stream basis?

2013-09-09 Thread Yu, Libo
If one connector is used per stream and there are many topics/streams,
will that cause any performance issues, e.g. too many connections, too much 
memory, or high latency?

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Sunday, September 08, 2013 12:46 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

That should be fine too.




On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg  wrote:

> To be clear, it looks like I forgot to add to my question, that I am 
> asking about creating multiple connectors, within the same consumer 
> process (as I realize I can obviously have multiple connectors running 
> on multiple hosts, etc.).  But I'm guessing that should be fine too?
>
> Jason
>
>
>
>
> On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede  >wrote:
>
> > >> Can I create multiple connectors, and have each use the same 
> > >> Regex
> > for the TopicFilter?  Will each connector share the set of available 
> > topics?  Is this safe to do?
> >
> > >> Or is it necessary to create mutually non-intersecting regex's 
> > >> for
> each
> > connector?
> >
> > As long as each of those consumer connectors share the same group 
> > id,
> Kafka
> > consumer rebalancing should automatically re-distribute the 
> > topic/partitions amongst the consumer connectors/streams evenly.
> >
> > Thanks,
> > Neha
> >
> >
> > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg 
> wrote:
> >
> > > Will this work if we are using a TopicFilter, that can map to 
> > > multiple topics.  Can I create multiple connectors, and have each 
> > > use the same
> > Regex
> > > for the TopicFilter?  Will each connector share the set of 
> > > available topics?  Is this safe to do?
> > >
> > > Or is it necessary to create mutually non-intersecting regex's for 
> > > each connector?
> > >
> > > It seems I have a similar issue.  I have been using auto commit 
> > > mode,
> but
> > > it doesn't guarantee that all messages committed have been 
> > > successfully processed (seems a change to the connector itself 
> > > might expose a way to
> > use
> > > auto offset commit, and have it never commit a message until it is 
> > > processed).  But that would be a change to the 
> > > ZookeeperConsumerConnector... Essentially, it would be great if 
> > > after processing each message, we could mark the message as 
> > > 'processed', and
> > thus
> > > use that status as the max offset to commit when the auto offset 
> > > commit background thread wakes up each time.
> > >
> > > Jason
> > >
> > >
> > > On Thu, Aug 29, 2013 at 11:58 AM, Yu, Libo  wrote:
> > >
> > > > Thanks, Neha. That is a great answer.
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > > > -Original Message-
> > > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > > > Sent: Thursday, August 29, 2013 1:55 PM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: is it possible to commit offsets on a per stream basis?
> > > >
> > > > 1 We can create multiple connectors. From each connector create 
> > > > only
> > one
> > > > stream.
> > > > 2 Use a single thread for a stream. In this case, the connector 
> > > > in
> each
> > > > thread can commit freely without any dependence on the other threads.
> >  Is
> > > > this the right way to go? Will it introduce any dead lock when
> multiple
> > > > connectors commit at the same time?
> > > >
> > > > This is a better approach as there is no complex locking involved.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Thu, Aug 29, 2013 at 10:28 AM, Yu, Libo  wrote:
> > > >
> > > > > Hi team,
> > > > >
> > > > > This is our current use case:
> > > > > Assume there is a topic with multiple partitions.
> > > > > 1 Create a connector first and create multiple streams from 
> > > > > the connector for a topic.
> > > > > 2 Create multiple threads, one for each stream. You can assume 
> > > > > the thread's job is to save the message into the database.
> > > > > 3 When it is time to commit offsets, all threads have to
> synchronize
> > > > > on a barrier before committing the offsets. This is to ensure 
> > > > > no message loss in case of process crash.
> > > > >
> > > > > As all threads need to synchronize before committing, it is 
> > > > > not
> > > > efficient.
> > > > > This is a workaround:
> > > > >
> > > > > 1 We can create multiple connectors. From each connector 
> > > > > create
> only
> > > > > one stream.
> > > > > 2 Use a single thread for a stream. In this case, the 
> > > > > connector in each thread can commit freely without any 
> > > > > dependence on the other threads.  Is this the right way to go? 
> > > > > Will it introduce any dead
> > lock
> > > > > when multiple connectors commit at the same time?
> > > > >
> > > > > It would be great to allow committing on a per stream basis.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Libo
> > > > >
> > > > >
> > > >
> > >
> >
>


RE: is it possible to commit offsets on a per stream basis?

2013-08-29 Thread Yu, Libo
Thanks, Neha. That is a great answer.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, August 29, 2013 1:55 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

1 We can create multiple connectors. From each connector create only one stream.
2 Use a single thread for a stream. In this case, the connector in each thread 
can commit freely without any dependence on the other threads.  Is this the 
right way to go? Will it introduce any dead lock when multiple connectors 
commit at the same time?

This is a better approach as there is no complex locking involved.

Thanks,
Neha


On Thu, Aug 29, 2013 at 10:28 AM, Yu, Libo  wrote:

> Hi team,
>
> This is our current use case:
> Assume there is a topic with multiple partitions.
> 1 Create a connector first and create multiple streams from the 
> connector for a topic.
> 2 Create multiple threads, one for each stream. You can assume the 
> thread's job is to save the message into the database.
> 3 When it is time to commit offsets, all threads have to synchronize 
> on a barrier before committing the offsets. This is to ensure no 
> message loss in case of process crash.
>
> As all threads need to synchronize before committing, it is not efficient.
> This is a workaround:
>
> 1 We can create multiple connectors. From each connector create only 
> one stream.
> 2 Use a single thread for a stream. In this case, the connector in 
> each thread can commit freely without any dependence on the other 
> threads.  Is this the right way to go? Will it introduce any dead lock 
> when multiple connectors commit at the same time?
>
> It would be great to allow committing on a per stream basis.
>
> Regards,
>
> Libo
>
>


is it possible to commit offsets on a per stream basis?

2013-08-29 Thread Yu, Libo
Hi team,

This is our current use case:
Assume there is a topic with multiple partitions.
1 Create a connector first and create multiple streams from the connector for a 
topic.
2 Create multiple threads, one for each stream. You can assume the thread's job 
is to
save the message into the database.
3 When it is time to commit offsets, all threads have to synchronize on a 
barrier
before committing the offsets. This is to ensure no message loss in case of 
process
crash.

As all threads need to synchronize before committing, it is not efficient. This 
is a workaround:

1 We can create multiple connectors. From each connector create only one stream.
2 Use a single thread per stream. In this case, the connector in each thread 
can commit
freely without any dependence on the other threads. Is this the right way to 
go? Will it
introduce any deadlock when multiple connectors commit at the same time?

It would be great to allow committing on a per stream basis.

Regards,

Libo
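The barrier synchronization described in step 3 above can be sketched with standard threading primitives. Kafka itself is stubbed out here: "processing" and "committing" are just recorded events, and the point of the sketch is that the barrier guarantees no commit is recorded before every thread has finished processing.

```python
import threading

NUM_STREAMS = 3
events = []
lock = threading.Lock()

def record(label):
    with lock:
        events.append(label)

barrier = threading.Barrier(NUM_STREAMS)

def stream_worker(stream_id):
    record(("processed", stream_id))   # e.g. message saved to the database
    barrier.wait()                     # all stream threads synchronize here
    record(("committed", stream_id))   # commitOffsets() would go here

threads = [threading.Thread(target=stream_worker, args=(i,))
           for i in range(NUM_STREAMS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every "processed" event precedes every "committed" event.
first_commit = min(i for i, (kind, _) in enumerate(events) if kind == "committed")
last_process = max(i for i, (kind, _) in enumerate(events) if kind == "processed")
print(last_process < first_commit)   # True
```

The cost is visible in the sketch: the fastest thread idles at `barrier.wait()` until the slowest arrives, which is exactly the inefficiency that motivates the one-connector-per-stream workaround.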



RE: zookeeper session time out

2013-08-29 Thread Yu, Libo
Thanks for your answer, Neha. Currently we don't save the GC log.
I will add that option and keep monitoring the issue.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Wednesday, August 28, 2013 4:25 PM
To: users@kafka.apache.org
Subject: Re: zookeeper session time out

Ah, you maybe hitting the GC due to IO issue. You can confirm if this is really 
the case by looking at the gc.log on the broker and check if you see a GC entry 
with a small user and sys time but high real time. We saw a similar 
IO-causing-GC pauses problem when compressing our request log4j files which 
happens every hour or so. Since these files are large and the gzip process hogs 
the IO bandwidth, the linux box hits the dirty_ratio threshold and the kernel 
stops all threads doing I/O until all the dirty pages are flushed to disk. We 
have seen GC pauses until 15-20 seconds when this happens. A workaround is to 
increase your zookeeper session timeout higher to prevent the session 
expiration and the leader re-elections that follow.

As for your file deletion issue, we have seen that if you configure a Kafka 
broker with time based expiration, it ends up deleting possibly 100s of large 
segment files all at the same time. This puts pressure on file system 
journaling (we are using ext4 in data=ordered mode) and it slows down writes on 
the Kafka side. Kafka should throttle time based rolling as well as time based 
expiration to prevent this situation. With that said, we have never really seen 
this cause a GC pause like the one you described though.

So it will be good to investigate the root cause of your GC pause anyway.
Could you check your gc.log and send back the relevant part of the log that 
shows the pause?

Thanks,
Neha


On Wed, Aug 28, 2013 at 1:09 PM, Yu, Libo  wrote:

> Hi team,
>
> We notice when the incoming throughput is very high, the broker has to 
> delete old log files to free up disk space. That caused some kind of 
> blocking
> (latency) and
> frequently the broker's zookeeper session times out. Currently our 
> zookeeper time out threshold is 4s. We can increase it. But if this 
> threshold is too large, what is the consequence? Thanks.
>
>
> Libo
>
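Neha's diagnostic above — a GC entry with small user and sys time but high real time points at an I/O stall rather than actual collection work — is easy to automate against a GC log. The log line below is a fabricated example in the HotSpot `[Times: user=... sys=... real=... secs]` format, and the 5x factor is an arbitrary threshold, not anything from the thread.

```python
import re

sample = "[Times: user=0.12 sys=0.03, real=14.85 secs]"

def io_stall_suspected(line, factor=5.0):
    """Flag GC entries whose wall-clock time dwarfs the CPU time spent,
    suggesting the pause was blocked on I/O (e.g. dirty-page flushing)."""
    m = re.search(r"user=([\d.]+) sys=([\d.]+)[, ]+real=([\d.]+)", line)
    if not m:
        return False
    user, sys_t, real = map(float, m.groups())
    return real > factor * (user + sys_t)

print(io_stall_suspected(sample))   # True: 14.85s real vs 0.15s of CPU
```

A scan of gc.log with this check would surface exactly the pauses worth correlating with log-segment deletion and ZooKeeper session expirations.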
>


RE: question about log.retention.bytes

2013-08-28 Thread Yu, Libo
Jay made a good point. The docs say it is per topic, but actually 
it is per partition. Thanks, Jun.

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Wednesday, August 28, 2013 11:02 AM
To: users@kafka.apache.org
Subject: Re: question about log.retention.bytes

Changed the wording in the doc.

Thanks,

Jun


On Wed, Aug 28, 2013 at 7:20 AM, Jay Kreps  wrote:

> I think the problem is that there is no way to understand the meaning 
> of that config from the docs, so people keep asking over and over 
> again. The docs make iit sound like it is per topic and just says it 
> is the "maximum size before it is deleted" which makes no sense...
>
> -jay
>
> On Tuesday, August 27, 2013, Jun Rao wrote:
>
> > For the first question, yes.
> >
> > For the second one, this is documented in 
> > http://kafka.apache.org/documentation.html#brokerconfigs
> >
> > "Note that all per topic configuration properties below have the 
> > format
> of
> > csv (e.g., "topic1:value1,topic2:value2")."
> > Thanks,
> > Jun
> >
> >
> >
> > On Tue, Aug 27, 2013 at 11:52 AM, Yu, Libo  >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > In a previous email thread
> > >
> >
> http://markmail.org/search/?q=kafka+log.retention.bytes#query:kafka%20
> log.retention.bytes+page:1+mid:qnt4pbq47goii2ui+state:results
> > > ,
> > > you said log.retention.bytes is for each partition. Could you 
> > > clarify
> on
> > > that?
> > >
> > > Say if I have a topic with three partitions. I want to limit the 
> > > disk space to 1Gb for each partition.
> > > Then log.retention.bytes should be set to 1Gb (not 3Gb). Is that right?
> > >
> > > If I want to use log.retention.bytes.per.topic to set the same 
> > > limit, should it be set to 1G or 3G?
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Jun Rao [mailto:jun...@gmail.com ]
> > > Sent: Saturday, August 17, 2013 12:40 AM
> > > To: users@kafka.apache.org 
> > > Subject: Re: question about log.retention.bytes
> > >
> > > log.retention.bytes is for all topics that are not included in 
> > > log.retention.bytes.per.topic (which defines a map of topic -> size).
> > >
> > > Currently, we don't have a total size limit across all topics.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 16, 2013 at 2:00 PM, Paul Christian 
> > > >wrote:
> > >
> > > > According to the Kafka 8 documentation under broker configuration.
> > > > There are these parameters and their definitions.
> > > >
> > > > log.retention.bytes -1 The maximum size of the log before 
> > > > deleting it log.retention.bytes.per.topic "" The maximum size of 
> > > > the log for some specific topic before deleting it
> > > >
> > > > I'm curious what the first value 'log.retention.bytes' is for if 
> > > > the second one is for per topic logs, because aren't all logs 
> > > > generated per topic? Is this an aggregate value across topics?
> > > >
> > > > Related question, is there a parameter for kafka where you can 
> > > > say only hold this much TOTAL data across all topic ( logs/index 
> > > > together
> > )?
> > > I.e.
> > > > our  hosts have this much available space and so value 
> > > > log.retention.whatever.aggregate == 75% total  disk space.
> > > >
> > >
> >
>
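Two points from the thread above can be sketched together: the documented csv format for per-topic overrides ("topic1:value1,topic2:value2"), and the fact that log.retention.bytes applies per partition, so a topic's disk footprint is the retention limit times its partition count (per replica). Topic names and sizes here are made up for illustration.

```python
GB = 1024 ** 3

def parse_per_topic(csv_value):
    """Parse the documented 'topic1:value1,topic2:value2' format into a dict."""
    if not csv_value:
        return {}
    return {topic: int(size)
            for topic, size in (entry.split(":") for entry in csv_value.split(","))}

overrides = parse_per_topic(f"logs:{1 * GB},metrics:{2 * GB}")

def topic_disk_bytes(retention_bytes, num_partitions):
    """Retention is per partition, so multiply by the partition count."""
    return retention_bytes * num_partitions

# A 3-partition topic capped at 1 GB per partition can still occupy 3 GB
# on disk per replica, even though log.retention.bytes is set to 1 GB.
print(topic_disk_bytes(overrides["logs"], 3) // GB)   # 3
```

This is the answer to Libo's question in arithmetic form: set the per-partition value (1 GB), and budget disk as value x partitions x replicas.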


zookeeper session time out

2013-08-28 Thread Yu, Libo
Hi team,

We notice that when the incoming throughput is very high, the broker has to delete
old log files to free up disk space. That causes some blocking 
(latency), and
the broker's ZooKeeper session frequently times out. Currently our ZooKeeper
session timeout threshold is 4s. We can increase it, but if this threshold is too 
large, what
is the consequence? Thanks.


Libo



RE: questtion about log.retention.bytes

2013-08-27 Thread Yu, Libo
Hi Jun,

In a previous email thread 
http://markmail.org/search/?q=kafka+log.retention.bytes#query:kafka%20log.retention.bytes+page:1+mid:qnt4pbq47goii2ui+state:results,
you said log.retention.bytes is for each partition. Could you clarify on that?

Say I have a topic with three partitions and I want to limit the disk space to 
1 GB for each partition.
Then log.retention.bytes should be set to 1 GB (not 3 GB). Is that right?

If I want to use log.retention.bytes.per.topic to set the same limit, should it 
be set to 1G or 3G?

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Saturday, August 17, 2013 12:40 AM
To: users@kafka.apache.org
Subject: Re: question about log.retention.bytes

log.retention.bytes is for all topics that are not included in 
log.retention.bytes.per.topic (which defines a map of topic -> size).

Currently, we don't have a total size limit across all topics.

Thanks,

Jun


On Fri, Aug 16, 2013 at 2:00 PM, Paul Christian
wrote:

> According to the Kafka 8 documentation under broker configuration. 
> There are these parameters and their definitions.
>
> log.retention.bytes -1 The maximum size of the log before deleting it 
> log.retention.bytes.per.topic "" The maximum size of the log for some 
> specific topic before deleting it
>
> I'm curious what the first value 'log.retention.bytes' is for if the 
> second one is for per topic logs, because aren't all logs generated 
> per topic? Is this an aggregate value across topics?
>
> Related question, is there a parameter for kafka where you can say 
> only hold this much TOTAL data across all topic ( logs/index together )? I.e.
> our  hosts have this much available space and so value 
> log.retention.whatever.aggregate == 75% total  disk space.
>


RE: questions about ISR

2013-08-27 Thread Yu, Libo
Thanks, Jun. That is exactly what I want to know. 

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, August 27, 2013 11:25 AM
To: users@kafka.apache.org
Subject: Re: questions about ISR

Look for jmx beans under kafka.server. You will see ???MaxLag and 
???MinFetchRate. In the normal case, when a broker fails, the controller will 
drop the failed broker out of ISR during leader election. So, the value of 
replica.lag.time.max.ms doesn't matter. This value only matters when the 
controller's decision is delayed somehow. In this case, having a large 
replica.lag.time.max.ms may delay the committing of a message.

Thanks,

Jun


On Tue, Aug 27, 2013 at 6:37 AM, Yu, Libo  wrote:

> Thanks, Jun. That is very helpful. However, I still have a couple of 
> questions. "We have a min fetch rate JMX in the broker". How to find 
> out how such min fetch rate is defined? And if replica.lag.time.max.ms 
> is too large, what is the consequence?
>
>
>
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Tuesday, August 27, 2013 12:07 AM
> To: users@kafka.apache.org
> Subject: Re: questions about ISR
>
> I added the following in FAQ:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Howtoreducec
> hurnsinISR%3F
>
> Thanks,
>
> Jun
>
>
> On Mon, Aug 26, 2013 at 7:46 PM, James Wu  wrote:
>
> > Hi Jun,
> >
> > I am curious Yu's questions too.
> >
> > 1. What is the best practice to set replica.lag.time.max.ms & 
> > replica.lag.max.messages ? As long as possible or something else ?
> >
> > 2. If the broker exceeds one of these 2 configurations, how should 
> > we do to bring the broker back to ISR ? Will controller automatic 
> > cover this to catch broker up, the only thing we need to do is 
> > waiting for the broker back ?
> >
> > Thanks.
> >
> >
> >
> >
> > On Mon, Aug 26, 2013 at 11:15 PM, Jun Rao  wrote:
> >
> > > That's right. You shouldn't need to restart the whole cluster for 
> > > a
> > broker
> > > to rejoin ISR. Do you see many ZK session expirations in the 
> > > brokers (search for "(Expired)"? If so, you may need to tune the 
> > > GC on the
> > broker.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Aug 26, 2013 at 7:11 AM, Yu, Libo  wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Could you confirm the following?
> > > > So after a broker is out of ISR, the only way to let it go back 
> > > > is to restart it.
> > > >
> > > > We should set replica.lag.time.max.ms and 
> > > > replica.lag.max.messages as large as possible to avoid a broker fall 
> > > > outside of ISR.
> > > >
> > > > What we have experienced is that when a broker is out of ISR 
> > > > frequently we need to restart the whole cluster to make it back.
> > > > That is a
> > blocking
> > > > issue
> > > > for us.
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > > > -Original Message-
> > > > From: Jun Rao [mailto:jun...@gmail.com]
> > > > Sent: Friday, August 23, 2013 11:41 PM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: questions about ISR
> > > >
> > > > When a broker is restarted, it will automatically catch up from 
> > > > the
> > > leader
> > > > and will join ISR when it's caught up. Are you not seeing this
> > happening?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 23, 2013 at 11:33 AM, Yu, Libo  wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > When a broker is not in a topic's ISR, will it try to catch up 
> > > > > to go back to ISR itself?
> > > > > Or do we have to restart it?
> > > > >
> > > > > We can increase replica.lag.time.max.ms and 
> > > > > replica.lag.max.messages to let brokers stay longer in ISR. Is 
> > > > > that good practice? Still this is related to the first 
> > > > > questions. We want to know what happens after a broker falls 
> > > > > out
> of ISR and what we should do. Thanks.
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Libo
> > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> >
> > Friendly regards,
> >
> > *James Wu <https://plus.google.com/u/0/100829801349304669533>
> > *
> >
>


Question about preferred replica leader election tool

2013-08-27 Thread Yu, Libo
Hi,

We have three brokers in our Kafka cluster. For all topics, the replication factor
is two. Here is the distribution of leaders. After I ran the leader election tool,
nothing happened. In this list, the first broker in isr is the leader. I assume
that after running the tool, the first broker in replicas should be elected as
the leader. Any idea why this does not work? Thanks.

topic: example.topic.one  partition: 0  leader: 1  replicas: 2,1  isr: 1,2
topic: example.topic.one  partition: 1  leader: 2  replicas: 3,2  isr: 2,3
topic: example.topic.one  partition: 2  leader: 1  replicas: 1,3  isr: 1,3
topic: example.topic.two  partition: 0  leader: 1  replicas: 3,1  isr: 1,3
topic: example.topic.two  partition: 1  leader: 1  replicas: 1,2  isr: 1,2
topic: example.topic.two  partition: 2  leader: 2  replicas: 2,3  isr: 2,3
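For reference, a partition's preferred leader is the first broker in its replicas
list, so one can check which partitions should change leader after a successful
run of the tool. A minimal sketch with the listing above transcribed by hand (the
data structure and function are illustrative, not any Kafka API):

```python
# Each entry: (topic, partition, current_leader, replica_list)
assignments = [
    ("example.topic.one", 0, 1, [2, 1]),
    ("example.topic.one", 1, 2, [3, 2]),
    ("example.topic.one", 2, 1, [1, 3]),
    ("example.topic.two", 0, 1, [3, 1]),
    ("example.topic.two", 1, 1, [1, 2]),
    ("example.topic.two", 2, 2, [2, 3]),
]

def needs_election(assignments):
    """Return partitions whose current leader is not the preferred
    (first-listed) replica."""
    return [(t, p) for t, p, leader, replicas in assignments
            if leader != replicas[0]]

print(needs_election(assignments))
# Partitions one/0, one/1 and two/0 are not led by their preferred replica,
# so a successful election run should move those three leaders.
```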

Regards,

Libo



RE: questions about ISR

2013-08-27 Thread Yu, Libo
Thanks, Jun. That is very helpful. However, I still have a couple of
questions. You mentioned "We have a min fetch rate JMX in the broker"; how can
I find out how that min fetch rate is defined? And if
replica.lag.time.max.ms is too large, what is the consequence?




Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Tuesday, August 27, 2013 12:07 AM
To: users@kafka.apache.org
Subject: Re: questions about ISR

I added the following in FAQ:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowtoreducechurnsinISR%3F

Thanks,

Jun


On Mon, Aug 26, 2013 at 7:46 PM, James Wu  wrote:

> Hi Jun,
>
> I am curious Yu's questions too.
>
> 1. What is the best practice to set replica.lag.time.max.ms & 
> replica.lag.max.messages ? As long as possible or something else ?
>
> 2. If the broker exceeds one of these 2 configurations, how should we 
> do to bring the broker back to ISR ? Will controller automatic cover 
> this to catch broker up, the only thing we need to do is waiting for 
> the broker back ?
>
> Thanks.
>
>
>
>
> On Mon, Aug 26, 2013 at 11:15 PM, Jun Rao  wrote:
>
> > That's right. You shouldn't need to restart the whole cluster for a
> broker
> > to rejoin ISR. Do you see many ZK session expirations in the brokers 
> > (search for "(Expired)"? If so, you may need to tune the GC on the
> broker.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Aug 26, 2013 at 7:11 AM, Yu, Libo  wrote:
> >
> > > Hi Jun,
> > >
> > > Could you confirm the following?
> > > So after a broker is out of ISR, the only way to let it go back is 
> > > to restart it.
> > >
> > > We should set replica.lag.time.max.ms and replica.lag.max.messages  
> > > as large as possible to avoid a broker fall outside of ISR.
> > >
> > > What we have experienced is that when a broker is out of ISR 
> > > frequently we need to restart the whole cluster to make it back. 
> > > That is a
> blocking
> > > issue
> > > for us.
> > >
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Jun Rao [mailto:jun...@gmail.com]
> > > Sent: Friday, August 23, 2013 11:41 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: questions about ISR
> > >
> > > When a broker is restarted, it will automatically catch up from 
> > > the
> > leader
> > > and will join ISR when it's caught up. Are you not seeing this
> happening?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 23, 2013 at 11:33 AM, Yu, Libo  wrote:
> > >
> > > > Hi,
> > > >
> > > > When a broker is not in a topic's ISR, will it try to catch up 
> > > > to go back to ISR itself?
> > > > Or do we have to restart it?
> > > >
> > > > We can increase replica.lag.time.max.ms and 
> > > > replica.lag.max.messages to let brokers stay longer in ISR. Is 
> > > > that good practice? Still this is related to the first 
> > > > questions. We want to know what happens after a broker falls out of ISR 
> > > > and what we should do. Thanks.
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > >
> >
>
>
>
> --
>
> Friendly regards,
>
> *James Wu <https://plus.google.com/u/0/100829801349304669533>
> *
>


RE: questions about ISR

2013-08-26 Thread Yu, Libo
Hi Jun,

Could you confirm the following?
After a broker is out of the ISR, the only way to get it back in is to restart
it.

We should set replica.lag.time.max.ms and replica.lag.max.messages as
large as possible to avoid a broker falling out of the ISR.

What we have experienced is that when a broker frequently falls out of the ISR,
we need to restart the whole cluster to bring it back. That is a blocking issue
for us.


Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, August 23, 2013 11:41 PM
To: users@kafka.apache.org
Subject: Re: questions about ISR

When a broker is restarted, it will automatically catch up from the leader and 
will join ISR when it's caught up. Are you not seeing this happening?

Thanks,

Jun


On Fri, Aug 23, 2013 at 11:33 AM, Yu, Libo  wrote:

> Hi,
>
> When a broker is not in a topic's ISR, will it try to catch up to go 
> back to ISR itself?
> Or do we have to restart it?
>
> We can increase replica.lag.time.max.ms and replica.lag.max.messages 
> to let brokers stay longer in ISR. Is that good practice? Still this 
> is related to the first questions. We want to know what happens after 
> a broker falls out of ISR and what we should do. Thanks.
>
>
> Regards,
>
> Libo
>
>


RE: delete a topic

2013-08-23 Thread Yu, Libo
Here is the list of paths that should be deleted:
/brokers/topics/my_topic
/consumers/[groupId]/owners/my_topic
/consumers/[groupId]/offsets/my_topic (if the topic has been consumed)
Let me know if I missed anything. Thanks.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Friday, August 23, 2013 4:35 PM
To: users@kafka.apache.org
Subject: Re: delete a topic

You can either try using rmr on zookeeper client 3.4.x (./zkCli.sh rmr
 /node) or write your own script that does a recursive delete -
https://issues.apache.org/jira/browse/ZOOKEEPER-729

Thanks,
Neha


On Fri, Aug 23, 2013 at 10:52 AM, Yu, Libo  wrote:

> I will give it a try. I know how to delete log files. But to delete 
> the zookeeper data, do I only need to run the delete script?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Friday, August 23, 2013 1:43 PM
> To: users@kafka.apache.org
> Subject: Re: delete a topic
>
> Without the proper functionality of delete topic, I'm not sure if 
> deleting the zookeeper and kafka data just for that topic is enough or 
> not. On the surface, this approach seems sufficient, but I'm not sure 
> of all the consequences of doing that. Also, remember to bounce the 
> entire cluster once you've deleted the zookeeper and kafka data for the topic 
> in question.
>
> Can you give it a try and let us know how it went?
>
> Thanks,
> Neha
>
>
> On Fri, Aug 23, 2013 at 10:15 AM, Yu, Libo  wrote:
>
> > Hi Neha,
> >
> > One more questions. Assume I want to delete a topic. When you say 
> > deleting zookeeper data and kafka data, do you mean deleting 
> > zookeeper data and kafka data for ALL the topics or only for that 
> > particular topic?
> >
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > Sent: Friday, August 23, 2013 11:33 AM
> > To: users@kafka.apache.org
> > Subject: Re: delete a topic
> >
> > By that, I meant bringing down all brokers, deleting zookeeper data 
> > and kafka data and restarting the brokers. I suspect attempting a 
> > delete topic might have caused something bad to happen on the ISR 
> > side. It will be great if you can start clean and then see if the 
> > ISR is
> still an issue.
> >
> > Also, I filed a bug to include a tool that makes it easy to check 
> > the replica lag and progress, while troubleshooting ISR membership 
> > issues
> > -
> > https://issues.apache.org/jira/browse/KAFKA-1021
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, Aug 23, 2013 at 6:34 AM, Yu, Libo  wrote:
> >
> > > Hi Neha,
> > >
> > > "Wipe out the cluster" Do you mean you uninstall the cluster and 
> > > reinstall it?
> > > Or you just delete all kafka data and zookeeper data for the cluster?
> > > This is not a blocking issue for us. Our blocking issue is that 
> > > some broker will fall out of ISR and never get back to it. We have 
> > > observed that on more than one cluster. I am not sure if it is 
> > > related to the deleting issue.
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > > Sent: Thursday, August 22, 2013 5:01 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: delete a topic
> > >
> > > In production, we are not deleting topics yet. In test 
> > > environments, if we have to delete topics, we wipe out the 
> > > cluster. If this is a feature that most users are blocked on, I 
> > > think it makes sense to prioritize 
> > > https://issues.apache.org/jira/browse/KAFKA-330.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Aug 22, 2013 at 1:01 PM, Vadim Keylis 
> > > 
> > > wrote:
> > >
> > > > How do you guys delete a topic if such need arise?
> > > >
> > > >
> > > > On Thu, Aug 22, 2013 at 12:29 PM, Neha Narkhede 
> > > >  > > > >wrote:
> > > >
> > > > > I mentioned this in a different thread. I think the
> > > > 0.8.0-beta1-candidate1
> > > > > includes the script, but we have removed it later. I think 
> > > > > when we
> > > > publish
> > > > > 0.8-final, this should be resolved.
>

questions about ISR

2013-08-23 Thread Yu, Libo
Hi,

When a broker is not in a topic's ISR, will it try to catch up and rejoin the
ISR by itself, or do we have to restart it?

We can increase replica.lag.time.max.ms and replica.lag.max.messages
to let brokers stay in the ISR longer. Is that good practice? This is still
related to the first question: we want to know what happens after
a broker falls out of the ISR and what we should do. Thanks.


Regards,

Libo



RE: delete a topic

2013-08-23 Thread Yu, Libo
I will give it a try. I know how to delete log files. But to delete the 
zookeeper data,
do I only need to run the delete script?

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Friday, August 23, 2013 1:43 PM
To: users@kafka.apache.org
Subject: Re: delete a topic

Without the proper functionality of delete topic, I'm not sure if deleting the 
zookeeper and kafka data just for that topic is enough or not. On the surface, 
this approach seems sufficient, but I'm not sure of all the consequences of 
doing that. Also, remember to bounce the entire cluster once you've deleted the 
zookeeper and kafka data for the topic in question.

Can you give it a try and let us know how it went?

Thanks,
Neha


On Fri, Aug 23, 2013 at 10:15 AM, Yu, Libo  wrote:

> Hi Neha,
>
> One more questions. Assume I want to delete a topic. When you say 
> deleting zookeeper data and kafka data, do you mean deleting zookeeper 
> data and kafka data for ALL the topics or only for that particular 
> topic?
>
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Friday, August 23, 2013 11:33 AM
> To: users@kafka.apache.org
> Subject: Re: delete a topic
>
> By that, I meant bringing down all brokers, deleting zookeeper data 
> and kafka data and restarting the brokers. I suspect attempting a 
> delete topic might have caused something bad to happen on the ISR 
> side. It will be great if you can start clean and then see if the ISR is 
> still an issue.
>
> Also, I filed a bug to include a tool that makes it easy to check the 
> replica lag and progress, while troubleshooting ISR membership issues 
> -
> https://issues.apache.org/jira/browse/KAFKA-1021
>
> Thanks,
> Neha
>
>
> On Fri, Aug 23, 2013 at 6:34 AM, Yu, Libo  wrote:
>
> > Hi Neha,
> >
> > "Wipe out the cluster" Do you mean you uninstall the cluster and 
> > reinstall it?
> > Or you just delete all kafka data and zookeeper data for the cluster?
> > This is not a blocking issue for us. Our blocking issue is that some 
> > broker will fall out of ISR and never get back to it. We have 
> > observed that on more than one cluster. I am not sure if it is 
> > related to the deleting issue.
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > Sent: Thursday, August 22, 2013 5:01 PM
> > To: users@kafka.apache.org
> > Subject: Re: delete a topic
> >
> > In production, we are not deleting topics yet. In test environments, 
> > if we have to delete topics, we wipe out the cluster. If this is a 
> > feature that most users are blocked on, I think it makes sense to 
> > prioritize https://issues.apache.org/jira/browse/KAFKA-330.
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, Aug 22, 2013 at 1:01 PM, Vadim Keylis 
> > 
> > wrote:
> >
> > > How do you guys delete a topic if such need arise?
> > >
> > >
> > > On Thu, Aug 22, 2013 at 12:29 PM, Neha Narkhede 
> > >  > > >wrote:
> > >
> > > > I mentioned this in a different thread. I think the
> > > 0.8.0-beta1-candidate1
> > > > includes the script, but we have removed it later. I think when 
> > > > we
> > > publish
> > > > 0.8-final, this should be resolved.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Thu, Aug 22, 2013 at 12:24 PM, Neha Narkhede 
> > > >  > > > >wrote:
> > > >
> > > > > Vadim,
> > > > >
> > > > > The JIRA tracking this feature is 
> > > > > https://issues.apache.org/jira/browse/KAFKA-330.
> > > > > Until we have this feature, we should remove any scripts that 
> > > > > give the impression of deleting a topic, as Jay suggested. I 
> > > > > tried looking under bin/ but couldn't find any delete topic or 
> > > > > partition
> > script.
> > > > >
> > > > > Can you tell us which script you are using?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > >
> > > > > On Thu, Aug 22, 2013 at 11:40 AM, Vadim Keylis 
> > > > > > > > >wrote:
> > > > >
> > > > >> Jay or Neha. What is the process of deleting the topic if the 

RE: KeeperErrorCode = BadVersion

2013-08-23 Thread Yu, Libo
Hi Neha,

This is not a logging issue. That broker's log is flooded by this exception.
The exception first appeared early this morning and has kept flooding the log.
The broker is not in any topic's ISR, so it will not be a leader.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Friday, August 23, 2013 1:40 PM
To: users@kafka.apache.org
Subject: Re: KeeperErrorCode = BadVersion

I think we can certainly improve the logging, but this is normal when the 
leader tries to shrink/expand the ISR when the controller is in the middle of 
shrinking ISR or electing a new leader for the same partition.

Could you please file a JIRA to improve the quality of logging in this case?

Thanks,
Neha


On Fri, Aug 23, 2013 at 10:28 AM, Yu, Libo  wrote:

> Hi team,
>
> During normal operation, all of a sudden, we found many exceptions in 
> the log like this:
>
> It seems one thread's ZooKeeper data is being overwritten unexpectedly by
> some other thread.
> Any expertise will be appreciated.
>
> [2013-08-23 13:17:00,622] INFO Partition [our.own.topic.one.default,0] on
> broker 1: Cached zkVersion [4] not equal to that in zookeeper, skip updating
> ISR (kafka.cluster.Partition)
> [2013-08-23 13:17:00,622] INFO Partition [our.own.topic.two.default,2] on
> broker 1: Shrinking ISR for partition [our.own.topic.two,2] from 1,2,3 to 1
> (kafka.cluster.Partition)
> [2013-08-23 13:17:00,623] ERROR Conditional update of path
> /brokers/topics/our.own.topic.two/partitions/2/state with data
> { "controller_epoch":81, "isr":[ 1 ], "leader":1, "leader_epoch": 0,
> "version":1 } and expected version 10 failed (kafka.utils.ZkUtils$)
> org.I0Itec.zkclient.exception.ZkBadVersionException: org.apache.zooke
> eper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion 
> for /brokers/topics/our.own.topic.two/partitions/2/state
> at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:51)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> at
> kafka.utils.ZkUtils$.conditionalUpdatePersistentPath(ZkUtils.scala:330)
> at kafka.cluster.Partition.updateIsr(Partition.scala:347)
> at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:291)
> at
> kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:285)
> at
> kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:285)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> at
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$maybeShrinkIsr(ReplicaManager.scala:285)
> at
> kafka.server.ReplicaManager$$anonfun$startup$1.apply$mcV$sp(ReplicaManager.scala:108)
> at kafka.utils.Utils$$anon$2.run(Utils.scala:67)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.zookeeper.KeeperException$BadVersionException:
> KeeperErrorCode = BadVersion for
> /brokers/topics/our.own.topic.two/partitions/2/state
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:106)
> at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1044)
> at
> org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> ... 18 more
>
> Regards,
>
> Libo
>
>


KeeperErrorCode = BadVersion

2013-08-23 Thread Yu, Libo
Hi team,

During normal operation, all of a sudden, we found many exceptions in the log 
like this:

It seems one thread's ZooKeeper data is being overwritten unexpectedly by some
other thread.
Any expertise will be appreciated.

[2013-08-23 13:17:00,622] INFO Partition [our.own.topic.one.default,0] on broker 1:
Cached zkVersion [4] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2013-08-23 13:17:00,622] INFO Partition [our.own.topic.two.default,2] on broker 1:
Shrinking ISR for partition [our.own.topic.two,2] from 1,2,3 to 1
(kafka.cluster.Partition)
[2013-08-23 13:17:00,623] ERROR Conditional update of path
/brokers/topics/our.own.topic.two/partitions/2/state with data
{ "controller_epoch":81, "isr":[ 1 ], "leader":1, "leader_epoch": 0, "version":1 }
and expected version 10 failed (kafka.utils.ZkUtils$)
org.I0Itec.zkclient.exception.ZkBadVersionException: org.apache.zooke
eper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for 
/brokers/topics/our.own.topic.two/partitions/2/state
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:51)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
at 
kafka.utils.ZkUtils$.conditionalUpdatePersistentPath(ZkUtils.scala:330)
at kafka.cluster.Partition.updateIsr(Partition.scala:347)
at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:291)
at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:285)
at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:285)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$maybeShrinkIsr(ReplicaManager.scala:285)
at 
kafka.server.ReplicaManager$$anonfun$startup$1.apply$mcV$sp(ReplicaManager.scala:108)
at kafka.utils.Utils$$anon$2.run(Utils.scala:67)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at 
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: 
KeeperErrorCode = BadVersion for 
/brokers/topics/our.own.topic.two/partitions/2/state
at org.apache.zookeeper.KeeperException.create(KeeperException.java:106)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1044)
at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
... 18 more
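The BadVersion error above reflects ZooKeeper's optimistic concurrency control:
every write to a znode bumps its version, and a conditional write supplies the
version the writer last read. If another writer (here, likely the controller)
updated the node in between, the versions no longer match and the write is
rejected. A minimal in-memory sketch of that mechanism (a toy model, not the
ZooKeeper API):

```python
class VersionedNode:
    """Toy model of a znode with ZooKeeper-style conditional updates."""
    def __init__(self, data):
        self.data = data
        self.version = 0

    def conditional_set(self, data, expected_version):
        # Reject the write if someone else updated the node since it was read.
        if expected_version != self.version:
            raise ValueError(
                f"BadVersion: expected {expected_version}, actual {self.version}")
        self.data = data
        self.version += 1
        return self.version

state = VersionedNode({"isr": [1, 2, 3], "leader": 1})
v = state.version                                         # leader reads version 0
state.conditional_set({"isr": [1, 2], "leader": 1}, v)    # controller writes first
try:
    state.conditional_set({"isr": [1], "leader": 1}, v)   # leader's stale write
except ValueError as e:
    print(e)  # BadVersion: expected 0, actual 1
```

The broker's "Cached zkVersion not equal to that in zookeeper, skip updating ISR"
INFO line is the benign recovery path for exactly this race.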

Regards,

Libo



RE: delete a topic

2013-08-23 Thread Yu, Libo
Hi Neha,

One more question. Assume I want to delete a topic. When you say deleting
zookeeper data and kafka data, do you mean deleting zookeeper data and kafka
data for ALL the topics, or only for that particular topic?


Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Friday, August 23, 2013 11:33 AM
To: users@kafka.apache.org
Subject: Re: delete a topic

By that, I meant bringing down all brokers, deleting zookeeper data and kafka 
data and restarting the brokers. I suspect attempting a delete topic might have 
caused something bad to happen on the ISR side. It will be great if you can 
start clean and then see if the ISR is still an issue.

Also, I filed a bug to include a tool that makes it easy to check the replica 
lag and progress, while troubleshooting ISR membership issues -
https://issues.apache.org/jira/browse/KAFKA-1021

Thanks,
Neha


On Fri, Aug 23, 2013 at 6:34 AM, Yu, Libo  wrote:

> Hi Neha,
>
> "Wipe out the cluster" Do you mean you uninstall the cluster and 
> reinstall it?
> Or you just delete all kafka data and zookeeper data for the cluster?
> This is not a blocking issue for us. Our blocking issue is that some 
> broker will fall out of ISR and never get back to it. We have observed 
> that on more than one cluster. I am not sure if it is related to 
> the deleting issue.
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Thursday, August 22, 2013 5:01 PM
> To: users@kafka.apache.org
> Subject: Re: delete a topic
>
> In production, we are not deleting topics yet. In test environments, 
> if we have to delete topics, we wipe out the cluster. If this is a 
> feature that most users are blocked on, I think it makes sense to 
> prioritize https://issues.apache.org/jira/browse/KAFKA-330.
>
> Thanks,
> Neha
>
>
> On Thu, Aug 22, 2013 at 1:01 PM, Vadim Keylis 
> wrote:
>
> > How do you guys delete a topic if such need arise?
> >
> >
> > On Thu, Aug 22, 2013 at 12:29 PM, Neha Narkhede 
> >  > >wrote:
> >
> > > I mentioned this in a different thread. I think the
> > 0.8.0-beta1-candidate1
> > > includes the script, but we have removed it later. I think when we
> > publish
> > > 0.8-final, this should be resolved.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Aug 22, 2013 at 12:24 PM, Neha Narkhede 
> > >  > > >wrote:
> > >
> > > > Vadim,
> > > >
> > > > The JIRA tracking this feature is 
> > > > https://issues.apache.org/jira/browse/KAFKA-330.
> > > > Until we have this feature, we should remove any scripts that 
> > > > give the impression of deleting a topic, as Jay suggested. I 
> > > > tried looking under bin/ but couldn't find any delete topic or 
> > > > partition
> script.
> > > >
> > > > Can you tell us which script you are using?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Thu, Aug 22, 2013 at 11:40 AM, Vadim Keylis 
> > > > > > >wrote:
> > > >
> > > >> Jay or Neha. What is the process of deleting the topic if the 
> > > >> script included in the repository creates false impression?
> > > >>
> > > >> Thanks,
> > > >> Vadim
> > > >>
> > > >>
> > > >> On Thu, Aug 22, 2013 at 11:14 AM, Jay Kreps 
> > > >> 
> > > wrote:
> > > >>
> > > >> > We should really remove the delete script from 0.8 if we plan 
> > > >> > to
> > > release
> > > >> > without delete support. Right now it appears to work but 
> > > >> > doesn't
> > which
> > > >> is
> > > >> > clearly not good.
> > > >> >
> > > >> > -Jay
> > > >> >
> > > >> >
> > > >> > On Thu, Aug 22, 2013 at 10:57 AM, Neha Narkhede <
> > > >> neha.narkh...@gmail.com
> > > >> > >wrote:
> > > >> >
> > > >> > > We currently don't have the delete topic feature in Kafka 
> > > >> > > 0.8. So
> > > any
> > > >> > > manual attempts to do so might have a negative impact on
> > > >> functionality.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Neha
> > > >> > >
> > > >> > >
> > > >> > > On Thu, Aug 22, 2013 at 10:30 AM, Yu, Libo 
> > > >> > > 
> > > wrote:
> > > >> > >
> > > >> > > > Hi team,
> > > >> > > >
> > > >> > > > When I delete a topic, the topic is deleted from 
> > > >> > > > zookeeper but
> > its
> > > >> log
> > > >> > > > files are not deleted from Brokers.
> > > >> > > >
> > > >> > > > When I restart a  broker, the broker will try to sync the 
> > > >> > > > log
> > > files
> > > >> > whose
> > > >> > > > topic has been deleted.
> > > >> > > > Manually deleting the log files will resolve the issue.
> > > >> > > > Should
> > > >> broker
> > > >> > > > ignore log files whose topic has been deleted?
> > > >> > > >
> > > >> > > > Regards,
> > > >> > > >
> > > >> > > > Libo
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>


RE: Producer message ordering problem

2013-08-23 Thread Yu, Libo
An auto-incrementing sequence number can be assigned to each message as its key
when it is published. The consumer tracks this sequence number on receipt. If
the expected message has not arrived yet, it buffers subsequent messages in a
hash table (keyed by sequence number) until the missing message is received,
then processes the buffered messages in order.
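A minimal sketch of that consumer-side reorder buffer (names and structure are
illustrative, not from any Kafka API):

```python
def reorder(messages, start=0):
    """Yield (index, payload) pairs in sequence order, buffering any
    out-of-order arrivals in a dict keyed by their index."""
    pending = {}
    expected = start
    for index, payload in messages:
        pending[index] = payload
        # Flush every consecutive message now available.
        while expected in pending:
            yield expected, pending.pop(expected)
            expected += 1

# Duplicates from resends could also be dropped here by ignoring
# any index < expected before buffering.
arrivals = [(0, "a"), (2, "c"), (1, "b"), (3, "d")]
print(list(reorder(arrivals)))  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
```

Note the buffer grows unbounded if a message is lost rather than merely delayed,
so in practice a gap timeout or maximum buffer size would be needed.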

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Friday, August 23, 2013 11:20 AM
To: users@kafka.apache.org
Subject: Re: Producer message ordering problem

Ross,

This is a general issue with resending. Since resending is typically done on a new 
socket, the retried messages are essentially sent from a new instance of the producer, 
so there is no easy way to ensure they are ordered behind the ones sent by the old 
instance. 0.8 will have similar issues. It may be possible to add some sort of 
per-client sequence id and track it in the broker, but this is not trivial and needs 
more thought.
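A toy sketch of the per-client sequence idea mentioned above — this is speculative, not an existing 0.7/0.8 broker feature, and the `Broker` class and return values are invented for illustration. The broker keeps the last sequence number accepted per producer id, appends a message only if its sequence is the expected next one, and treats an already-seen sequence as a duplicate from a retry:

```python
# Hypothetical broker-side sequence tracking for idempotent resends.

class Broker:
    def __init__(self):
        self.last_seq = {}   # producer_id -> last accepted sequence
        self.log = []        # appended (producer_id, payload) records

    def append(self, producer_id, seq, payload):
        """Return 'ok', 'duplicate', or 'out_of_order'."""
        expected = self.last_seq.get(producer_id, -1) + 1
        if seq == expected:
            self.last_seq[producer_id] = seq
            self.log.append((producer_id, payload))
            return "ok"
        if seq <= self.last_seq.get(producer_id, -1):
            return "duplicate"   # a retried send; safe to ignore
        return "out_of_order"    # a gap: reject so the producer resends
```

With this, a producer that retries a send after an IO error cannot shuffle the log: the duplicate is dropped, and a message sent ahead of its turn is rejected.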

Thanks,

Jun



On Thu, Aug 22, 2013 at 9:32 PM, Ross Black  wrote:

> Hi,
>
> I am using Kafka 0.7.1, and using the low-level SyncProducer to send 
> messages to a *single* partition from a *single* thread.
> The client sends messages that contain sequential numbers so it is 
> obvious at the consumer when message order is shuffled.
> I have noticed that messages can be saved out-or-order by Kafka when 
> there are connection problems, and am looking for possible solutions 
> (I think I already know the cause).
>
> The client sends messages in a retry loop so that it will wait for a 
> short period and then retry to send on any IO errors.  In 
> SyncProducer, any IOException triggers a disconnect.  Next time send 
> is called a new connection is established.  I believe that it is this 
> disconnect/reconnect cycle that can cause messages to be saved to the 
> kafka log in a different order to that of the client.
>
> I had previously had the same sort of issue with 
> reconnect.interval/time, which was fixed by disabling those reconnect 
> settings.
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201305.mbox/%3CCA
> M%2BbZhjssxmUhn_L%3Do0bGsD7PAXFGQHRpOKABcLz29vF3cNOzA%40mail.gmail.com
> %3E
>
> Is there anything in 0.7 that would allow me to solve this problem?  
> The only option I can see at the moment is to not perform retries.
>
> Does 0.8 handle this issue any differently?
>
> Thanks,
> Ross
>


consumer question

2013-08-23 Thread Yu, Libo
Hi team,

Right now, from a stream, an iterator can be obtained whose hasNext() blocks.
What is the implementation behind the iterator? I assume there is a queue that
the iterator reads from, and a separate thread that fetches data and feeds it
into the queue when the queue is nearly empty. If that is the case, no further
optimization needs to be done by users. Please confirm whether a separate
thread fetches data and feeds it into the queue. Thanks.
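The pattern being asked about can be sketched in miniature as follows. This is an illustration of the queue-plus-fetcher-thread design, not Kafka's actual consumer internals; `StreamIterator` and `fetch_batches` are invented names:

```python
# A background thread fetches batches into a bounded queue; the
# iterator's next() blocks on that queue until data (or a shutdown
# sentinel) arrives.

import queue
import threading

class StreamIterator:
    _SHUTDOWN = object()  # sentinel marking end of stream

    def __init__(self, fetch_batches, maxsize=10):
        self._queue = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(
            target=self._fill, args=(fetch_batches,), daemon=True)
        self._thread.start()

    def _fill(self, fetch_batches):
        # Fetcher thread: pull batches and feed messages into the queue.
        for batch in fetch_batches():
            for msg in batch:
                self._queue.put(msg)  # blocks when the queue is full
        self._queue.put(self._SHUTDOWN)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._queue.get()      # blocks until data arrives
        if item is self._SHUTDOWN:
            raise StopIteration
        return item
```

The bounded queue also gives natural back-pressure: the fetcher blocks on `put` when the consumer falls behind.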


Regards,

Libo



RE: delete a topic

2013-08-23 Thread Yu, Libo
Hi Neha,

"Wipe out the cluster" Do you mean you uninstall the cluster and reinstall it?
Or you just delete all kafka data and zookeeper data for the cluster?
This is not a blocking issue for us. Our blocking issue is that some brokers 
fall out of the ISR and never rejoin it. We have observed that on more
than one cluster. I am not sure if it is related to the deletion issue.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, August 22, 2013 5:01 PM
To: users@kafka.apache.org
Subject: Re: delete a topic

In production, we are not deleting topics yet. In test environments, if we have 
to delete topics, we wipe out the cluster. If this is a feature that most users 
are blocked on, I think it makes sense to prioritize 
https://issues.apache.org/jira/browse/KAFKA-330.

Thanks,
Neha


On Thu, Aug 22, 2013 at 1:01 PM, Vadim Keylis  wrote:

> How do you guys delete a topic if such need arise?
>
>
> On Thu, Aug 22, 2013 at 12:29 PM, Neha Narkhede 
>  >wrote:
>
> > I mentioned this in a different thread. I think the
> 0.8.0-beta1-candidate1
> > includes the script, but we have removed it later. I think when we
> publish
> > 0.8-final, this should be resolved.
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, Aug 22, 2013 at 12:24 PM, Neha Narkhede 
> >  > >wrote:
> >
> > > Vadim,
> > >
> > > The JIRA tracking this feature is
> > > https://issues.apache.org/jira/browse/KAFKA-330.
> > > Until we have this feature, we should remove any scripts that give 
> > > the impression of deleting a topic, as Jay suggested. I tried 
> > > looking under bin/ but couldn't find any delete topic or partition script.
> > >
> > > Can you tell us which script you are using?
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Aug 22, 2013 at 11:40 AM, Vadim Keylis 
> > > > >wrote:
> > >
> > >> Jay or Neha. What is the process of deleting the topic if the 
> > >> script included in the repository creates false impression?
> > >>
> > >> Thanks,
> > >> Vadim
> > >>
> > >>
> > >> On Thu, Aug 22, 2013 at 11:14 AM, Jay Kreps 
> > wrote:
> > >>
> > >> > We should really remove the delete script from 0.8 if we plan 
> > >> > to
> > release
> > >> > without delete support. Right now it appears to work but 
> > >> > doesn't
> which
> > >> is
> > >> > clearly not good.
> > >> >
> > >> > -Jay
> > >> >
> > >> >
> > >> > On Thu, Aug 22, 2013 at 10:57 AM, Neha Narkhede <
> > >> neha.narkh...@gmail.com
> > >> > >wrote:
> > >> >
> > >> > > We currently don't have the delete topic feature in Kafka 
> > >> > > 0.8. So
> > any
> > >> > > manual attempts to do so might have a negative impact on
> > >> functionality.
> > >> > >
> > >> > > Thanks,
> > >> > > Neha
> > >> > >
> > >> > >
> > >> > > On Thu, Aug 22, 2013 at 10:30 AM, Yu, Libo 
> > wrote:
> > >> > >
> > >> > > > Hi team,
> > >> > > >
> > >> > > > When I delete a topic, the topic is deleted from zookeeper 
> > >> > > > but
> its
> > >> log
> > >> > > > files are not deleted from
> > >> > > > Brokers.
> > >> > > >
> > >> > > > When I restart a  broker, the broker will try to sync the 
> > >> > > > log
> > files
> > >> > whose
> > >> > > > topic has been deleted.
> > >> > > > Manually deleting the log files will resolve the issue. 
> > >> > > > Should
> > >> broker
> > >> > > > ignore log files whose topic has been deleted?
> > >> > > >
> > >> > > > Regards,
> > >> > > >
> > >> > > > Libo
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>


partition does not exist (but it is there).

2013-08-22 Thread Yu, Libo
This is from the broker 3's log:
[2013-08-22 15:40:02,984] WARN [KafkaApi-3] Fetch request: Partition [test.replica1.default,0] doesn't exist on 3 (kafka.server.KafkaApis)

Here is what list topic command shows:
topic: test.replica1.default    partition: 0    leader: 3    replicas: 3    isr: 3
topic: test.replica1.default    partition: 1    leader: 1    replicas: 1    isr: 1
topic: test.replica1.default    partition: 2    leader: 2    replicas: 2    isr: 2

Any idea on where the warning is from?

Regards,

Libo



RE: delete a topic

2013-08-22 Thread Yu, Libo
After I deleted a topic using the delete script and deleted all the log files, the 
brokers still tried to sync that topic. It seems a cluster restart is required.

Regards,

Libo


-Original Message-
From: Vadim Keylis [mailto:vkeylis2...@gmail.com] 
Sent: Thursday, August 22, 2013 2:41 PM
To: users@kafka.apache.org
Subject: Re: delete a topic

Jay or Neha. What is the process of deleting the topic if the script included 
in the repository creates false impression?

Thanks,
Vadim


On Thu, Aug 22, 2013 at 11:14 AM, Jay Kreps  wrote:

> We should really remove the delete script from 0.8 if we plan to 
> release without delete support. Right now it appears to work but 
> doesn't which is clearly not good.
>
> -Jay
>
>
> On Thu, Aug 22, 2013 at 10:57 AM, Neha Narkhede 
>  >wrote:
>
> > We currently don't have the delete topic feature in Kafka 0.8. So 
> > any manual attempts to do so might have a negative impact on functionality.
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, Aug 22, 2013 at 10:30 AM, Yu, Libo  wrote:
> >
> > > Hi team,
> > >
> > > When I delete a topic, the topic is deleted from zookeeper but its 
> > > log files are not deleted from Brokers.
> > >
> > > When I restart a  broker, the broker will try to sync the log 
> > > files
> whose
> > > topic has been deleted.
> > > Manually deleting the log files will resolve the issue. Should 
> > > broker ignore log files whose topic has been deleted?
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> >
>


RE: issue with kafka-preferred-replica-election.sh

2013-08-22 Thread Yu, Libo
Thanks, Guozhang

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Thursday, August 22, 2013 12:42 PM
To: users@kafka.apache.org
Subject: Re: issue with kafka-preferred-replica-election.sh

Hello Libo,

I have created the Jira for this issue:

https://issues.apache.org/jira/browse/KAFKA-1019

Guozhang


On Wed, Aug 21, 2013 at 11:27 AM, Yu, Libo  wrote:

> We never deleted it. Either it was never created or deleted somehow.
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Wednesday, August 21, 2013 11:41 AM
> To: users@kafka.apache.org
> Subject: Re: issue with kafka-preferred-replica-election.sh
>
> Libo,
>
> Just want to clarify, in your case after you created your topic, the 
> /brokers/topics/my_topic/partitions was never created or it was 
> deleted somehow?
>
> Guozhang
>
>
>
> On Wed, Aug 21, 2013 at 7:25 AM, Yu, Libo  wrote:
>
> > For the path /brokers/topics/my_topic/partitions, if you remove 
> > partitions and run kafka-preferred-replica-election.sh, it 
> > terminates right away with keeper exception.
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: Wednesday, August 21, 2013 12:23 AM
> > To: users@kafka.apache.org
> > Subject: Re: issue with kafka-preferred-replica-election.sh
> >
> > Libo,
> >
> > Sorry for the late reply. I will file a JIRA for this one. Could you 
> > please provide the full process and environment to reproduce this issue?
> >
> > Guozhang
> >
> >
> > On Tue, Aug 20, 2013 at 1:10 PM, Yu, Libo  wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the detailed reply. All the brokers are running.
> > > The path not created is 
> > > /brokers/topics/uattoqaaa.default/partitions
> > > not
> > > /brokers/topics/uattoqaaa.default/partitions/[partition-id]/state
> > > Actually, I found quite a few topics don't have the issue. The 
> > > controller must have failed to create them. I will keep monitoring 
> > > this
> > issue.
> > >
> > > Currently, when /brokers/topics/my_path/partitions is not there, 
> > > kafka-preferred-replica-election.sh stops right away with nonode 
> > > keeper exception. Could you file a JIRA to make it keep working in 
> > > that scenario?
> > >
> > >
> > >
> > > Regards,
> > >
> > > Libo
> > >
> > >
> > > -Original Message-
> > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > Sent: Tuesday, August 20, 2013 3:56 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: issue with kafka-preferred-replica-election.sh
> > >
> > > The create command tool will only create the path 
> > > /brokers/topics/uattoqaaa.default, and then controller, once 
> > > noticing the change in ZK about the added topic, will elect 
> > > leaders from the assigned replicas written in 
> > > /brokers/topics/uattoqaaa.default and then create
> > /brokers/topics/uattoqaaa.default/partitions/[partition-id]/state.
> > >
> > > If you have created the topic for a long time but 
> > > /brokers/topics/uattoqaaa.default/partitions/[partition-id]/state 
> > > is not created by controller, it could because the replicas 
> > > specified in /brokers/topics/uattoqaaa.default does not exist yet. 
> > > Could you verify the brokers specified in this path are up and running?
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Aug 20, 2013 at 12:14 PM, Yu, Libo  wrote:
> > >
> > > > One more question:
> > > > "The create topic command will not immediately create the path; 
> > > > it will be created by the controller later during the creation 
> > > > of partitions and assign replicas to partitions."
> > > >
> > > > So when will the controller create the path and partitions? When 
> > > > the first message is published to the topic and partition?
> > > > All the partitions are created at once?
> > > >
> > > > Regards,
> > > >
> > > > Libo
> > > >
> > > >
> > > > -Original Message-
> > > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > > Sent: Tuesday, August 20, 2013 1:48 PM
> > > > To: users@kafka.apache.org
> 

RE: delete a topic

2013-08-22 Thread Yu, Libo
Thanks, Neha.

Regards,

Libo


-Original Message-
From: Neha Narkhede [mailto:neha.narkh...@gmail.com] 
Sent: Thursday, August 22, 2013 1:57 PM
To: users@kafka.apache.org
Subject: Re: delete a topic

We currently don't have the delete topic feature in Kafka 0.8. So any manual 
attempts to do so might have a negative impact on functionality.

Thanks,
Neha


On Thu, Aug 22, 2013 at 10:30 AM, Yu, Libo  wrote:

> Hi team,
>
> When I delete a topic, the topic is deleted from zookeeper but its log 
> files are not deleted from Brokers.
>
> When I restart a  broker, the broker will try to sync the log files 
> whose topic has been deleted.
> Manually deleting the log files will resolve the issue. Should broker 
> ignore log files whose topic has been deleted?
>
> Regards,
>
> Libo
>
>


delete a topic

2013-08-22 Thread Yu, Libo
Hi team,

When I delete a topic, the topic is deleted from zookeeper but its log files are not 
deleted from the brokers.

When I restart a broker, the broker will try to sync the log files whose topic has 
been deleted. Manually deleting the log files resolves the issue. Should the broker 
ignore log files whose topic has been deleted?

Regards,

Libo



RE: ordering

2013-08-22 Thread Yu, Libo
Hi Jun,

This is my original concern:
Assume the number of consumer threads equals the number of partitions of a topic,
and we want to leverage the fact that messages within a partition are ordered. When 
starting multiple consumer threads, or during a rebalance, one consumer thread may 
temporarily consume two or more partitions.

After thinking it over, I realize this can be handled in the consumer thread as long 
as we never assume that one consumer thread consumes only one partition.
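The point above — keying consumer state by partition rather than by thread — can be sketched as follows. This is an illustrative fragment, not Kafka client code; `consume` and the message shape are invented:

```python
# A consumer thread must not assume it owns a single partition.
# Keying state by partition id keeps per-partition ordering correct
# even if a rebalance temporarily assigns several partitions to one
# thread.

from collections import defaultdict

def consume(messages):
    """messages: iterable of (partition_id, seq) pairs in arrival order."""
    last_seen = {}                    # partition -> last sequence handled
    by_partition = defaultdict(list)  # partition -> sequences, in order
    for partition, seq in messages:
        # Ordering is checked per partition, so interleaved partitions
        # on the same thread are still handled correctly.
        prev = last_seen.get(partition, -1)
        assert seq > prev, f"out of order within partition {partition}"
        last_seen[partition] = seq
        by_partition[partition].append(seq)
    return dict(by_partition)
```

A thread written this way behaves the same whether it happens to receive one partition or several.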

Regards,

Libo


-Original Message-
From: Jun Rao [mailto:jun...@gmail.com] 
Sent: Thursday, August 22, 2013 12:01 AM
To: users@kafka.apache.org
Subject: Re: ordering

Actually, I am not sure if I understand the trouble that you mentioned.
Could you elaborate that a bit more?

Thanks,

Jun


On Wed, Aug 21, 2013 at 12:30 PM, Yu, Libo  wrote:

> Hi,
>
> This is from kafka faq:
>
>
>   *   Each partition is not consumed by more than one consumer
> thread/process in each consumer group. This allows to have each 
> process consume in a single threaded fashion to guarantee ordering to 
> the consumer within the partition (if we split up a partition of 
> ordered messages and handed them out to multiple consumers even though 
> the messages were stored in order they would be processed out of order at 
> times).
>
> Is this doable?
>
> Say a topic has 3 partitions. And there are 3 consumer processes in a 
> consumer group each of which has a single thread.
> When we start the 3 processes, when the first process is up, it will 
> consume all 3 partitions. When the second process is up,
> 1 process consumes 2 partitions and 1 consumes 1 partition. Only when 
> the third process is up, will each process consume One partition. This 
> will cause trouble. There seems to be no way to bound a stream to one 
> partition.
>
>
> Regards,
>
> Libo
>
>

