kafka.common.ConsumerRebalanceFailedException: can't rebalance after 4

2016-08-30 Thread Ratha v
Hi all,
I'm using the 0.8 consumer with Kafka 0.10.0.1.
When I run my consumer app (within WildFly 10.0.x) I get the following
exception and the consumer does not receive any messages.
I tried increasing the "rebalance.backoff.ms", "zookeeper.session.timeout.ms"
and "rebalance.max.retries" values, but still no luck.
If I write a simple Java program with a main method, it works for the same
topic and group ID.

Can anyone point me to the fix? (I could not update to the latest version
(0.9.x) of the consumer APIs.)

Consumer.properties

auto.commit.enable=true

rebalance.max.retries=4

auto.commit.interval.ms=101

zookeeper.connect=zk1.abc.com\:2181,zk2.abc.com\:2181,zk3.abc.com\:2181

auto.offset.reset=largest

rebalance.backoff.ms=1

zookeeper.session.timeout.ms=6000

group.id=lob1

consumer.timeout.ms=-1

fetch.min.bytes=1
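
For reference, a sketch of the kind of increase described above; the values are
illustrative assumptions only, following the common rule of thumb that
rebalance.backoff.ms * rebalance.max.retries should exceed zookeeper.session.timeout.ms:

# illustrative values, not a verified fix
rebalance.backoff.ms=2000
rebalance.max.retries=10
zookeeper.session.timeout.ms=6000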


[1]

05:24:05,492 INFO  [stdout] (ServerService Thread Pool -- 75)
kafka.common.ConsumerRebalanceFailedException:
lob1_18bd15ac0c9c-1472621001050-5764721e can't rebalance after 4 retries

filerouter_1  | 05:24:05,492 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,492 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:977)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,493 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:264)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,493 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at
com.abc.core.listener.KafkaMessageListener.start(KafkaMessageListener.java:126)
[core-0.0.1-SNAPSHOT.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at com.labc.bean.KafkaServiceBean.initialize(KafkaServiceBean.java:47)
[classes:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.ee.component.ManagedReferenceLifecycleMethodInterceptor.processInvocation(ManagedReferenceLifecycleMethodInterceptor.java:96)
[wildfly-ee-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doLifecycleInterception(Jsr299BindingsInterceptor.java:114)
[wildfly-weld-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:103)
[wildfly-weld-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:437)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,498 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:73)
[weld-core-impl-2.3.5.Final.jar!/:2.3.5.Final]

filerouter_1  | 05:24:05,498 INFO  [stdout] (ServerService Thread Pool --
75) at

How to resolve WARN messages in kafka.server.ReplicaFetcherThread$FetchRequest: No response received within 30000 ms and Connection to xxx was disconnected

2016-08-30 Thread Daniel Kan
Hi,
We have been using kafka_2.11-0.9.0.1. We’re seeing the following two warning
messages in kafka.server.ReplicaFetcherThread whenever there is a large read by
Spark. They are always accompanied by Shrinking ISR and Expanding ISR messages. It
seems that Spark is still receiving data. However, it’s unclear to me what the
repercussions of these warning messages are, why they occur, and how to get rid
of them. I appreciate any insight on this. Thank you.

Dan


[2016-08-31 02:15:56,918] WARN [ReplicaFetcherThread-0-112], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@5e5f9e6. Possible cause: 
java.io.IOException: Connection to 112 was disconnected before the response was 
read (kafka.server.ReplicaFetcherThread)


[2016-08-31 02:16:10,036] WARN [ReplicaFetcherThread-0-111], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@5dd468b5. Possible cause: 
java.net.SocketTimeoutException: No response received within 30000 ms 
(kafka.server.ReplicaFetcherThread)


[2016-08-31 02:15:56,968] INFO Partition [prod_metrics,78] on broker 110: 
Shrinking ISR for partition [prod_metrics,78] from 110,112 to 110 
(kafka.cluster.Partition)
[2016-08-31 02:15:57,053] INFO Partition [prod_metrics,82] on broker 110: 
Shrinking ISR for partition [prod_metrics,82] from 110,113 to 110 
(kafka.cluster.Partition)
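
For orientation, the 30000 ms in the first warning corresponds to the replica
fetcher's socket timeout; a rough sketch of the broker settings involved (the
values shown are the 0.9 defaults):

# defaults shown, listed only for orientation; raising them trades faster
# ISR-shrink detection for more tolerance of slow fetches during heavy reads
replica.socket.timeout.ms=30000
replica.lag.time.max.ms=10000
num.replica.fetchers=1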



Re: How distributed countByKey works in KStream ?

2016-08-30 Thread Tommy Q
Tried the word count example as discussed, the result in wc-out is wrong:

a 1
b 1
a 1
b 1
c 1


The expected result should be:

a 2
b 2
c 1


Kafka version is 0.10.0.1


On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax 
wrote:

> No. It does not support hidden topics.
>
> The only explanation might be that there is no repartitioning step. But
> then the question would be whether there is a bug in Kafka Streams, because
> between map() and countByKey() repartitioning is required.
>
> Can you verify that the result is correct?
>
> -Matthias
>
> On 08/30/2016 03:24 PM, Tommy Q wrote:
> > Does Kafka support hidden topics ? (Since all the topics infos are stored
> > in ZK, this is probably not the case )
> >
> > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax 
> > wrote:
> >
> >> Hi Tommy,
> >>
> >> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> >> an internal topic will be created under the hood. It should be named
> >> "-something-repartition". I am not sure why it is not
> >> listed via bin/kafka-topics.sh
> >>
> >> The internal topic "-counts-changelog" you see is
> >> created to back the state of countByKey() operator.
> >>
> >> See
> >> https://cwiki.apache.org/confluence/display/KAFKA/
> >> Kafka+Streams%3A+Internal+Data+Management
> >>
> >> and
> >>
> >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> >> streams-resetting-a-streams-application
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> >>> Michael, Thanks for your help.
> >>>
> >>> Take the word count example, I am trying to walk through the code based
> >> on
> >>> your explanation:
> >>>
> >>> val textLines: KStream[String, String] =
> >> builder.stream("input-topic")
> >>> val wordCounts: KStream[String, JLong] = textLines
> >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> >>>   .map((key: String, word: String) => new KeyValue(word, word))
> >>>   .countByKey("counts")
> >>>   .toStream
> >>>
> >>> wordCounts.to(stringSerde, longSerde, "wc-out")
> >>>
> >>> Suppose the input-topic has two partitions and each partition has a
> >> string
> >>> record produced into:
> >>>
> >>> input-topic_0 : "a b"
>  input-topic_1 : "a b c"
> >>>
> >>>
> >>> Suppose we started two instances of the stream topology (task_0 and
> >>> task_1). So after flatMapValues & map executed, they should have the
> >>> following task state:
> >>>
> >>> task_0 :  [ (a, "a"), (b, "b") ]
>  task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> >>>
> >>>
> >>> Before the execution of  countByKey, the kafka-stream framework should
> >>> insert an invisible shuffle phase internally:
> >>>
> >>> shuffled across the network :
> 
> >>>
> >>>
>  _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
>  _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> >>>
> >>>
> >>> countByKey (reduce) :
> >>>
> >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> >>>
> >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> >>>
> >>>
> >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> >>> "wc-out")`, we get the word count output in wc-out topic:
> >>>
> >>> task_0 (wc-out_0) :  [ (a, 2) ]
> >>>
> >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> >>>
> >>>
> >>>
> >>> According to the steps listed above, do I understand the internals of
> kstream
> >>> word count correctly ?
> >>> Another question is does the shuffle across the network work by
> creating
> >>> intermediate topics ? If so, why can't I find the intermediate topics
> >> using
> >>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only
> see
> >>> the counts-changelog got created by the kstream framework.
> >>>
> >>>
> >>>
> >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll 
> >> wrote:
> >>>
>  In Kafka Streams, data is partitioned according to the keys of the
>  key-value records, and operations such as countByKey operate on these
>  stream partitions.  When reading data from Kafka, these stream
> >> partitions
>  map to the partitions of the Kafka input topic(s), but these may
> change
>  once you add processing operations.
> 
>  To your question:  The first step, if the data isn't already keyed as
>  needed, is to select the key you want to count by, which results in 1+
>  output stream partitions.  Here, data may get shuffled across the
> >> network
 (but it won't if there's no need to, e.g. when the data is already
> >> keyed as
>  needed).  Then the count operation is performed for each stream
> >> partition,
>  which is similar to the sort-and-reduce phase in Hadoop.
> 
>  On Mon, Aug 29, 2016 at 5:31 PM, Tommy  wrote:
> 
> > Hi,
> >
> > For "word count" example in Hadoop, there are shuffle-sort-and-reduce
> > phases that handles outputs from different mappers, how does it work
> in
> > 

Re: Kafka Streaming Join for range of gps coordinates

2016-08-30 Thread Guozhang Wang
Hello Farhon,

I think your idea about KStream-KTable join is a good approach with some
tweaks, more specifically:

1. Model your rider request as a normal record stream with the combo key of
(latitude, longitude).

2. Model your driver location as an ever-updating table with the combo key
of (latitude, longitude), and value as the driver information. Whenever a
driver's location gets updated, it is captured as two updates: a delete
with the old location key and an insert with the new location key.

3. Then, upon receiving a new record from the rider stream, issue a range
query on the materialized table around that record's key.
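
A minimal Java sketch of point 2, assuming a log-compacted topic named
"driver-locations" keyed by a location string; the topic name, key format and
driver id below are placeholders for illustration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DriverLocationUpdater {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String oldKey = "37.788,-122.406";   // previous location bucket of the driver
        String newKey = "37.789,-122.405";   // new location bucket
        // a location update becomes two writes: a tombstone for the old key, an insert for the new one
        producer.send(new ProducerRecord<String, String>("driver-locations", oldKey, null));
        producer.send(new ProducerRecord<String, String>("driver-locations", newKey, "driver-42"));
        producer.close();
    }
}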


Does this work for you?


Guozhang



On Mon, Aug 29, 2016 at 11:36 AM, Michael Noll  wrote:

> Quick reply only, since I am on my mobile.  Not an exact answer to your
> problem but still somewhat related:
> http://www.infolace.com/blog/2016/07/14/simple-spatial-
> windowing-with-kafka-streams/
> (perhaps you have seen this already).
>
> -Michael
>
>
>
> On Sun, Aug 28, 2016 at 4:55 AM, Farhon Zaharia 
> wrote:
>
> > Hello friends,
> >
> > I am designing a new streaming component and am looking at how to use
> Kafka
> > Streams.
> >
> > I need some guidance with the appropriate flow.
> >
> > *Problem to solve:*
> > The problem I am working on is I have a large group of riders and
> drivers.
> > I would like to match available riders to nearby drivers.  (For
> simplicity
> > think of Uber or Lyft)
> >
> > The goal is to have the drivers within a certain delta of gps coordinates
> > be notified of a potential rider.
> >
> > For example, a rider requests to be picked up at location:
> > 37.788517, -122.406812
> >
> > I would like to select the nearby drivers to send notifications of an
> > available match by selecting nearby drivers within a range
> >
> > latitude < 37.798517 && latitude > 37.778517 && longitude < -122.4106812
> &&
> > longitude > -122.3906812
> >
> > *Note this is a POC and I would prefer to select the nearest drivers,
> > then afterwards look up the address and use my own graph for streets and
> > calculate the shortest path on my own.
> >
> > I would like to have 3 initial topics:  riders, drivers, and
> paired-onride
> >
> > What is the best way to do this with Kafka Streams?
> >
> > *What I have tried or considered:*
> > I was considering storing drivers in a Ktable and having riders in a
> > KStream and joining them.  But I don't think this will work because the
> > join is dependent on the key, which in this case I was looking more for a
> > select statement to look for a range of gps coordinates as noted above.
> > The drivers location will be updated periodically.
> >
> > I was also thinking of filtering the KStream based on the gps range and
> > making a smaller subselection of available drivers within a certain
> > distance to a rider.
> >
> > At this point I am seeking some guidance and if this is not an ideal
> > use-case that is also ok.  Thanks for any information or direction you
> can
> > provide.
> >
> >
> > -Farhon
> >
>



-- 
-- Guozhang


Re: Monitoring the max lag of a kafka streams application.

2016-08-30 Thread Guozhang Wang
Hi Rohit,

Just for clarification, as stated in the Javadoc, metricChange "is called
whenever a metric is updated or added". It is not called each time a
metric value is recorded; in fact, metrics collection follows a "pull" model,
where the implemented reporters fetch the current calculated value
based on the metric type (rate, min, max, histogram, etc.). You may want to
first take a look at the default JMXReporter implementation, whose
getAttribute() function does the pulling, to make sure your customized
reporter behaves correctly.
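
A rough Java sketch of that pull pattern, assuming the 0.10.x MetricsReporter and
KafkaMetric interfaces; the class name is made up and how/when logLag() gets
scheduled is left as a placeholder:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class LagReporter implements MetricsReporter {
    // keep references to the metrics; their value() is computed at read time (pull model)
    private final Map<String, KafkaMetric> lagMetrics = new ConcurrentHashMap<>();

    public void init(List<KafkaMetric> initial) {
        for (KafkaMetric m : initial) metricChange(m);
    }

    public void metricChange(KafkaMetric metric) {
        if ("records-lag-max".equals(metric.metricName().name()))
            lagMetrics.put(metric.metricName().group(), metric);   // keyed by group for brevity
    }

    public void metricRemoval(KafkaMetric metric) {
        lagMetrics.remove(metric.metricName().group());
    }

    // call this periodically from your own scheduler, analogous to the JMX layer
    // calling JmxReporter's getAttribute(); value() returns the current computed max lag
    public void logLag() {
        for (Map.Entry<String, KafkaMetric> e : lagMetrics.entrySet())
            System.out.println(e.getKey() + " records-lag-max=" + e.getValue().value());
    }

    public void close() {}
    public void configure(Map<String, ?> configs) {}
}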



Guozhang

On Tue, Aug 30, 2016 at 6:01 PM, Guozhang Wang  wrote:

> Hello Rohit,
>
> As you are already aware, monitoring kafka streams is no different from
> monitoring kafka producers / consumers. So you can just monitor its
> embedded consumer's "records-lag-max" metric, which gets recorded
> whenever the consumer receives a fetch response.
>
> As for your application, your way of passing the class name of your
> implemented MetricsReporter through the StreamsConfig is correct. Have
> you seen records being processed by your streams application, meaning that
> there are indeed some fetched records from the fetch response?
>
> Guozhang
>
>
> On Mon, Aug 29, 2016 at 10:48 AM, Rohit Valsakumar 
> wrote:
>
>> Hi all,
>>
>> Any opinions about monitoring the records-lag-max for a kafka streams job?
>>
>> Thanks,
>> Rohit
>>
>> On 8/26/16, 2:53 PM, "Rohit Valsakumar"  wrote:
>>
>> >Hi all,
>> >
>> >I want to monitor the max lag of a kafka streams job which is consuming
>> >from three topics and to do that I have implemented the MetricsReporter
>> >interface which I pass through the Streams Configuration to the
>> >KafkaStreams object. In the implementation's metricChange() method I
>> >have logging for the metric with group consumer-fetch-manager-metrics and
>> >name records-lag-max. However, the implementation never receives a
>> >metricChange notification for 'records-lag-max'.
>> >
>> >I would like to know if what I am doing makes sense, is the correct
>> >approach and whether the implementation should be getting notifications
>> >for 'records-lag-max' periodically or only when the max lag is changing
>> >i.e. Increasing/decreasing.
>> >
>> >Thanks,
>> >Rohit
>> >
>> >
>> >
>> >
>> >This email and any attachments may contain confidential and privileged
>> >material for the sole use of the intended recipient. Any review, copying,
>> >or distribution of this email (or any attachments) by others is
>> >prohibited. If you are not the intended recipient, please contact the
>> >sender immediately and permanently delete this email and any attachments.
>> >No employee or agent of TiVo Inc. is authorized to conclude any binding
>> >agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> >Inc. may only be made by a signed written agreement.
>>
>>
>> 
>>
>> This email and any attachments may contain confidential and privileged
>> material for the sole use of the intended recipient. Any review, copying,
>> or distribution of this email (or any attachments) by others is prohibited.
>> If you are not the intended recipient, please contact the sender
>> immediately and permanently delete this email and any attachments. No
>> employee or agent of TiVo Inc. is authorized to conclude any binding
>> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> Inc. may only be made by a signed written agreement.
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: Monitoring the max lag of a kafka streams application.

2016-08-30 Thread Guozhang Wang
Hello Rohit,

As you are already aware, monitoring kafka streams is no different from
monitoring kafka producers / consumers. So you can just monitor its
embedded consumer's "records-lag-max" metric, which gets recorded whenever
the consumer receives a fetch response.

As for your application, your way of passing the class name of your
implemented MetricsReporter through the StreamsConfig is correct. Have you
seen records being processed by your streams application, meaning that
there are indeed some fetched records from the fetch response?

Guozhang


On Mon, Aug 29, 2016 at 10:48 AM, Rohit Valsakumar 
wrote:

> Hi all,
>
> Any opinions about monitoring the records-lag-max for a kafka streams job?
>
> Thanks,
> Rohit
>
> On 8/26/16, 2:53 PM, "Rohit Valsakumar"  wrote:
>
> >Hi all,
> >
> >I want to monitor the max lag of a kafka streams job which is consuming
> >from three topics and to do that I have implemented the MetricsReporter
> >interface which I pass through the Streams Configuration to the
> >KafkaStreams object. In the implementation's metricChange() method I
> >have logging for the metric with group consumer-fetch-manager-metrics and
> >name records-lag-max. However, the implementation never receives a
> >metricChange notification for 'records-lag-max'.
> >
> >I would like to know if what I am doing makes sense, is the correct
> >approach and whether the implementation should be getting notifications
> >for 'records-lag-max' periodically or only when the max lag is changing
> >i.e. Increasing/decreasing.
> >
> >Thanks,
> >Rohit
> >
> >
> >
> >
> >This email and any attachments may contain confidential and privileged
> >material for the sole use of the intended recipient. Any review, copying,
> >or distribution of this email (or any attachments) by others is
> >prohibited. If you are not the intended recipient, please contact the
> >sender immediately and permanently delete this email and any attachments.
> >No employee or agent of TiVo Inc. is authorized to conclude any binding
> >agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> >Inc. may only be made by a signed written agreement.
>
>
> 
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>



-- 
-- Guozhang


Re: What should my Consumer connect to Kafka/Zookeeper

2016-08-30 Thread David Esposito
Your consumers/producers will push/pull messages directly from kafka. More
specifically, to/from the broker that is responsible for the partition
assigned to the consumer or corresponding to the published message key.

Others should correct me if I am wrong, but zookeeper was used to store
information about topics, consumers, and offsets (version 0.8.*-ish). In
newer versions of kafka (0.9.* and 0.10.*) those values are instead written
to internal topics for Kafka to manage consumers, topics, and offsets.

So much (possibly all?) of the functionality that zookeeper provided is
being deprecated in favor of storing that info in kafka.
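
A minimal sketch of the newer (0.9+) Java consumer, which is configured with the
broker list (bootstrap.servers) rather than ZooKeeper; the broker addresses, group
id and topic name are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // brokers, not zookeeper
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            // offsets are tracked in the internal __consumer_offsets topic, not in ZooKeeper
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.key() + " -> " + record.value());
        }
    }
}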

Another thing to consider is your serialization strategy and contract
between producers and consumers. You should consider predefining schemas
for your topics. Confluent.io provides steps to start using a schema registry
to enforce schemas on producers. There are also libraries that leverage this
schema registry to provide fast and efficient serialization and
deserialization. Check out Apache Avro.

Also checkout https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem


On Tue, Aug 30, 2016 at 7:16 PM, Jesse Whitham 
wrote:

> Hi Everyone,
>
>
> I am trying to get some information around what my consumer should connect
> to. According to the docs, sending messages should go directly to the Kafka
> broker while consuming messages should go through Zookeeper? I am really after
> the reason this is the case, as I am aware that you can just consume directly
> from the Kafka broker.
>
>
> Thanks in advance for any help!
>
>
> Jesse Whitham
>
> Devops Engineer
>
> e. jesse.whit...@loyalty.co.nz
>
> w. loyalty.co.nz
>
> [Loyalty NZ]
>
> Loyalty New Zealand Ltd. PO Box 3451, New Zealand 6011
>
> Level 3, NZX Building, 11 Cable Street, Wellington
>



-- 
*David Esposito* | Product Developer | m. 678-653-1225
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305

"We could do that better. We’ve got the right technology."

-
Meet Brandon Williams, our newest Modern Workplace Innovator


Re: How do I remove myself from mailing list?

2016-08-30 Thread Philippe Derome
Shikhar,

I never got the "confirm unsubscribe from..." follow up email, hence the
problem.

Phil


On Tue, Aug 30, 2016 at 6:49 PM, Shikhar Bhushan 
wrote:

> Just to confirm: after you unsubscribe by sending an email to
> users-unsubscr...@kafka.apache.org, you should receive an email titled
> "confirm unsubscribe from users@kafka.apache.org", which you need to reply
> to in order for the unsubscribe to take effect. Did you folks do that?
>
> On Tue, Aug 30, 2016 at 3:40 PM Philippe Derome 
> wrote:
>
> > count me out too! ;-)
> >
> > On Tue, Aug 30, 2016 at 12:54 PM, Heath Ivie 
> > wrote:
> >
> > > I have done the same several times, but I still get these emails.
> > >
> > > -Original Message-
> > > From: Spencer Owen [mailto:so...@netdocuments.com]
> > > Sent: Tuesday, August 30, 2016 8:33 AM
> > > To: users@kafka.apache.org
> > > Subject: How do I remove myself from mailing list?
> > >
> > > I've gone through the unsubscribe process more times than I can count,
> > but
> > > I'm still getting emails.
> > >
> > > How can I unsubscribe? Who is an admin?
> > > 
> > > This message may contain information that is privileged or
> confidential.
> > > If you received this transmission in error, please notify the sender by
> > > reply e-mail and delete the message and any attachments. Any
> duplication,
> > > dissemination or distribution of this message or any attachments by
> > > unintended recipients is prohibited. Please note: All email sent to
> this
> > > address will be received by the NetDocuments corporate e-mail system
> and
> > is
> > > subject to archival and review by someone other than the recipient.
> > >
> > >
> >
>


Re: How do I remove myself from mailing list?

2016-08-30 Thread Shikhar Bhushan
Just to confirm: after you unsubscribe by sending an email to
users-unsubscr...@kafka.apache.org, you should receive an email titled
"confirm unsubscribe from users@kafka.apache.org", which you need to reply
to in order for the unsubscribe to take effect. Did you folks do that?

On Tue, Aug 30, 2016 at 3:40 PM Philippe Derome  wrote:

> count me out too! ;-)
>
> On Tue, Aug 30, 2016 at 12:54 PM, Heath Ivie 
> wrote:
>
> > I have done the same several times, but I still get these emails.
> >
> > -Original Message-
> > From: Spencer Owen [mailto:so...@netdocuments.com]
> > Sent: Tuesday, August 30, 2016 8:33 AM
> > To: users@kafka.apache.org
> > Subject: How do I remove myself from mailing list?
> >
> > I've gone through the unsubscribe process more times than I can count,
> but
> > I'm still getting emails.
> >
> > How can I unsubscribe? Who is an admin?
> > 
> > This message may contain information that is privileged or confidential.
> > If you received this transmission in error, please notify the sender by
> > reply e-mail and delete the message and any attachments. Any duplication,
> > dissemination or distribution of this message or any attachments by
> > unintended recipients is prohibited. Please note: All email sent to this
> > address will be received by the NetDocuments corporate e-mail system and
> is
> > subject to archival and review by someone other than the recipient.
> >
> >
>


Re: How do I remove myself from mailing list?

2016-08-30 Thread Philippe Derome
count me out too! ;-)

On Tue, Aug 30, 2016 at 12:54 PM, Heath Ivie  wrote:

> I have done the same several times, but I still get these emails.
>
> -Original Message-
> From: Spencer Owen [mailto:so...@netdocuments.com]
> Sent: Tuesday, August 30, 2016 8:33 AM
> To: users@kafka.apache.org
> Subject: How do I remove myself from mailing list?
>
> I've gone through the unsubscribe process more times than I can count, but
> I'm still getting emails.
>
> How can I unsubscribe? Who is an admin?
> 
> This message may contain information that is privileged or confidential.
> If you received this transmission in error, please notify the sender by
> reply e-mail and delete the message and any attachments. Any duplication,
> dissemination or distribution of this message or any attachments by
> unintended recipients is prohibited. Please note: All email sent to this
> address will be received by the NetDocuments corporate e-mail system and is
> subject to archival and review by someone other than the recipient.
>
>


Kafka unable to process message

2016-08-30 Thread Ghosh, Achintya (Contractor)
Hi there,

What does the below error mean and how do we avoid it? I see this error in one of 
the kafkaServer.out files when another broker is down.

We are also not able to process any messages; we see "o.a.k.c.c.i.AbstractCoordinator - 
Issuing group metadata request to broker 5" in the application log.

[2016-08-30 20:40:28,621] WARN [ReplicaFetcherThread-0-3], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@8b198c3 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: 
null) failed
   at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
   at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
   at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
   at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
   at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
   at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
   at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
   at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
   at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
   at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


Re: a broker is already registered on path /brokers/ids/1

2016-08-30 Thread Nomar Morado
Kafka broker in my case

Printing e-mails wastes valuable natural resources. Please don't print this 
message unless it is absolutely necessary. Thank you for thinking green!

Sent from my iPhone

> On Aug 30, 2016, at 1:19 PM, Flavio Junqueira  wrote:
> 
> Is the zookeeper process you're referring to the server or the client (broker 
> in the case of Kafka)?
> 
> If you're referring to the server, then the ensemble recovers from the disk 
> state of the servers. If it is a follower, then there is nothing to recover, 
> the leader already has all the necessary data. If it is the leader, then it 
> will get the ephemeral/session data from disk.
> 
> If you're referring to the Kafka broker, then you should be seeing a message 
> like this in your log in the case the broker finds a znode upon registration:
> 
> case e: ZkNodeExistsException =>
>  throw new RuntimeException("A broker is already registered on the path " + 
> brokerIdPath
>  + ". This probably " + "indicates that you either have configured a 
> brokerid that is already in use, or "
>  + "else you have shutdown this broker and restarted it faster than 
> the zookeeper "
>  + "timeout so it appears to be re-registering.")
> 
> 
> Does it help?
> 
> -Flavio
> 
> 
>> On 30 Aug 2016, at 14:35, J316 Services  wrote:
>> 
>> Setup is 3 participants + 1 observer.
>> 
>> We used 3.5.0 for dynamic configs.
>> 
>> What happens when a system force-killed the zookeeper process and it did 
>> not have time to clean up the ephemerals? Does it recover in that scenario?
>> 
>> 
>> Thanks.
>> 
>> Sent from my iPad
>> 
>>> On Aug 30, 2016, at 8:53 AM, Flavio Junqueira  wrote:
>>> 
>>> I think you're saying that the session isn't expiring and that the 
>>> ephemeral isn't getting deleted. Or maybe the session is expiring but the 
>>> ephemeral isn't being deleted? It'd be great if you could check that the 
>>> broker session is expiring eventually.
>>> 
>>> Since you're on the 3.5 branch, you may want to use 3.5.2-alpha or simply 
>>> try 3.4.8 as Harsha suggested.
>>> 
>>> -Flavio
>>> 
 On 29 Aug 2016, at 18:44, Harsha Chintalapani  wrote:
 
 how many brokers do you have in this cluster? Did you try using a stable
 zookeeper release like 3.4.8?
 -Harhsa
 
> On Mon, Aug 29, 2016 at 5:21 AM Nomar Morado  
> wrote:
> 
> we are using kafka 0.9.0.1 and zk 3.5.0-alpha
> 
> On Mon, Aug 29, 2016 at 8:12 AM, Nomar Morado 
> wrote:
> 
>> we would get this occasionally after a weekend reboot/restart.
>> 
>> we tried restarting a couple of times all to naught.
>> 
>> we had to delete dk's directory to get this going again.
>> 
>> any ideas what might cause this issue and suggestions on how to resolve
>> this?
>> 
>> 
>> thanks.
> 
> 
> 
> --
> Regards,
> Nomar Morado
>>> 
> 


RE: How do I remove myself from mailing list?

2016-08-30 Thread Heath Ivie
I have done the same several times, but I still get these emails.

-Original Message-
From: Spencer Owen [mailto:so...@netdocuments.com] 
Sent: Tuesday, August 30, 2016 8:33 AM
To: users@kafka.apache.org
Subject: How do I remove myself from mailing list?

I've gone through the unsubscribe process more times than I can count, but I'm 
still getting emails.

How can I unsubscribe? Who is an admin?

This message may contain information that is privileged or confidential. If you 
received this transmission in error, please notify the sender by reply e-mail 
and delete the message and any attachments. Any duplication, dissemination or 
distribution of this message or any attachments by unintended recipients is 
prohibited. Please note: All email sent to this address will be received by the 
NetDocuments corporate e-mail system and is subject to archival and review by 
someone other than the recipient.



How do I remove myself from mailing list?

2016-08-30 Thread Spencer Owen
I've gone through the unsubscribe process more times than I can count, but I'm 
still getting emails.

How can I unsubscribe? Who is an admin?

This message may contain information that is privileged or confidential. If you 
received this transmission in error, please notify the sender by reply e-mail 
and delete the message and any attachments. Any duplication, dissemination or 
distribution of this message or any attachments by unintended recipients is 
prohibited. Please note: All email sent to this address will be received by the 
NetDocuments corporate e-mail system and is subject to archival and review by 
someone other than the recipient.



ApacheCon Seville CFP closes September 9th

2016-08-30 Thread Rich Bowen
It's traditional. We wait for the last minute to get our talk proposals
in for conferences.

Well, the last minute has arrived. The CFP for ApacheCon Seville closes
on September 9th, which is less than 2 weeks away. It's time to get your
talks in, so that we can make this the best ApacheCon yet.

It's also time to discuss with your developer and user community whether
there's a track of talks that you might want to propose, so that you
have more complete coverage of your project than a talk or two.

For Apache Big Data, the relevant URLs are:
Event details:
http://events.linuxfoundation.org/events/apache-big-data-europe
CFP:
http://events.linuxfoundation.org/events/apache-big-data-europe/program/cfp

For ApacheCon Europe, the relevant URLs are:
Event details: http://events.linuxfoundation.org/events/apachecon-europe
CFP: http://events.linuxfoundation.org/events/apachecon-europe/program/cfp

This year, we'll be reviewing papers "blind" - that is, looking at the
abstracts without knowing who the speaker is. This has been shown to
eliminate the "me and my buddies" nature of many tech conferences,
producing more diversity, and more new speakers. So make sure your
abstracts clearly explain what you'll be talking about.

For further updates about ApacheCon, follow us on Twitter, @ApacheCon,
or drop by our IRC channel, #apachecon on the Freenode IRC network.

-- 
Rich Bowen
WWW: http://apachecon.com/
Twitter: @ApacheCon


Re: a broker is already registered on path /brokers/ids/1

2016-08-30 Thread J316 Services
Setup is 3 participants + 1 observer.

We used 3.5.0 for dynamic configs.

What happens when a system force-killed the zookeeper process and it did not 
have time to clean up the ephemerals? Does it recover in that scenario?


Thanks.

Sent from my iPad

> On Aug 30, 2016, at 8:53 AM, Flavio Junqueira  wrote:
> 
> I think you're saying that the session isn't expiring and that the ephemeral 
> isn't getting deleted. Or maybe the session is expiring but the ephemeral 
> isn't being deleted? It'd be great if you could check that the broker session 
> is expiring eventually.
> 
> Since you're on the 3.5 branch, you may want to use 3.5.2-alpha or simply try 
> 3.4.8 as Harsha suggested.
> 
> -Flavio
> 
>> On 29 Aug 2016, at 18:44, Harsha Chintalapani  wrote:
>> 
>> how many brokers do you have in this cluster? Did you try using a stable
>> zookeeper release like 3.4.8?
>> -Harhsa
>> 
>>> On Mon, Aug 29, 2016 at 5:21 AM Nomar Morado  wrote:
>>> 
>>> we are using kafka 0.9.0.1 and zk 3.5.0-alpha
>>> 
>>> On Mon, Aug 29, 2016 at 8:12 AM, Nomar Morado 
>>> wrote:
>>> 
 we would get this occasionally after a weekend reboot/restart.
 
 we tried restarting a couple of times all to naught.
 
 we had to delete dk's directory to get this going again.
 
 any ideas what might cause this issue and suggestions on how to resolve
 this?
 
 
 thanks.
>>> 
>>> 
>>> 
>>> --
>>> Regards,
>>> Nomar Morado
> 


Re: How distributed countByKey works in KStream ?

2016-08-30 Thread Matthias J. Sax
No. It does not support hidden topics.

The only explanation might be that there is no repartitioning step. But
then the question would be whether there is a bug in Kafka Streams, because
between map() and countByKey() repartitioning is required.

Can you verify that the result is correct?

-Matthias

On 08/30/2016 03:24 PM, Tommy Q wrote:
> Does Kafka support hidden topics ? (Since all the topics infos are stored
> in ZK, this is probably not the case )
> 
> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax 
> wrote:
> 
>> Hi Tommy,
>>
>> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>> an internal topic will be created under the hood. It should be named
>> "-something-repartition". I am not sure why it is not
>> listed via bin/kafka-topics.sh
>>
>> The internal topic "-counts-changelog" you see is
>> created to back the state of countByKey() operator.
>>
>> See
>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams%3A+Internal+Data+Management
>>
>> and
>>
>> http://www.confluent.io/blog/data-reprocessing-with-kafka-
>> streams-resetting-a-streams-application
>>
>>
>> -Matthias
>>
>>
>> On 08/30/2016 06:55 AM, Tommy Q wrote:
>>> Michael, Thanks for your help.
>>>
>>> Take the word count example, I am trying to walk through the code based
>> on
>>> your explanation:
>>>
>>> val textLines: KStream[String, String] =
>> builder.stream("input-topic")
>>> val wordCounts: KStream[String, JLong] = textLines
>>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>>>   .map((key: String, word: String) => new KeyValue(word, word))
>>>   .countByKey("counts")
>>>   .toStream
>>>
>>> wordCounts.to(stringSerde, longSerde, "wc-out")
>>>
>>> Suppose the input-topic has two partitions and each partition has a
>> string
>>> record produced into:
>>>
>>> input-topic_0 : "a b"
 input-topic_1 : "a b c"
>>>
>>>
>>> Suppose we started two instances of the stream topology (task_0 and
>>> task_1). So after flatMapValues & map executed, they should have the
>>> following task state:
>>>
>>> task_0 :  [ (a, "a"), (b, "b") ]
 task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
>>>
>>>
>>> Before the execution of  countByKey, the kafka-stream framework should
>>> insert an invisible shuffle phase internally:
>>>
>>> shuffled across the network :

>>>
>>>
 _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
 _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
>>>
>>>
>>> countByKey (reduce) :
>>>
>>> task_0 (counts-changelog_0) :  [ (a, 2) ]
>>>
>>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
>>>
>>>
>>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>>> "wc-out")`, we get the word count output in wc-out topic:
>>>
>>> task_0 (wc-out_0) :  [ (a, 2) ]
>>>
>>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
>>>
>>>
>>>
>>> According to the steps listed above, do I understand the internals of kstream
>>> word count correctly ?
>>> Another question is does the shuffle across the network work by creating
>>> intermediate topics ? If so, why can't I find the intermediate topics
>> using
>>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only see
>>> the counts-changelog got created by the kstream framework.
>>>
>>>
>>>
>>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll 
>> wrote:
>>>
 In Kafka Streams, data is partitioned according to the keys of the
 key-value records, and operations such as countByKey operate on these
 stream partitions.  When reading data from Kafka, these stream
>> partitions
 map to the partitions of the Kafka input topic(s), but these may change
 once you add processing operations.

 To your question:  The first step, if the data isn't already keyed as
 needed, is to select the key you want to count by, which results in 1+
 output stream partitions.  Here, data may get shuffled across the
>> network
 (but it won't if there's no need to, e.g. when the data is already
>> keyed as
 needed).  Then the count operation is performed for each stream
>> partition,
 which is similar to the sort-and-reduce phase in Hadoop.

 On Mon, Aug 29, 2016 at 5:31 PM, Tommy  wrote:

> Hi,
>
> For "word count" example in Hadoop, there are shuffle-sort-and-reduce
> phases that handles outputs from different mappers, how does it work in
> KStream ?
>

>>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: kafka-streams project compiles using maven but failed using sbt

2016-08-30 Thread Tommy Q
Got it. Thanks Michael.

On Tue, Aug 30, 2016 at 2:31 AM, Michael Noll  wrote:

> Most probably because, in your build.sbt, you didn't enable the
> -Xexperimental compiler flag for Scala.  This is required when using Scala
> 2.11 (as you do) to enable SAM for Java 8 lambda support.  Because this
> compiler flag is not set your build fails because it can translate
> `_.toUpperCase()` into a Java 8 lambda.
>
> See
> https://github.com/confluentinc/examples/blob/
> kafka-0.10.0.0-cp-3.0.0/kafka-streams/pom.xml#L209-L214
> .
>
> Also, note that your build.sbt is not equivalent to your pom.xml.  For
> example, the org.apache.kafka:kafka-clients dependency is missing in
> build.sbt.
>
>
> On Sat, Aug 27, 2016 at 2:01 PM, Tommy Go  wrote:
>
> > Hi
> >
> > I am playing with kafka-streams using Scala, but found some strange
> issue,
> > the following project compiles using mvn but failed using sbt:
> >
> > https://github.com/deeplambda/kstream-debug
> >
> > [error] /Users/tommy/tmp/kstream-debug/src/main/scala/kafka/
> > streams/WordCountDemo.scala:49:
> > missing parameter type for expanded function ((x$1) =>
> > x$1.toUpperCase())
> > [error] val uppercasedWithMapValues: KStream[Array[Byte], String]
> > = textLines.mapValues(_.toUpperCase())
> > [error]
> >  ^
> > [error] one error found
> > [error] (compile:compileIncremental) Compilation failed
> >
> >
> >
> > Any ideas why the demo failed compiling using sbt ?
> >
> > Thanks,
> > Tommy
> >
>


Re: How distributed countByKey works in KStream ?

2016-08-30 Thread Tommy Q
Does Kafka support hidden topics ? (Since all the topics infos are stored
in ZK, this is probably not the case )

On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax 
wrote:

> Hi Tommy,
>
> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
> an internal topic will be created under the hood. It should be named
> "-something-repartition". I am not sure why it is not
> listed via bin/kafka-topics.sh
>
> The internal topic "-counts-changelog" you see is
> created to back the state of countByKey() operator.
>
> See
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams%3A+Internal+Data+Management
>
> and
>
> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> streams-resetting-a-streams-application
>
>
> -Matthias
>
>
> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > Michael, Thanks for your help.
> >
> > Take the word count example, I am trying to walk through the code based
> on
> > your explanation:
> >
> > val textLines: KStream[String, String] =
> builder.stream("input-topic")
> > val wordCounts: KStream[String, JLong] = textLines
> >   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> >   .map((key: String, word: String) => new KeyValue(word, word))
> >   .countByKey("counts")
> >   .toStream
> >
> > wordCounts.to(stringSerde, longSerde, "wc-out")
> >
> > Suppose the input-topic has two partitions and each partition has a
> string
> > record produced into:
> >
> > input-topic_0 : "a b"
> >> input-topic_1 : "a b c"
> >
> >
> > Suppose we started two instances of the stream topology (task_0 and
> > task_1). So after flatMapValues & map executed, they should have the
> > following task state:
> >
> > task_0 :  [ (a, "a"), (b, "b") ]
> >> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> >
> >
> > Before the execution of  countByKey, the kafka-stream framework should
> > insert an invisible shuffle phase internally:
> >
> > shuffled across the network :
> >>
> >
> >
> >> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> >> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> >
> >
> > countByKey (reduce) :
> >
> > task_0 (counts-changelog_0) :  [ (a, 2) ]
> >
> > task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> >
> >
> > And after the execution of `wordCounts.to(stringSerde, longSerde,
> > "wc-out")`, we get the word count output in wc-out topic:
> >
> > task_0 (wc-out_0) :  [ (a, 2) ]
> >
> > task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> >
> >
> >
> > According to the steps listed above, do I understand the internals of kstream
> > word count correctly ?
> > Another question is does the shuffle across the network work by creating
> > intermediate topics ? If so, why can't I find the intermediate topics
> using
> > `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only see
> > the counts-changelog got created by the kstream framework.
> >
> >
> >
> > On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll 
> wrote:
> >
> >> In Kafka Streams, data is partitioned according to the keys of the
> >> key-value records, and operations such as countByKey operate on these
> >> stream partitions.  When reading data from Kafka, these stream
> partitions
> >> map to the partitions of the Kafka input topic(s), but these may change
> >> once you add processing operations.
> >>
> >> To your question:  The first step, if the data isn't already keyed as
> >> needed, is to select the key you want to count by, which results in 1+
> >> output stream partitions.  Here, data may get shuffled across the
> network
> >> (but it won't if there's no need to, e.g. when the data is already
> keyed as
> >> needed).  Then the count operation is performed for each stream
> partition,
> >> which is similar to the sort-and-reduce phase in Hadoop.
> >>
> >> On Mon, Aug 29, 2016 at 5:31 PM, Tommy  wrote:
> >>
> >>> Hi,
> >>>
> >>> For "word count" example in Hadoop, there are shuffle-sort-and-reduce
> >>> phases that handles outputs from different mappers, how does it work in
> >>> KStream ?
> >>>
> >>
> >
>
>


Re: kafka broker is dropping the messages after acknowledging librdkafka

2016-08-30 Thread Mazhar Shaikh
Hi Jun,

Yes, the data is lost during leader broker failure.
But the leader broker failed due to zookeeper session expiry.
The GC logs don't show any errors/warnings during this period.

It's not easy to reproduce. During a long run (>12hrs) with 30k msg/sec load
balanced across 96 partitions, this failure is noticed once or twice somewhere
in between.

It looks like this issue is similar to
https://issues.apache.org/jira/browse/KAFKA-1211.

But here the leader broker could have synced/committed all the existing
data to the replica before the replica is elected as leader.
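
For orientation, a sketch of the settings usually discussed for this class of
loss; whether they apply here depends on the existing setup:

# broker/topic level (illustrative): keep an out-of-sync replica from becoming leader,
# and require at least 2 in-sync replicas to accept a write
unclean.leader.election.enable=false
min.insync.replicas=2
# producer level (illustrative): wait for all in-sync replicas
# (request.required.acks=-1 in librdkafka)
acks=all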

Below are a few logs from around this time.

broker b2 was controller.

b2:
Server.log:
[2016-08-26 16:15:49,701] INFO re-registering broker info in ZK for broker
1 (kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:49,738] INFO Registered broker 1 at path /brokers/ids/1
with address b2.broker.com:9092. (kafka.utils.ZkUtils$)
[2016-08-26 16:15:49,739] INFO done re-registering broker
(kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:49,740] INFO Subscribing to /brokers/topics path to watch
for new topics (kafka.server.KafkaHealthcheck)
[2016-08-26 16:15:50,055] INFO New leader is 0
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-08-26 16:15:50,538] WARN [KafkaApi-1] Produce request with
correlation id 422786 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,544] INFO Truncating log topic-92 to offset 1746617.
(kafka.log.Log)
[2016-08-26 16:15:50,562] WARN [KafkaApi-1] Produce request with
correlation id 422793 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,578] WARN [KafkaApi-1] Produce request with
correlation id 422897 from client rdkafka on partition [topic,92] failed
due to Leader not local for partition [topic,92] on broker 1
(kafka.server.KafkaApis)
[2016-08-26 16:15:50,719] ERROR Closing socket for /169.254.2.116 because
of error (kafka.network.Processor)
kafka.common.KafkaException: Size of FileMessageSet
/data/kafka/broker-b2/topic-66/.log has been truncated
during write: old size 1048576, new size 0
at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:744)
[2016-08-26 16:15:50,729] ERROR Closing socket for /169.254.2.116 because
of error (kafka.network.Processor)
kafka.common.KafkaException: Size of FileMessageSet
/data/kafka/broker-b2/topic-68/.log has been truncated
during write: old size 1048576, new size 0
at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:744)


kafkaServer-gc.log:
2016-08-26T16:14:47.123+0530: 6123.763: [GC2016-08-26T16:14:47.123+0530:
6123.763: [ParNew: 285567K->5992K(314560K), 0.0115200 secs]
648907K->369981K(1013632K), 0.0116800 secs] [Times: user=0.19 sys=0.00,
real=0.01 secs]
2016-08-26T16:14:56.327+0530: 6132.967: [GC2016-08-26T16:14:56.327+0530:
6132.967: [ParNew: 285608K->5950K(314560K), 0.0105600 secs]
649597K->370626K(1013632K), 0.0107550 secs] [Times: user=0.15 sys=0.01,
real=0.01 secs]
2016-08-26T16:15:06.615+0530: 6143.255: [GC2016-08-26T16:15:06.615+0530:
6143.255: [ParNew: 285566K->6529K(314560K), 0.0214330 secs]
650242K->371864K(1013632K), 0.0216380 secs] [Times: user=0.30 sys=0.03,
real=0.02 secs]
2016-08-26T16:15:17.255+0530: 6153.895: [GC2016-08-26T16:15:17.255+0530:
6153.895: [ParNew: 286145K->6413K(314560K), 0.0248930 secs]
651480K->372390K(1013632K), 0.0251060 secs] [Times: user=0.32 sys=0.09,
real=0.03 secs]
2016-08-26T16:15:36.892+0530: 6173.533: [GC2016-08-26T16:15:36.892+0530:
6173.533: [ParNew: 286029K->5935K(314560K), 0.0083220 secs]
652006K->372627K(1013632K), 0.0085410 secs] [Times: user=0.11 sys=0.02,
real=0.01 secs]
2016-08-26T16:15:50.113+0530: 6186.753: [GC2016-08-26T16:15:50.113+0530:
6186.753: [ParNew: 285551K->15693K(314560K), 0.0139890 secs]
652243K->383039K(1013632K), 

Re: a broker is already registered on path /brokers/ids/1

2016-08-30 Thread Flavio Junqueira
I think you're saying that the session isn't expiring and that the ephemeral 
isn't getting deleted. Or maybe the session is expiring but the ephemeral isn't 
being deleted? It'd be great if you could check that the broker session is 
expiring eventually.
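
One way to check, sketched with the zookeeper-shell tool that ships with Kafka
(the ZooKeeper address and broker id are placeholders): the znode's ephemeralOwner
shows the owning session, and the znode should disappear once that session expires.

bin/zookeeper-shell.sh zk1:2181
get /brokers/ids/1
stat /brokers/ids/1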

Since you're on the 3.5 branch, you may want to use 3.5.2-alpha or simply try 
3.4.8 as Harsha suggested.

-Flavio
 
> On 29 Aug 2016, at 18:44, Harsha Chintalapani  wrote:
> 
> how many brokers do you have in this cluster? Did you try using a stable
> zookeeper release like 3.4.8?
> -Harhsa
> 
> On Mon, Aug 29, 2016 at 5:21 AM Nomar Morado  wrote:
> 
>> we are using kafka 0.9.0.1 and zk 3.5.0-alpha
>> 
>> On Mon, Aug 29, 2016 at 8:12 AM, Nomar Morado 
>> wrote:
>> 
>>> we would get this occasionally after a weekend reboot/restart.
>>> 
>>> we tried restarting a couple of times all to naught.
>>> 
>>> we had to delete dk's directory to get this going again.
>>> 
>>> any ideas what might cause this issue and suggestions on how to resolve
>>> this?
>>> 
>>> 
>>> thanks.
>>> 
>> 
>> 
>> 
>> --
>> Regards,
>> Nomar Morado
>> 



Auto offset commit failed

2016-08-30 Thread yuanjia8...@163.com
Hi All,
My Kafka cluster is Kafka 0.10.0.
I have found two reasons for "Auto offset commit failed" in the log file. One 
is "Commit cannot be completed since the group has already rebalanced and 
assigned the partitions to another member", and the other is "Commit offsets failed 
with retriable exception. You should retry committing offsets."
My questions are:
   1. When does the consumer rejoin the group?
   2. Could it be consuming messages repeatedly during rebalancing?
  
Thanks.
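
As a sketch related to question 2, assuming the new Java consumer: committing
offsets from a rebalance listener before partitions are revoked reduces, but does
not fully eliminate, reprocessing across a rebalance (broker address, group id and
topic are placeholders).

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRebalance {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "my-group");                 // placeholder
        props.put("enable.auto.commit", "false");          // commit manually instead
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // commit what has been processed before the partitions move to another member,
                // so the new owner does not re-read those messages
                consumer.commitSync();
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(1000))
                System.out.println(record.value());
            consumer.commitSync();   // commit after processing each batch
        }
    }
}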



Yuanjia Li


Re: Question: Data Loss and Data Duplication in Kafka

2016-08-30 Thread R Krishna
Experimenting with kafka myself, I found that timeouts/batch expiry (with valid and
invalid configurations) and exhausted max retries can also drop messages unless you
handle and log them gracefully. There are also a bunch of exceptions in the
org.apache.kafka.common.KafkaException hierarchy, some of which are thrown for
valid reasons but also drop messages, e.g. for message size, buffer size, etc.
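
A small Java sketch of the kind of graceful handling mentioned above: with a send
callback, an expired batch or exhausted retries surfaces as an exception that can
be logged rather than silently dropped (broker address and topic are placeholders).

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LoggingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("acks", "all");                         // wait for all in-sync replicas
        props.put("retries", "3");                        // retry transient send failures
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    System.err.println("send failed, message not delivered: " + exception);
            }
        });
        // flush and close so buffered messages are not lost when the process exits
        producer.close();
    }
}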

On Sun, Aug 28, 2016 at 1:55 AM, Jayesh Thakrar  wrote:

> I am looking at ways how one might have data loss and duplication in a
> Kafka cluster and need some help/pointers/discussions.
> So far, here's what I have come up with:
> Loss at producer-sideSince the data send call is actually adding data to a
> cache/buffer, a crash of the producer can potentially result in data
> loss.Another scenario for data loss is a producer exiting without closing
> the producer connection.
> Loss at broker-sideI think there are several situations here - all of
> which are triggered by a broker or controller crash or network issues with
> zookeepers (kind of simulating broker crashes).
> If I understand correctly, KAFKA-1211 (https://issues.apache.org/
> jira/browse/KAFKA-1211) implies that when acks is set to 0/1 and the
> leader crashes, there is a probability of data loss. Hopefully
> implementation of leader generation will help avoid this (
> https://issues.apache.org/jira/browse/KAFKA-1211?
> focusedCommentId=15402622=com.atlassian.jira.
> plugin.system.issuetabpanels:comment-tabpanel#comment-15402622)
> And a unique situation as described in KAFKA-3410 (
> https://issues.apache.org/jira/browse/KAFKA-3410) can cause broker or
> cluster shutdown leading to data loss as described in KAFKA-3924 (resolved
> in 0.10.0.1).
> And data duplication can attributed primarily to consumer offset
> management which is done at batch/periodic intervals.
> Can anyone think or know of any other scenarios?
> Thanks,Jayesh
>
>
>
>


-- 
Radha Krishna, Proddaturi
253-234-5657


Re: Kafka bootup exception while recovering log file

2016-08-30 Thread Gaurav Agarwal
Kafka version: 0.10.0

Exception Trace

java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
at kafka.log.LogSegment.recover(LogSegment.scala:189)
at kafka.log.Log.recoverLog(Log.scala:268)
at kafka.log.Log.loadSegments(Log.scala:243)
at kafka.log.Log.(Log.scala:101)
at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in broker logs on prod
machines with exactly the same log files as used in this mini test)
-

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
// Directory holding the corrupt segment copied from the affected broker
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
// Loading the Log runs segment recovery, which throws the NoSuchElementException above
var log = new Log(cp, config, 0, time.scheduler, time)


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai 
wrote:

> Can you paste the entire exception stacktrace please?
>
> -Jaikiran
>
> On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:
>
>> Hi there, just wanted to bump up the thread one more time to check if
>> someone can point us in the right direction... This one was quite a
>> serious
>> failure that took down many of our kafka brokers..
>>
>> On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal > >
>> wrote:
>>
>> Hi All,
>>>
>>> We are facing a weird problem where the Kafka broker fails to start due to
>>> an unhandled exception while 'recovering' a log segment. I have been able
>>> to isolate the problem to a single record and am providing the details below:
>>>
>>> During Kafka restart, if index files are corrupted or they don't exist,
>>> the kafka broker tries to 'recover' a LogSegment and rebuild the indexes -
>>> LogSegment.recover()
>>> In the main while loop here, which iterates over the entries in the log:
>>> while(iter.hasNext) { val entry = iter.next }, I get an entry whose
>>> complete underlying byte buffer is as follows:
>>>
>>> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
>>> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
>>> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
>>> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102,
>>> 10,
>>> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101,
>>> 108,
>>> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46,
>>> 114,
>>> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
>>> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
>>> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
>>> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11,
>>> 8,
>>> 34, 20, 34, 9, -122, 56, 52, 

Kafka 0.9 consumer gets stuck in epoll

2016-08-30 Thread Rajiv Kurian
We had a Kafka 0.9 consumer stuck in the epoll native call under the
following circumstances.


1. It was bootstrapped with a cluster of 3 brokers A, B and C with ids
1, 2, 3.
2. The consumer was assigned some topic partitions, and we seeked to the
beginning of each topic partition.
3. NO poll calls were made at all.
4. Each of the brokers A, B and C was replaced, one by one, by three new
brokers D, E and F with the same ids 1, 2, 3. The process of replacement was:
 1. Shut down broker A (has id 1).
 2. Bring up broker D (has id 1, i.e. the same id as A).
 3. Give it a minute or so and do the same with B and C.


5. So by this time none of the bootstrapped brokers were alive. They were
all replaced. I can imagine that this would cause a problem with the new
0.9 consumer since it doesn't have a watch on the brokers directory in ZK
any more.

6. Call poll finally on the consumer.

Expected result - Some kind of exception, or just empty results, since none
of the brokers in the bootstrap list are present any more.

Observed result - The poll call just blocked inside Kafka. Even though a
timeout of 500 ms was provided, it never returned. I am not sure why this
would happen, but the same thing happened on 45 hosts, so I am guessing it
is pretty reproducible. This left the thread stuck, and we ultimately had
to kill -9 our processes to recover. Ideally a Kafka poll call with a given
timeout should never block indefinitely. Here is the stack trace I was able
to get:

 java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.$$YJP$$epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000504e58468> (a sun.nio.ch.Util$2)
- locked <0x000504e58450> (a java.util.Collections$UnmodifiableSet)
- locked <0x000504e029d8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at
sf.org.apache.kafka9.common.network.Selector.select(Selector.java:425)
at sf.org.apache.kafka9.common.network.Selector.poll(Selector.java:254)
at
sf.org.apache.kafka9.clients.NetworkClient.poll(NetworkClient.java:256)
at
sf.org.apache.kafka9.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at
sf.org.apache.kafka9.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at
sf.org.apache.kafka9.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at
sf.org.apache.kafka9.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at
sf.org.apache.kafka9.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
at
sf.org.apache.kafka9.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
at
sf.org.apache.kafka9.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)

sf.org.apache.kafka9 is just our shaded jar but this is the stock Kafka 0.9
consumer code.

Is this a known issue? Even though this happened under extraordinary
circumstances (i.e. the entire bootstrap list was replaced), the blocking
ended up stalling the entire thread this code was running on.
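
One possible client-side safeguard (a sketch only, with placeholder timeouts
and assuming an already-constructed consumer) is a watchdog that calls
KafkaConsumer.wakeup() when poll() overruns its deadline; wakeup() makes the
blocked poll throw a WakeupException instead of hanging forever:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// 'consumer' is an already-configured KafkaConsumer<String, String>
// (construction omitted here).
ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
// Arm the watchdog well above the poll timeout; if poll() is still blocked
// when it fires, wakeup() forces the blocked call to throw WakeupException.
ScheduledFuture<?> kick = watchdog.schedule(consumer::wakeup, 5, TimeUnit.SECONDS);
try {
    ConsumerRecords<String, String> records = consumer.poll(500);
    // ... process records ...
} catch (WakeupException e) {
    // poll() exceeded the watchdog deadline; retry, rebuild the consumer with
    // a fresh bootstrap list, or alert - but the thread is no longer stuck.
} finally {
    kick.cancel(false);
}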

Thanks,
Rajiv


Re: Kafka bootup exception while recovering log file

2016-08-30 Thread Jaikiran Pai

Can you paste the entire exception stacktrace please?

-Jaikiran
On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:

Hi there, just wanted to bump up the thread one more time to check if
someone can point us in the right direction... This one was quite a serious
failure that took down many of our kafka brokers..

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal 
wrote:


Hi All,

We are facing a weird problem where the Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record and am providing the details below:

During Kafka restart, if index files are corrupted or they don't exist, the
kafka broker tries to 'recover' a LogSegment and rebuild the indexes -
LogSegment.recover()
In the main while loop here, which iterates over the entries in the log:
while(iter.hasNext) { val entry = iter.next }, I get an entry whose
complete underlying byte buffer is as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
48, 72]

A toString() on this entry yields:

*MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
= null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]),4449011)*

It appears that this record is corrupt, and deserializing/decompressing it
causes exceptions which are unhandled. Specifically, in version 0.10.0 this
call fails with NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset

Note: This message was written to disk by a *kafka 0.10.0 broker running
snappy jar version 1.1.1.7* (which is known to have some read-time bugs).
The log file itself is 512 MB and this message appears at around 4 MB into
the file.

We have upgraded snappy, but should this condition be handled gracefully?
What is the correct behavior here? Should the exception be handled and the
log file be truncated? At the moment this causes Kafka to crash completely,
with no recovery path except deleting the bad data file manually and then
starting Kafka again.
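
Purely as an illustration of the behavior being asked about (this is not the
actual broker code, and every name below is hypothetical), recovery could
validate entries one at a time and truncate at the first record that fails to
decode, instead of letting the exception escape and crash the broker:

import java.util.List;

// Hypothetical sketch: count how many leading entries of a segment decode
// cleanly, so recovery could truncate the segment at that point.
final class RecoverySketch {

    interface EntryDecoder {        // stand-in for the real deep-iteration logic
        void decode(byte[] entry);  // expected to throw on a corrupt record
    }

    static int validEntryCount(List<byte[]> entries, EntryDecoder decoder) {
        int valid = 0;
        for (byte[] entry : entries) {
            try {
                decoder.decode(entry);
                valid++;
            } catch (RuntimeException e) {
                // e.g. NoSuchElementException from a corrupt compressed message:
                // stop here and truncate everything after the last good entry.
                break;
            }
        }
        return valid;
    }
}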

--

cheers,

gaurav


A test case to repro the crash

@Test

def testCorruptLog() {

  val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
-59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
-58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
-115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
52, 54, 48, 56, 48, 72)

  val msg = new Message(ByteBuffer.wrap(buf), None, None)

  val entry = new MessageAndOffset(msg, 4449011L)

  // Deep-iterating the compressed entry fails with NoSuchElementException
  val deepIterator: Iterator[MessageAndOffset] =
    ByteBufferMessageSet.deepIterator(entry)

  deepIterator.next().offset

}