Re: expanding cluster and reassigning partitions without restarting producer

2014-11-09 Thread Shlomi Hazan
No, I don't see anything like that. The question was aimed at learning whether it
is worthwhile to make the effort of reimplementing the Python producer in
Java, so that I don't make all that effort just to be disappointed
afterwards.
I understand I have nothing to worry about, so I will try to simulate this
situation at a small scale...
maybe 3 brokers, one topic with one partition, and then add partitions.
We'll see.
thanks for clarifying.
Oh, Good luck with Confluent!!
:)

On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede 
wrote:

> The producer might get an error code if the leader of the partitions being
> reassigned also changes. However it should retry and succeed. Do you see a
> behavior that suggests otherwise?
>
> On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan  wrote:
>
> > Hi All,
> > I recently had an issue producing from python where expanding a cluster
> > from 3 to 5 nodes and reassigning partitions forced me to restart the
> > producer b/c of KeyError thrown.
> > Is this situation handled by the Java producer automatically or need I do
> > something to have the java producer refresh itself to see the reassigned
> > partition layout and produce away ?
> > Shlomi
> >
>


Re: consumer ack for high-level consumer?

2014-11-09 Thread Chia-Chun Shih
Got it! Thanks for your response.



2014-11-07 13:14 GMT+08:00 Guozhang Wang :

> 0. Yes, if the consumer crashed before committing its offset it can cause
> duplicates.
>
> 1. Yes, since from the consumer client's point of view, once the message is
> returned from the iterator it is considered "consumed"; if you want the
> consumer to only consider a message as consumed when it has been processed by
> the application on top of it, you need to turn off auto offset commit and
> manually call commit.
>
> On Thu, Nov 6, 2014 at 6:25 PM, Chia-Chun Shih 
> wrote:
>
> > Hi,
> >
> > Thanks for your response. Therefore, offsets in ZK may be out-of-date. It
> > is possible to deliver duplicated messages when clients restart.
> >
> > I also wonder about the possibility of losing messages. Is it possible that
> > things occur in this order?
> >
> >1. Client calls ConsumerIterator$next() to get a message, update local
> >offsets
> >2. ZookeeperConsumerConnector$commitOffset() is called, local offsets
> >sync to ZK
> >3. Client fails when processing this message
> >4. Client restarts, but this message is marked as consumed in ZK
> >
> > Thanks,
> > Chia-Chun
> >
> > 2014-11-07 1:45 GMT+08:00 Guozhang Wang :
> >
> > > That is correct.
> > >
> > > Guozhang
> > >
> > > On Wed, Nov 5, 2014 at 9:18 PM, Chia-Chun Shih <
> chiachun.s...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your response. I just read source code and found that:
> > > >
> > > >   1) ConsumerIterator$next() uses PartitionTopicInfo$resetConsumeOffset
> > > > to update offsets in PartitionTopicInfo objects.
> > > >   2) ZookeeperConsumerConnector$commitOffset() gets the latest offsets
> > > > from PartitionTopicInfo objects, and updates the offsets in ZK.
> > > >
> > > > So, when clients iterate through messages, offsets are updated locally
> > > > in PartitionTopicInfo
> > > > objects. When ZookeeperConsumerConnector$commitOffset is called, local
> > > > offsets are synced to ZK. Is that correct?
> > > >
> > > > regards,
> > > > Chia-Chun
> > > >
> > > > 2014-11-06 0:24 GMT+08:00 Guozhang Wang :
> > > >
> > > > > Hello,
> > > > >
> > > > > You can turn off auto offset commit (auto.commit.enable) and call
> > > > > connector.commitOffset() manually after you have processed the data.
> > > > > One thing to remember is that the commit frequency translates into ZK
> > > > > (in the future, Kafka) writes, and hence you may not want to commit
> > > > > after processing every single message but only after a batch of
> > > > > messages.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Nov 4, 2014 at 10:42 PM, Chia-Chun Shih <
> > > chiachun.s...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am new to Kafka. In my understanding, the high-level consumer (
> > > > > > ZookeeperConsumerConnector) changes offset when message is drawn
> > > > > > by ConsumerIterator. But I would like to change offset when
> message
> > > is
> > > > > > processed, not when message is drawn from broker. So if a
> consumer
> > > dies
> > > > > > before a message is completely processed, the message will be
> > > processed
> > > > > > again. Is it possible?
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
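
To make the advice in this thread concrete, here is a minimal sketch of a high-level consumer that disables automatic offset commits and commits manually after each batch of processed messages. It is an illustration only: the topic name, group id and ZooKeeper address are placeholders, and the batch size of 100 is arbitrary (in the Java API the commit call is connector.commitOffsets()).

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZK address
        props.put("group.id", "manual-commit-group");      // hypothetical group id
        props.put("auto.commit.enable", "false");          // we commit offsets ourselves

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for the topic; the iterator blocks until messages arrive.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

        int processedSinceCommit = 0;
        while (it.hasNext()) {
            byte[] message = it.next().message();
            // ... process the message here ...
            processedSinceCommit++;
            // Commit once per batch rather than per message, since every commit
            // is a ZooKeeper write (as noted earlier in the thread).
            if (processedSinceCommit >= 100) {
                connector.commitOffsets();
                processedSinceCommit = 0;
            }
        }
    }
}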


Re: One question about "New Producer Configs"

2014-11-09 Thread Chia-Chun Shih
Dear Genlong,

"New Producer Configs" is for upcoming versions. If you are using 0.8.1,
please refer to "Producer Configs".

acks=n, where n refers to the number of acknowledgments from in-sync replicas,
not the number of partitions.
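
For clarity, a small sketch of where the setting lives in each producer API (broker addresses are placeholders and the values shown are only illustrative):

import java.util.Properties;

public class AcksConfigExample {
    public static void main(String[] args) {
        // 0.8.1 ("old") producer: the setting is request.required.acks.
        // 0 = don't wait, 1 = wait for the leader, -1 = wait for all in-sync replicas.
        Properties oldProducerProps = new Properties();
        oldProducerProps.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholders
        oldProducerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        oldProducerProps.put("request.required.acks", "1");

        // New producer (documented under "New Producer Configs"): the setting is "acks".
        // It counts acknowledgments from in-sync replicas, not partitions.
        Properties newProducerProps = new Properties();
        newProducerProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholders
        newProducerProps.put("acks", "1"); // 1 = leader only; -1 = all in-sync replicas

        System.out.println("old producer: request.required.acks="
            + oldProducerProps.getProperty("request.required.acks"));
        System.out.println("new producer: acks=" + newProducerProps.getProperty("acks"));
    }
}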

regards,
Chia-Chun

2014-11-10 10:51 GMT+08:00 hi <995174...@qq.com>:

> Dear sir or madam,
>
> There is one question when I'm using Kafka:
> From the documentation ("New Producer Configs"), I get that acks=n means
> the number of acknowledgments the producer requires the leader to have
> received before considering a request complete. But why can the producer
> send messages successfully when I use the Producer API and acks is
> set to 2 (the number of partitions is 3 but only one partition
> is working)?
>
> Thanks a lot and look forward to hearing from you soon.
>
> Best Regards,
>
> Genlong Wang
>


Re: Consumer thread dies

2014-11-09 Thread Jun Rao
If C1 dies, C2 will be owning that partition. However, C1 has to really
die, which typically means that either you close the consumer connector or
the jvm of C1 is gone.

In your case, it seems that C1 didn't die, it just hung. Do you know why C1
hung?

Thanks,

Jun

On Fri, Nov 7, 2014 at 3:34 PM, Srinivas Reddy Kancharla  wrote:

> Hi,
>
> I have a scenario where I have 1 partition and 1 consumer group having 2
> consumer threads running say C1 and C2. Since there is only one partition
> for a given topic, say C1 is holding that partition. Now due to some reason
> if C1 dies, can C2 get hold of that partition?
>
> i.e. C1 was busy with the KafkaStream instance; if for any reason C1 dies or
> is in a hung state, can we make C2 talk to the KafkaStream (for partition 0)?
> I am facing this issue where I have 10 messages in partition 0 and C1 was
> consuming them. At message 4, C1 went into a hung state. Now I would like
> C2 to consume the other messages which were not consumed by C1.
>
> Thank and regards,
> Srini
>


Re: Add partitions with replica assignment in same command

2014-11-09 Thread Jun Rao
Yes, it seems that we need to fix the tool to support that. It's probably
more intuitive to have TopicCommand just take the replica-assignment (for
the new partitions) when altering a topic. Could you file a jira?

Thanks,

Jun

On Fri, Nov 7, 2014 at 4:17 PM, Allen Wang 
wrote:

> I am trying to figure out how to add partitions and assign replicas using
> one admin command. I tried kafka.admin.TopicCommand to increase the
> partition number from 9 to 12 with the following options:
>
> /apps/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand  --zookeeper
> ${ZOOKEEPER} --alter --topic test_topic_4 --partitions 12
> --replica-assignment 2:1,0:2,1:0,1:2,2:0,0:1,1:0,2:1,0:2,2:1,0:2,1:0
>
> This gives me an error
>
> Option "[replica-assignment]" can't be used with option"[partitions]"
>
> Looking into the TopicCommand, alterTopic function seems to be able to
> handle that but the command exits with the above error before this function
> is invoked.
>
> Is there any workaround or other recommended way to achieve this?
>
> Thanks,
> Allen
>


One question about "New Producer Configs"

2014-11-09 Thread hi
Dear sir or madam,

There is one question when I'm using Kafka:
From the documentation ("New Producer Configs"), I get that acks=n means the
number of acknowledgments the producer requires the leader to have received
before considering a request complete. But why can the producer send messages
successfully when I use the Producer API and acks is set to 2 (the
number of partitions is 3 but only one partition is working)?

Thanks a lot and look forward to hearing from you soon.

Best Regards,

Genlong Wang

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-09 Thread Neha Narkhede
A rebalance should trigger on all consumers when you add a new consumer to
the group. If you don't see the zookeeper watch fire, the consumer may have
somehow lost the watch. We have seen this behavior on older zk versions; I
wonder if that bug got reintroduced. To verify whether this is the case, you can
run the wchp zookeeper command on the zk leader and check if each consumer
has a watch registered.

Do you have a way to try this on zk 3.3.4? I would recommend you try the
wchp suggestion as well.

On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria 
wrote:

> Hi all,
>
> Can someone help here? We are getting constant rebalance failures each time
> a consumer is added beyond a certain number. We did quite a lot of debugging
> on this and are still not able to figure out the pattern.
>
> -Thanks,
> Mohit
>
> On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria 
> wrote:
>
> > Neha,
> >
> > Looks like an issue with the consumer rebalance not being able to complete
> > successfully. We were able to reproduce the issue on a topic with 30
> > partitions, 3 consumer processes (p1, p2 and p3), and the properties
> > rebalance.max.retries=40 and rebalance.backoff.ms=1 (10s).
> >
> > Before the process p3 was started, partition ownership was as expected:
> >
> > partitions 0-14 owned by p1
> > partitions 15-29 -> owner p2
> >
> > As the process p3 started, rebalance was triggered. Process p3 was
> > successfully able to acquire partition ownership for partitions 20-29 as
> > expected as per the rebalance algorithm. However, process p2 while trying
> > to acquire ownership of partitions 10-19 saw rebalance failure after 40
> > retries.
> >
> > Attaching the logs from process p2 and process p1. They show that p2 was
> > attempting to rebalance: it was trying to acquire ownership of partitions
> > 10-14, which were owned by process p1. However, at the same time process p1
> > did not get any event for giving up the partition ownership for partitions
> > 1-14.
> > We were expecting a rebalance to have been triggered in p1, but it didn't
> > happen, and hence p1 did not give up ownership. Is our assumption
> > correct/incorrect?
> > And if the rebalance does get triggered in p1, how do we figure that out
> > other than from the logs, as the logs on p1 did not have anything?
> >
> > *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
> > [topic_consumerIdString], waiting for the partition ownership to be
> > deleted: 11*
> >
> > During and after the rebalance failed on process p2, Partition Ownership
> > was as below:
> > 0-14 -> owner p1
> > 15-19 -> none
> > 20-29 -> owner p3
> >
> > This left the consumers in an inconsistent state, as 5 partitions were never
> > consumed from, nor was the partition ownership balanced.
> >
> > However, there was no conflict in creating the ephemeral node which was
> > the case last time. Just to note that the ephemeral node conflict which
> we
> > were seeing earlier also appeared after rebalance failed. My hunch is
> that
> > fixing the rebalance failure will fix that issue as well.
> >
> > -Thanks,
> > Mohit
> >
> >
> >
> > On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede 
> > wrote:
> >
> >> Mohit,
> >>
> >> I wonder if it is related to
> >> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper
> expires
> >> a
> >> session, it doesn't delete the ephemeral nodes immediately. So if you
> end
> >> up trying to recreate ephemeral nodes quickly, it could either be in the
> >> valid latest session or from the previously expired session. If you hit
> >> this problem, then waiting would resolve it. But if not, then this may
> be
> >> a
> >> legitimate bug in ZK 3.4.6.
> >>
> >> Can you try shutting down all your consumers, waiting until session
> >> timeout
> >> and restarting them?
> >>
> >> Thanks,
> >> Neha
> >>
> >> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria  >
> >> wrote:
> >>
> >> > Dear Experts,
> >> >
> >> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
> >> > topic with 30 partitions and 2 replicas. We are using the high-level
> >> > consumer api.
> >> > Each consumer process, which is a storm topology, has 5 streams which
> >> > connect to 1 or more partitions. We are not using storm's inbuilt kafka
> >> > spout. Everything runs fine till the 5th consumer process (25 streams)
> >> > is added for this topic.
> >> >
> >> > As soon as the sixth consumer process is added, the newly added
> >> > consumer does not get ownership of the partitions that it requests, as the
> >> > already existing owners have not yet given up the ownership.
> >> >
> >> > We changed certain properties on consumer :
> >> >
> >> > 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms *
> >> > rebalance.max.retries >> zk connection timeout)
> >> > 2. Back off ms between rebalances - 1 (10 seconds)
> >> > 3. ZK connection timeout - 100,000 (100 seconds)
> >> >
> >> > Although when I am looking in the zookeeper shell when the rebalance
> is
> >> > happening, the consumer is registered fine on the zookeeper. Just t
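
For anyone trying to reproduce this, the consumer-side knobs being discussed can be collected in one place as below; this is a hypothetical snippet whose values mirror the ones described in the thread and are not a recommendation. Neha's wchp check can be issued against the ZooKeeper leader via the four-letter-word interface, e.g. "echo wchp | nc <zk-leader-host> 2181".

import java.util.Properties;

public class RebalanceTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181"); // placeholder ensemble
        props.put("group.id", "my-consumer-group");                   // hypothetical group id
        // From the thread: rebalance.max.retries * rebalance.backoff.ms should
        // comfortably exceed the ZooKeeper connection timeout, so that a consumer
        // keeps retrying the rebalance across a session hiccup.
        props.put("rebalance.max.retries", "40");
        props.put("rebalance.backoff.ms", "10000");
        props.put("zookeeper.connection.timeout.ms", "100000");
        System.out.println(props);
    }
}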

Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-09 Thread Guozhang Wang
Solon,

You may be hitting this: KAFKA-1305. You can try with the
0.8.2-beta version and see if this issue is reproducible.

Guozhang

On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon  wrote:

> We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
>
> On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang  wrote:
>
> > Solon,
> >
> > Which version of Kafka are you running and are you enabling auto leader
> > rebalance at the same time?
> >
> > Guozhang
> >
> > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon  wrote:
> >
> > > Hi all,
> > >
> > > My team has observed that if a broker process is killed in the middle
> of
> > > the controlled shutdown procedure, the remaining brokers start spewing
> > > errors and do not properly rebalance leadership. The cluster cannot
> > recover
> > > without major manual intervention.
> > >
> > > Here is how to reproduce the problem:
> > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them
> A,
> > > B, and C.) Set controlled.shutdown.enable=true.
> > > 2. Create a topic with replication_factor = 3 and a large number of
> > > partitions (say 100).
> > > 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> > > 4. Before controlled shutdown completes, quickly send a KILL signal to
> > > broker A.
> > >
> > > Result:
> > > - Brokers B and C start logging ReplicaFetcherThread connection errors
> > > every few milliseconds. (See below for an example.)
> > > - Broker A is still listed as a leader and ISR for any partitions which
> > > were not transferred during controlled shutdown. This causes connection
> > > errors when clients try to produce to or consume from these partitions.
> > >
> > > This scenario is difficult to recover from. The only ways we have found
> > are
> > > to restart broker A multiple times (if it still exists) or to kill
> both B
> > > and C and then start them one by one. Without this kind of
> intervention,
> > > the above issues persist indefinitely.
> > >
> > > This may sound like a contrived scenario, but it's exactly what we have
> > > seen when a Kafka EC2 instance gets terminated by AWS. So this seems
> > like a
> > > real liability.
> > >
> > > Are there any existing JIRA tickets which cover this behavior? And do
> you
> > > have any recommendations for avoiding it, other than forsaking
> controlled
> > > shutdowns entirely?
> > >
> > > Thanks,
> > > Solon
> > >
> > > Error example:
> > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> > Error
> > > in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> > > ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500
> > ms;
> > > MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > > PartitionFetchInfo(448,10485760),[console,0] ->
> > > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at
> > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > at
> > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at
> kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at
> > kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at
> > > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at
> > >
> > >
> >
> kafka

Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-09 Thread Neha Narkhede
We fixed a couple issues related to automatic leader balancing and
controlled shutdown. Would you mind trying out 0.8.2-beta?

On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon  wrote:

> We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
>
> On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang  wrote:
>
> > Solon,
> >
> > Which version of Kafka are you running and are you enabling auto leader
> > rebalance at the same time?
> >
> > Guozhang
> >
> > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon  wrote:
> >
> > > Hi all,
> > >
> > > My team has observed that if a broker process is killed in the middle
> of
> > > the controlled shutdown procedure, the remaining brokers start spewing
> > > errors and do not properly rebalance leadership. The cluster cannot
> > recover
> > > without major manual intervention.
> > >
> > > Here is how to reproduce the problem:
> > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them
> A,
> > > B, and C.) Set controlled.shutdown.enable=true.
> > > 2. Create a topic with replication_factor = 3 and a large number of
> > > partitions (say 100).
> > > 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> > > 4. Before controlled shutdown completes, quickly send a KILL signal to
> > > broker A.
> > >
> > > Result:
> > > - Brokers B and C start logging ReplicaFetcherThread connection errors
> > > every few milliseconds. (See below for an example.)
> > > - Broker A is still listed as a leader and ISR for any partitions which
> > > were not transferred during controlled shutdown. This causes connection
> > > errors when clients try to produce to or consume from these partitions.
> > >
> > > This scenario is difficult to recover from. The only ways we have found
> > are
> > > to restart broker A multiple times (if it still exists) or to kill
> both B
> > > and C and then start them one by one. Without this kind of
> intervention,
> > > the above issues persist indefinitely.
> > >
> > > This may sound like a contrived scenario, but it's exactly what we have
> > > seen when a Kafka EC2 instance gets terminated by AWS. So this seems
> > like a
> > > real liability.
> > >
> > > Are there any existing JIRA tickets which cover this behavior? And do
> you
> > > have any recommendations for avoiding it, other than forsaking
> controlled
> > > shutdowns entirely?
> > >
> > > Thanks,
> > > Solon
> > >
> > > Error example:
> > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> > Error
> > > in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> > > ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500
> > ms;
> > > MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > > PartitionFetchInfo(448,10485760),[console,0] ->
> > > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at
> > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > at
> > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at
> kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at
> > kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at
> > > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply

Re: expanding cluster and reassigning partitions without restarting producer

2014-11-09 Thread Neha Narkhede
The producer might get an error code if the leader of the partitions being
reassigned also changes. However it should retry and succeed. Do you see a
behavior that suggests otherwise?

On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan  wrote:

> Hi All,
> I recently had an issue producing from python where expanding a cluster
> from 3 to 5 nodes and reassigning partitions forced me to restart the
> producer b/c of KeyError thrown.
> Is this situation handled by the Java producer automatically or need I do
> something to have the java producer refresh itself to see the reassigned
> partition layout and produce away ?
> Shlomi
>


Re: corrupt recovery checkpoint file issue....

2014-11-09 Thread Jun Rao
Guozhang,

In OffsetCheckpoint.write(), we don't catch any exceptions. There is only a
finally clause to close the writer. So, it there is any exception during
write or sync, the exception will be propagated back to the caller and
swapping will be skipped.

Thanks,

Jun
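
For readers unfamiliar with the pattern being discussed, the write-to-a-temp-file-then-rename approach looks roughly like the sketch below. It is a simplified illustration, not the actual OffsetCheckpoint code; the file name and contents are made up. The point is that a failure in write or sync propagates before the rename, so the previous checkpoint file is left untouched.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class AtomicCheckpointWrite {
    // Write the checkpoint to a temp file, force it to disk, then swap it into
    // place. If write or sync throws, the rename is skipped and the old file
    // is preserved.
    static void writeCheckpoint(File target, String contents) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        FileOutputStream out = new FileOutputStream(tmp);
        try {
            Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
            writer.write(contents);
            writer.flush();
            out.getFD().sync(); // any SyncFailedException propagates to the caller
        } finally {
            out.close();
        }
        if (!tmp.renameTo(target)) {
            throw new IOException("Failed to rename " + tmp + " to " + target);
        }
    }

    public static void main(String[] args) throws IOException {
        writeCheckpoint(new File("recovery-point-offset-checkpoint"), "0\n1\n");
    }
}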

On Fri, Nov 7, 2014 at 9:47 AM, Guozhang Wang  wrote:

> Jun,
>
> Checking the OffsetCheckpoint.write function, if
> "fileOutputStream.getFD.sync" throws an exception it will just be caught and
> forgotten, and the swap will still happen; maybe we need to catch the
> SyncFailedException and re-throw it as a FATAL error to skip the swap.
>
> Guozhang
>
>
> On Thu, Nov 6, 2014 at 8:50 PM, Jason Rosenberg  wrote:
>
> > I'm still not sure what caused the reboot of the system (but yes it
> appears
> > to have crashed hard). The file system is xfs, on CentOS Linux. I'm not
> > yet sure, but I think also before the crash, the system might have become
> > wedged.
> >
> > It appears the corrupt recovery files actually contained all zero bytes,
> > after looking at them with odb.
> >
> > I'll file a Jira.
> >
> > On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao  wrote:
> >
> > > I am also wondering how the corruption happened. The way that we update
> > the
> > > OffsetCheckpoint file is to first write to a tmp file and flush the
> data.
> > > We then rename the tmp file to the final file. This is done to prevent
> > > corruption caused by a crash in the middle of the writes. In your case,
> > was
> > > the host crashed? What kind of storage system are you using? Is there
> any
> > > non-volatile cache on the storage system?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > We recently had a kafka node go down suddenly. When it came back up,
> it
> > > > apparently had a corrupt recovery file, and refused to startup:
> > > >
> > > > 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> > > > starting up KafkaServer
> > > > java.lang.NumberFormatException: For input string:
> > > >
> > > >
> > >
> >
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> > > >
> > > >
> > >
> >
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> > > > at
> > > >
> > >
> >
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > > > at java.lang.Integer.parseInt(Integer.java:481)
> > > > at java.lang.Integer.parseInt(Integer.java:527)
> > > > at
> > > >
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> > > > at
> > scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > > > at
> > kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> > > > at
> > > > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> > > > at
> > > > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> > > > at
> > > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > > > at
> > > > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> > > > at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> > > > at kafka.log.LogManager.<init>(LogManager.scala:57)
> > > > at
> > > kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> > > > at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> > > >
> > > > And since the app is under a monitor (so it was repeatedly restarting
> > and
> > > > failing with this error for several minutes before we got to it)…
> > > >
> > > > We moved the ‘recovery-point-offset-checkpoint’ file out of the way,
> > and
> > > it
> > > > then restarted cleanly (but of course re-synced all it’s data from
> > > > replicas, so we had no data loss).
> > > >
> > > > Anyway, I’m wondering if that’s the expected behavior? Or should it
> not
> > > > declare it corrupt and then proceed automatically to an unclean
> > restart?
> > > >
> > > > Should this NumberFormatException be handled a bit more gracefully?
> > > >
> > > > We saved the corrupt file if it’s worth inspecting (although I doubt
> it
> > > > will be useful!)….
> > > >
> > > > Jason
> > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Adding replicas to existing topic cause data loss in some partitions

2014-11-09 Thread Jun Rao
Any error in the controller/state-change log when you increased the
replication factor? If you describe those topics, are both replicas in ISR?
The answers to those questions will help us understand whether this is a
broker-side or consumer-side issue.

Thanks,

Jun

On Thu, Nov 6, 2014 at 11:56 PM, Shangan Chen 
wrote:

> I have a kafka cluster where every topic has only one replica. Recently I
> extended every topic to 2 replicas. Most topics work fine, but some large
> topics have problems with part of their partitions. Consumers throw an
> OffsetOutOfRange exception; the consumer's requested offset is bigger than
> the latest offset. I suspect there is some bug with the tool that adds
> replicas to an existing topic.
>
> I add replicas by the following  guide:
>
> http://kafka.apache.org/081/documentation.html#basic_ops_increase_replication_factor
>
>
> Has anyone faced the same problem before, and can you figure out how to avoid
> this?
>
> --
> have a good day!
> chenshang'an
>


Re: kafka test jars in sbt?

2014-11-09 Thread Joe Crobak
For sbt, you need to use something like:

"org.apache.kafka" %% "kafka" %"0.8.2-beta" % "test" classifier "test"

That tells sbt to pull in the kafka artifact with the "test" classifier
only when running tests. The %% tells sbt to fill in the scala version (so
it'll map to "kafka_2.10" like in your example).

On Thu, Nov 6, 2014 at 6:56 PM, Jun Rao  wrote:

> The following is how samza references the kafka test jar in gradle.
>
> testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
>
> Thanks,
>
> Jun
>
>
> On Thu, Nov 6, 2014 at 6:38 AM, Markus Jais  wrote:
>
> > Hello,
> >
> > I want to use the kafka_2.10-0.8.2-beta-test.jar in my Scala project.
> >
> > It can be found here:
> > http://repo.maven.apache.org/maven2/org/apache/kafka/kafka_2.10/0.8.1.1/
> >
> >
> > In my build.sbt I write the following definition:
> > "org.apache.kafka" % "kafka_2.10" % "0.8.2-beta-test"
> >
> >
> > But sbt cannot find it. Has anybody had any success with this?
> >
> > I already used "gradle testJar" and the test jar gets published to:
> >
> > .m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta
> >
> >
> > but sbt is looking for a:
> >
> >
> .m2/repository/org/apache/kafka/kafka_2.10/0.8.2-beta-test/kafka_2.10-0.8.2-beta-test.pom
> >
> > any tips on how to use the kafka test jar (together with the regular
> kafka
> > jar) in a build.sbt file?
> >
> > I want to start a kafka cluster for a unit test.
> >
> > Cheers,
> >
> > Markus
>


Re: powered by kafka

2014-11-09 Thread Gwen Shapira
I'm not Jay, but fixed it anyways ;)

Gwen

On Sun, Nov 9, 2014 at 10:34 AM, vipul jhawar 
wrote:

> Hi Jay
>
> Thanks for posting the update.
>
> However, I checked the page history and the hyperlink is pointing to the
> wrong domain.
> Exponential refers to www.exponential.com. I sent the twitter handle,
> should have sent the domain.
> Please correct.
>
> Thanks
>
> On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar 
> wrote:
>
> > Exponential @exponentialinc is using kafka in production to power the
> > events ingestion pipeline for real time analytics and log feed
> consumption.
> >
> > Please post on powered by kafka wiki -
> > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> >
> > Thanks
> > Vipul
> > http://in.linkedin.com/in/vjhawar/
> >
>


Re: powered by kafka

2014-11-09 Thread vipul jhawar
Hi Jay

Thanks for posting the update.

However, I checked the page history and the hyperlink is pointing to the
wrong domain.
Exponential refers to www.exponential.com. I sent the twitter handle,
should have sent the domain.
Please correct.

Thanks

On Sat, Nov 8, 2014 at 3:45 PM, vipul jhawar  wrote:

> Exponential @exponentialinc is using kafka in production to power the
> events ingestion pipeline for real time analytics and log feed consumption.
>
> Please post on powered by kafka wiki -
> https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
>
> Thanks
> Vipul
> http://in.linkedin.com/in/vjhawar/
>


Re: Consumer thread dies

2014-11-09 Thread Srinivas Reddy Kancharla
Hi,

Looking at this scenario further, is it correct that the above scenario can be
handled if I use the SimpleConsumer approach instead of
"ConsumerConnector.createMessageStreams()"? That way I have better control
over the partition, and the partition is not bound to any specific consumer
thread. Please let me know if I am missing anything with
"ConsumerConnector.createMessageStreams()".

Thanks and regards,
Srini


On Fri, Nov 7, 2014 at 3:34 PM, Srinivas Reddy Kancharla  wrote:

> Hi,
>
> I have a scenario where I have 1 partition and 1 consumer group having 2
> consumer threads running say C1 and C2. Since there is only one partition
> for a given topic, say C1 is holding that partition. Now due to some reason
> if C1 dies, can C2 get hold of that partition?
>
> i.e. C1 was busy with the KafkaStream instance; if for any reason C1 dies or
> is in a hung state, can we make C2 talk to the KafkaStream (for partition 0)?
> I am facing this issue where I have 10 messages in partition 0 and C1 was
> consuming them. At message 4, C1 went into a hung state. Now I would like
> C2 to consume the other messages which were not consumed by C1.
>
> Thank and regards,
> Srini
>


Re: powered by kafka

2014-11-09 Thread Gwen Shapira
Updated. Thanks!

On Sat, Nov 8, 2014 at 12:16 PM, Jimmy John  wrote:

> Livefyre (http://web.livefyre.com/) uses kafka for the real time
> notifications, analytics pipeline and as the primary mechanism for general
> pub/sub.
>
> thx...
> jim
>
> On Sat, Nov 8, 2014 at 7:41 AM, Gwen Shapira 
> wrote:
>
> > Done!
> >
> > Thank you for using Kafka and letting us know :)
> >
> > On Sat, Nov 8, 2014 at 2:15 AM, vipul jhawar 
> > wrote:
> >
> > > Exponential @exponentialinc is using kafka in production to power the
> > > events ingestion pipeline for real time analytics and log feed
> > consumption.
> > >
> > > Please post on powered by kafka wiki -
> > > https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
> > >
> > > Thanks
> > > Vipul
> > > http://in.linkedin.com/in/vjhawar/
> > >
> >
>


Re: Issues Running Kafka Producer Java example

2014-11-09 Thread Gwen Shapira
The producer code here looks fine. It may be an issue with the consumer, or
how the consumer is used.

If you are running the producer before starting a consumer, make sure you
get all messages by setting auto.offset.reset=smallest (in the console
consumer you can use --from-beginning)

Also, you can use the ConsumerOffsetChecker tool to see:
1. How much data you have in the topic
2. Whether the consumer is consuming anything at all


Gwen
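
To illustrate the first point, a minimal sketch of the consumer-side setting (the ZooKeeper address and group id are placeholders):

import java.util.Properties;

public class FromBeginningConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.0.2.15:2181"); // placeholder, match your setup
        props.put("group.id", "page-visits-checker");      // hypothetical group id
        // With a brand-new group id there are no committed offsets yet, so
        // auto.offset.reset decides where consumption starts: "smallest" reads
        // from the beginning of the log, "largest" (the default) only sees
        // messages produced after the consumer connects.
        props.put("auto.offset.reset", "smallest");
        System.out.println(props);
    }
}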

On Sat, Nov 8, 2014 at 1:25 PM, Hardik Pandya 
wrote:

> Hello Champs,
>
> I am trying to run the first Java producer example.
>
> Upon running this example, the producer successfully sends the messages; at
> least it looks like it does, since there is no Java error dump.
>
> But when trying to verify the messages on the consumer side, the consumer does
> not return any data sent by the producer.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
>
> here's my Producer code - thanks in advance!
>
> package example.kafka;
>
> import java.util.Date;
> import java.util.Properties;
> import java.util.Random;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class TestProducer {
>
> /**
>  * @param args
>  */
> public static void main(String[] args) {
> // TODO Auto-generated method stub
> long events = Long.parseLong(args[0]);
> Properties props = new Properties();
> props.put("metadata.broker.list", "10.0.2.15:9092,10.0.2.15:9093,
> 10.0.2.15:9094,10.0.2.15:9095");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("partitioner.class", "example.kafka.SimplePartitioner");
> props.put("request.required.acks", "1");
>  ProducerConfig config = new ProducerConfig(props);
> Producer<String, String> producer = new Producer<String, String>(config);
>  Random rnd = new Random();
>  for(long event=0;event < events;event++) {
> Long runtime = new Date().getTime();
> String msgKey= "192.168.2." + rnd.nextInt(255);
> String msg =  runtime  + ",www.example.com," + msgKey;
> KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", msgKey, msg);
> producer.send(data);
> System.out.println("message sent");
> }
> producer.close();
>  }
>
> }
>