Re: Kafka Source Connector tasks run only in same node

2016-08-31 Thread dhanuka ranasinghe
Please note that *zookeeper* is running only on *10.72.133.195*.

On Thu, Sep 1, 2016 at 9:49 AM, dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> As you can see, connector/worker and task run in different hosts
>
> On Thu, Sep 1, 2016 at 9:48 AM, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> kafka connector status
>>
>> {"name":"socket-connector","connector":{"state":"RUNNING","worker_id":"*10.72.133.194:8084
>> *"},"tasks":[{"state":"RUNNING"
>> ,"id":0,"worker_id":"*10.72.133.195:8084 *"}]}
>>
>> On Wed, Aug 31, 2016 at 8:34 PM, dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm new to Kafka. I created a TCP Kafka Source connector [1] and deployed
>>> it in a Kafka Connect cluster in distributed mode. There are two
>>> nodes/instances of connectors in the Kafka Connect cluster.
>>>
>>> But *tasks* run on only one of the nodes, even though the status shows the
>>> connector as active and running on both nodes.
>>>
>>> I have used an RxNetty server here, so what happens is that the netty
>>> server is up on one node/host but the tasks are running on the other
>>> node/host.
>>>
>>> Could you please let me know what the issue is? Thanks in advance.
>>>
>>> node==host
>>>
>>>
>>> [1] https://github.com/dhanuka84/kafka-connect-tcp
>>>
>>> cheers,
>>> Dhanuka
>>>
>>> --
>>> Nothing Impossible,Creativity is more important than knowledge.
>>>
>>
>>
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>
>
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>



-- 
Nothing Impossible,Creativity is more important than knowledge.


Re: Kafka Source Connector tasks run only in same node

2016-08-31 Thread dhanuka ranasinghe
As you can see, connector/worker and task run in different hosts

On Thu, Sep 1, 2016 at 9:48 AM, dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> kafka connector status
>
> {"name":"socket-connector","connector":{"state":"RUNNING","worker_id":"*10.72.133.194:8084
> *"},"tasks":[{"state":"
> RUNNING","id":0,"worker_id":"*10.72.133.195:8084
> *"}]}
>
> On Wed, Aug 31, 2016 at 8:34 PM, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm new to Kafka. I created a TCP Kafka Source connector [1] and deployed
>> it in a Kafka Connect cluster in distributed mode. There are two
>> nodes/instances of connectors in the Kafka Connect cluster.
>>
>> But *tasks* run on only one of the nodes, even though the status shows the
>> connector as active and running on both nodes.
>>
>> I have used an RxNetty server here, so what happens is that the netty
>> server is up on one node/host but the tasks are running on the other
>> node/host.
>>
>> Could you please let me know what the issue is? Thanks in advance.
>>
>> node==host
>>
>>
>> [1] https://github.com/dhanuka84/kafka-connect-tcp
>>
>> cheers,
>> Dhanuka
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>
>
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>



-- 
Nothing Impossible,Creativity is more important than knowledge.


Re: Kafka Source Connector tasks run only in same node

2016-08-31 Thread dhanuka ranasinghe
kafka connector status

{"name":"socket-connector","connector":{"state":"RUNNING","worker_id":"*10.72.133.194:8084
*
"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"*10.72.133.195:8084
*"}]}

On Wed, Aug 31, 2016 at 8:34 PM, dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi All,
>
> I'm new to Kafka. I created a TCP Kafka Source connector [1] and deployed it
> in a Kafka Connect cluster in distributed mode. There are two nodes/instances
> of connectors in the Kafka Connect cluster.
>
> But *tasks* run on only one of the nodes, even though the status shows the
> connector as active and running on both nodes.
>
> I have used an RxNetty server here, so what happens is that the netty server
> is up on one node/host but the tasks are running on the other node/host.
>
> Could you please let me know what the issue is? Thanks in advance.
>
> node==host
>
>
> [1] https://github.com/dhanuka84/kafka-connect-tcp
>
> cheers,
> Dhanuka
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>



-- 
Nothing Impossible,Creativity is more important than knowledge.


Re: Kafka consumers unable to process message

2016-08-31 Thread Jason Gustafson
The replicas are always trying to fetch new data from the partition leader.
When the leader fails, any in-flight fetches will fail and cause errors
such as the ones you saw in the broker log. Eventually the replicas will
discover the new leader and begin fetching again. And of course one of the
replicas will become the new leader.

-Jason
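
If it helps to see the failover, the current leader and ISR for the affected
partitions can be checked from the shell (a sketch; the zookeeper address is a
placeholder for your own ensemble):

bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic TEST3
# the output lists Leader and Isr per partition; after the failover the leader
# should be one of the surviving replicas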

On Wed, Aug 31, 2016 at 8:04 PM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> I'm trying to get the consumer logs and will send them to you. So it means it
> can happen even in my local datacenter too. I'm still not understanding why,
> if 3 nodes are up and the message is already replicated, it is trying to
> fetch the data from the failed node. Can you please explain in a bit more
> detail how it works? Thanks for your response.
>
> -Original Message-
> From: Jason Gustafson [mailto:ja...@confluent.io]
> Sent: Wednesday, August 31, 2016 10:56 PM
> To: users@kafka.apache.org
> Cc: d...@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> The exceptions show one of the replica fetcher threads on the broker
> failing which makes perfect sense since some of the partitions were bound
> to have leaders in the failed datacenter. I'd actually like to see the
> consumer logs at DEBUG level if possible.
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
>
> > Hi Jason,
> >
> > No, I didn't bring down any zookeeper server. Even I tried with 3
> > zookeeper server one as an 'Observer' but the same issue.
> >
> > Here is the server log from one of the node of my other datacenter:
> >
> > [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> > (kafka.log.Log)
> > [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4]
> > Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker
> > BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> > ReplicaFetcherManager)
> > [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to 3 was disconnected before the
> > response was read
> > at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:87)
> > at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:84)
> > at scala.Option.foreach(Option.scala:257)
> > at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:84)
> > at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:80)
> > at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> > NetworkClientBlockingOps.scala:137)
> > at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollContinuously$extension(
> > NetworkClientBlockingOps.scala:143)
> > at
> > kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> > extension(NetworkClientBlockingOps.scala:80)
> > at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:244)
> > at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> > at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> > at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> > at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> > at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> > at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> > at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> > at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> > at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> > at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> > at kafka.server.ReplicaFetcherThread.sendRequ

RE: Kafka consumers unable to process message

2016-08-31 Thread Ghosh, Achintya (Contractor)
I'm trying to get the consumer logs and will send them to you. So it means it
can happen even in my local datacenter too. I'm still not understanding why, if
3 nodes are up and the message is already replicated, it is trying to fetch the
data from the failed node. Can you please explain in a bit more detail how it
works? Thanks for your response.

-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io] 
Sent: Wednesday, August 31, 2016 10:56 PM
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

The exceptions show one of the replica fetcher threads on the broker failing 
which makes perfect sense since some of the partitions were bound to have 
leaders in the failed datacenter. I'd actually like to see the consumer logs at 
DEBUG level if possible.

Thanks,
Jason

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. Even I tried with 3 
> zookeeper server one as an 'Observer' but the same issue.
>
> Here is the server log from one of the node of my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] 
> Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker 
> BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the 
> response was read
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
> at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> NetworkClientBlockingOps.scala:137)
> at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollContinuously$extension(
> NetworkClientBlockingOps.scala:143)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
> at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:244)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
> at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
> at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
> at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
> at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
> at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
> at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
> at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
> at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
> at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@5ad

Re: Kafka consumers unable to process message

2016-08-31 Thread Jason Gustafson
The exceptions show one of the replica fetcher threads on the broker
failing which makes perfect sense since some of the partitions were bound
to have leaders in the failed datacenter. I'd actually like to see the
consumer logs at DEBUG level if possible.

Thanks,
Jason

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. Even I tried with 3
> zookeeper server one as an 'Observer' but the same issue.
>
> Here is the server log from one of the node of my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added
> fetcher for partitions List([[TEST3,0], initOffset 0 to broker
> BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the response
> was read
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
> at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> NetworkClientBlockingOps.scala:137)
> at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollContinuously$extension(
> NetworkClientBlockingOps.scala:143)
> at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
> at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:244)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
> at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
> at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
> at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
> at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
> at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
> at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
> at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
> at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
> at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
> at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
> at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
> at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
> at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
> at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkCli

RE: Kafka consumers unable to process message

2016-08-31 Thread Ghosh, Achintya (Contractor)
Hi Jason,

No, I didn't bring down any zookeeper server. I even tried with 3 zookeeper
servers, one as an 'Observer', but saw the same issue.

Here is the server log from one of the nodes in my other datacenter:

[2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. 
(kafka.log.Log)
[2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting  
(kafka.server.ReplicaFetcherThread)
[2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added 
fetcher for partitions List([[TEST3,0], initOffset 0 to broker 
BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) 
(kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was 
read
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
at scala.Option.foreach(Option.scala:257)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: 
null) failed
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: 
null) failed
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
at 
kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
   

Re: Kafka consumers unable to process message

2016-08-31 Thread Jason Gustafson
Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this
test, right? Having only two zookeepers in the ensemble would mean that if
either one of them failed, zookeeper wouldn't be able to reach quorum.

I'm not entirely sure why this would happen. One possibility is that the
consumer is failing to find the new coordinator, which might happen if all
the replicas for one of the __consumer_offsets partitions were located in
the "failed" datacenter. Perhaps you can enable DEBUG logging and post some
logs so we can see what it's actually doing during poll().

By the way, I noticed that your consumer configuration settings seem a
little mixed up. The new consumer doesn't actually communicate with
Zookeeper, so there's no need for those settings. And you don't need to
include the "offsets.storage" option since Kafka is the only choice. Also,
I don't think "consumer.timeout.ms" is an option.

-Jason
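
For illustration only, a trimmed-down configuration for the new Java consumer
along those lines might look like the sketch below. The broker list and the
custom deserializer class are taken from your mail, the timeout values are
placeholders, and none of the zookeeper.*, offsets.storage or
consumer.timeout.ms entries are needed:

Properties props = new Properties();
// brokers from both datacenters; the new consumer never talks to zookeeper
props.put("bootstrap.servers",
    "psaq1-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617");
props.put("group.id", "app-consumer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");   // placeholder value
props.put("request.timeout.ms", "40000");   // must stay above session.timeout.ms
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "com.comcast.ps.kafka.object.CustomMessageDeSer");
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);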


On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue and I resolved it calling wakeup method by
> another thread. But here my problem is different, let me explain , it's
> very basic
>
> I created one cluster with 6 nodes( 3 from one datacenter and 3 from
> another(remote) datacenter and kept replication factor 6 with 2 zookeeper
> servers one from each datacenter ). Now I brought down all 3 nodes of my
> local datacenter and produced few messages and I see producer is working
> fine even my local data center nodes are down. It successfully writes the
> messages to other data center nodes. But when I'm trying to consume the
> messages the consumer.poll method gets stuck as my local datacenter is down
> though other datacenter's nodes are up.
>
> My question is as the data has been written successfully to other
> datacenter why consumer part is not working?
>
> Here is my Producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,
> psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.
> sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3
> -ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.
> CustomMessageSer");
>
> and here is Consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "12");
> props.put("consumer.timeout.ms", "1");
> props.put("zookeeper.session.timeout.ms", "12");
> props.put("zookeeper.connection.timeout.ms", "6");
> props.put("offsets.storage","kafka");
> props.put("request.timeout.ms", "15");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:
> 61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,
> psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3
> -ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.
> serialization.StringDeserializer");
> props.put("value.deserializer",
> "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because of consumer is not able to get the broker metadata if it is
> trying to connect other datacenter's zookeeper server? I tried with to
> increate the zookeeper session timeout and connection time out but no luck.
>
> Please help on this.
> Thanks
> Achintya
>
>
> -Original Message-
> From: Jason Gustafson [mailto:ja...@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: users@kafka.apache.org
> Cc: d...@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> We have a JIRA for this problem: https://issues.
> apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an
> exception in this case or do you just want to keep it from blocking
> indefinitely? If the latter, you could escape the poll from another thread
> using wakeup().
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
>
> > Hi there,
> >
> > Kafka consumer gets stuck at consumer.poll() method if my current
> > datacenter is down and replicated messages are in remote datacenter.
> >
> > How to solve that issue?
> >
> > Thanks
> > Achintya
> >
>


MirrorMaker: mirror data from 0.8 to 0.10?

2016-08-31 Thread Ren Cao
Hi,

We've been trying to use the built-in MirrorMaker from 0.10 to mirror data from
a 0.8.2 cluster to a 0.10 cluster, but keep seeing the following errors:

[inline image attachment with the error screenshot; not preserved in the archive]

It looks like an incompatibility issue when a Kafka 0.10 consumer reads the
byte stream produced by a Kafka 0.8 cluster.

Can MirrorMaker mirror data from a 0.8 cluster to a 0.10 cluster?

Thank you,
Ren


RE: Kafka consumers unable to process message

2016-08-31 Thread Ghosh, Achintya (Contractor)
Hi Jason,

Thanks for your response.

I know that is a known issue, and I resolved it by calling the wakeup method
from another thread. But my problem here is different; let me explain, it's
very basic.

I created one cluster with 6 nodes (3 from one datacenter and 3 from another,
remote, datacenter), kept the replication factor at 6, and used 2 zookeeper
servers, one from each datacenter. Then I brought down all 3 nodes of my local
datacenter and produced a few messages, and I see the producer working fine
even though my local datacenter nodes are down. It successfully writes the
messages to the other datacenter's nodes. But when I try to consume the
messages, the consumer.poll method gets stuck because my local datacenter is
down, even though the other datacenter's nodes are up.

My question is: since the data has been written successfully to the other
datacenter, why is the consumer part not working?

Here is my Producer settings:

props.put("bootstrap.servers", 
"psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
props.put("acks", "1");
props.put("max.block.ms", 1000);
props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");

and here is Consumer settings:

props.put("group.id", "app-consumer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "500");
props.put("session.timeout.ms", "12");
props.put("consumer.timeout.ms", "1");
props.put("zookeeper.session.timeout.ms", "12");
props.put("zookeeper.connection.timeout.ms", "6");
props.put("offsets.storage","kafka");
props.put("request.timeout.ms", "15");
props.put("bootstrap.servers", 
"psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");

props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
"com.comcast.ps.kafka.object.CustomMessageDeSer");

Is it because the consumer is not able to get the broker metadata when it tries
to connect to the other datacenter's zookeeper server? I tried increasing the
zookeeper session timeout and connection timeout, but no luck.

Please help on this.
Thanks
Achintya


-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io] 
Sent: Wednesday, August 31, 2016 4:05 PM
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

We have a JIRA for this problem: https://issues.
apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an 
exception in this case or do you just want to keep it from blocking 
indefinitely? If the latter, you could escape the poll from another thread 
using wakeup().

Thanks,
Jason

On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < 
achintya_gh...@comcast.com> wrote:

> Hi there,
>
> Kafka consumer gets stuck at consumer.poll() method if my current 
> datacenter is down and replicated messages are in remote datacenter.
>
> How to solve that issue?
>
> Thanks
> Achintya
>


Re: Kafka consumers unable to process message

2016-08-31 Thread Jason Gustafson
Kafka clients have tended to make broker retries transparent to the user.
There's been discussion on various JIRAs about what we should do when all
the known brokers become unreachable. One option is to revert to the
configured bootstrap broker list, which is nice if you've configured a vip
for bootstrapping. More generally, we've discussed introducing a pluggable
interface for broker discovery, which allows for integration with service
discovery frameworks like consul. I'm supportive of this option, but we
probably need a champion with a little more time to investigate the options
and push it through. For the JIRA that I linked to above, I'm inclined to
have poll() silently retry since that is consistent with current behavior,
but it should not block longer than the passed timeout.

-Jason
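
(As an aside, the "vip for bootstrapping" mentioned above just means pointing
bootstrap.servers at a stable load-balanced address instead of an explicit
broker list, e.g. a hypothetical entry like
bootstrap.servers=kafka-bootstrap.example.com:9092, so that bootstrapping goes
through the load balancer rather than a fixed set of hosts.)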

On Wed, Aug 31, 2016 at 3:00 PM, Jim Jagielski  wrote:

> Yeah, let's figure out the "best" action to take...
>
> Looks like something I'd like to get a handle on.
>
> > On Aug 31, 2016, at 4:05 PM, Jason Gustafson  wrote:
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem: https://issues.
> > apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an
> > exception in this case or do you just want to keep it from blocking
> > indefinitely? If the latter, you could escape the poll from another
> thread
> > using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > achintya_gh...@comcast.com> wrote:
> >
> >> Hi there,
> >>
> >> Kafka consumer gets stuck at consumer.poll() method if my current
> >> datacenter is down and replicated messages are in remote datacenter.
> >>
> >> How to solve that issue?
> >>
> >> Thanks
> >> Achintya
> >>
>
>


Re: Kafka Source Connector tasks run only in same node

2016-08-31 Thread Shikhar Bhushan
Hi Dhanuka,

For something like a TCP connector, where you have a dependency on the
placement, standalone mode makes more sense. With distributed mode you have no
guarantee of which instances your connector's tasks will be assigned to.

Best,

Shikhar
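
For example, a single standalone worker keeps the connector and all of its
tasks (and therefore the listening Netty socket) on one host. A sketch, where
the two properties files are placeholders for your worker and connector
configuration:

# connector + tasks all run inside this one worker process on this host
bin/connect-standalone.sh worker.properties tcp-source-connector.properties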

On Wed, Aug 31, 2016 at 8:05 AM dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi All,
>
> I'm new to Kafka. I created a TCP Kafka Source connector [1] and deployed it
> in a Kafka Connect cluster in distributed mode. There are two nodes/instances
> of connectors in the Kafka Connect cluster.
>
> But *tasks* run on only one of the nodes, even though the status shows the
> connector as active and running on both nodes.
>
> I have used an RxNetty server here, so what happens is that the netty server
> is up on one node/host but the tasks are running on the other node/host.
>
> Could you please let me know what the issue is? Thanks in advance.
>
> node==host
>
>
> [1] https://github.com/dhanuka84/kafka-connect-tcp
>
> cheers,
> Dhanuka
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
>


Re: Kafka consumers unable to process message

2016-08-31 Thread Jim Jagielski
Yeah, let's figure out the "best" action to take...

Looks like something I'd like to get a handle on.

> On Aug 31, 2016, at 4:05 PM, Jason Gustafson  wrote:
> 
> Hi Achintya,
> 
> We have a JIRA for this problem: https://issues.
> apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an
> exception in this case or do you just want to keep it from blocking
> indefinitely? If the latter, you could escape the poll from another thread
> using wakeup().
> 
> Thanks,
> Jason
> 
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
> 
>> Hi there,
>> 
>> Kafka consumer gets stuck at consumer.poll() method if my current
>> datacenter is down and replicated messages are in remote datacenter.
>> 
>> How to solve that issue?
>> 
>> Thanks
>> Achintya
>> 



Re: Monitoring the max lag of a kafka streams application.

2016-08-31 Thread Rohit Valsakumar
Hi Guozhang,

I verified that the metric is available to the custom reporter I registered
with the Kafka Streams configuration, and the class polls the metric
asynchronously at a fixed interval to get its value and logs it. Once the job
has caught up to the end offsets in the input topic, the value of the metric
comes back as '-Infinity'. Is this expected? I would have expected something
like '0'.

The log line in the async poller:

MetricName name = metric.metricName();

gLogger.info("{}, {}, {}", name.group(), name.name(), metric.value());


And the output:

consumer-fetch-manager-metrics, records-lag-max, -Infinity


Thanks,
Rohit
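
For what it's worth, below is a rough, illustrative sketch of the pull-style
reporter discussed in this thread (the class name, scheduling interval and
logging are made up, not taken from the real implementation). Note that
max-type metrics in the Kafka metrics library evaluate to -Infinity when no
samples have been recorded in the current time window, which is the likely
source of the value above, so the poller guards against it:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Illustrative reporter: remembers every KafkaMetric handed to it and polls
// records-lag-max once a minute instead of waiting for metricChange() calls.
public class LagPollingReporter implements MetricsReporter {

    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void init(List<KafkaMetric> initial) {
        for (KafkaMetric m : initial) {
            metrics.put(m.metricName(), m);
        }
        executor.scheduleAtFixedRate(this::logLag, 1, 1, TimeUnit.MINUTES);
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // called when a metric is added or updated, not on every recording
        metrics.put(metric.metricName(), metric);
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    private void logLag() {
        for (KafkaMetric m : metrics.values()) {
            MetricName name = m.metricName();
            if ("records-lag-max".equals(name.name())) {
                double value = m.value();
                // max-type metrics report -Infinity when the window has no samples
                if (Double.isInfinite(value)) value = 0.0;
                System.out.printf("%s, %s, %f%n", name.group(), name.name(), value);
            }
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}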




On 8/31/16, 1:20 PM, "Rohit Valsakumar"  wrote:

>I will take a look at it.
>
>Thanks,
>Rohit
>
>On 8/30/16, 6:10 PM, "Guozhang Wang"  wrote:
>
>>Hi Rohit,
>>
>>Just for clarification, as stated in the java doc, metricChange "is
>>called
>>whenever a metric is updated or added". It is not the function when a
>>metric is recorded; in fact, the metrics collection is in the "pull"
>>model,
>>where the implemented reporters can fetch the current calculated values
>>based on its metric type (rate, min, max, histogram, etc). You may want
>>to
>>first take a look at the default JMXReporter implementation with the "
>>getAttribute" function which does the pulling to make sure your
>>customized
>>reporter does the right behavior.
>>
>>
>>
>>Guozhang
>>
>>On Tue, Aug 30, 2016 at 6:01 PM, Guozhang Wang 
>>wrote:
>>
>>> Hello Rohit,
>>>
>>> As you are already aware, monitoring kafka streams is no difference
>>>than
>>> monitoring kafka producers / consumers. So you can just monitor on its
>>> embedded consumer's "records-lag-max" metric, which gets recorded
>>> whenever the consumer received the fetch response.
>>>
>>> As for your application, your way of passing the class name of your
>>> implemented MetricsReporter through the StreamsConfig is correct. Have
>>> you seen records being processing by your streams application, meaning
>>>that
>>> there are indeed some fetched records from the fetch response?
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Aug 29, 2016 at 10:48 AM, Rohit Valsakumar
>>>
>>> wrote:
>>>
 Hi all,

 Any opinions about monitoring the records-lag-max for a kafka streams
job?

 Thanks,
 Rohit

 On 8/26/16, 2:53 PM, "Rohit Valsakumar"  wrote:

 >Hi all,
 >
 >I want to monitor the max lag of a kafka streams job which is
consuming
 >from three topics and to do that I have implemented the
MetricsReporter
 >interface which I pass through the Streams Configuration to the
 >KafkaStreams object. In the implementation's metricChange() method I
 >have logging for the metric with group consumer-fetch-manager-metrics
and
 >name records-lag-max. However, the implementation never receives a
 >metricChange notification for 'records-lag-max'.
 >
 >I would like to know if what I am doing makes sense, is the correct
 >approach and whether the implementation should be getting
notifications
 >for 'records-lag-max' periodically or only when the max lag is
changing
 >i.e. Increasing/decreasing.
 >
 >Thanks,
 >Rohit
 >
 >
 >
 >
 >This email and any attachments may contain confidential and
privileged
 >material for the sole use of the intended recipient. Any review,
copying,
 >or distribution of this email (or any attachments) by others is
 >prohibited. If you are not the intended recipient, please contact the
 >sender immediately and permanently delete this email and any
attachments.
 >No employee or agent of TiVo Inc. is authorized to conclude any
binding
 >agreement on behalf of TiVo Inc. by email. Binding agreements with
TiVo
 >Inc. may only be made by a signed written agreement.


 

 This email and any attachments may contain confidential and privileged
 material for the sole use of the intended recipient. Any review,
copying,
 or distribution of this email (or any attachments) by others is
prohibited.
 If you are not the intended recipient, please contact the sender
 immediately and permanently delete this email and any attachments. No
 employee or agent of TiVo Inc. is authorized to conclude any binding
 agreement on behalf of TiVo Inc. by email. Binding agreements with
TiVo
 Inc. may only be made by a signed written agreement.

>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>>
>>--
>>-- Guozhang
>
>
>
>
>This email and any attachments may contain confidential and privileged
>material for the sole use of the intended recipient. Any review, copying,
>or distribution of this email (or any attachments) by others is
>prohibited. If you are not the intended recipient, please contact the
>sender immediately and permanently delete this email and a

Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Matthias J. Sax
Hi Tommy,

I did check out your GitHub project and can verify the "issue". As you are
using Kafka 0.10.0.1, the automatic repartitioning step is not available.

If you use the "trunk" version, your program will run as expected. If you want
to stay with 0.10.0.1, you need to repartition the data explicitly after map(),
via a call to through():

> val wordCounts: KStream[String, JLong] = textLines
>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>   .map((key: String, word: String) => new KeyValue(word, word))
>   .through("my-repartitioing-topic")
>   .countByKey("counts")
>   .toStream

Keep in mind that it is recommended to create all user topics manually. Thus,
you should create the repartitioning topic you specify in through() before you
start your Kafka Streams application.
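
For example (the zookeeper address, partition count and replication factor
below are placeholders to adjust for your setup):

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic my-repartitioing-topic --partitions 2 --replication-factor 1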


-Matthias


On 08/31/2016 09:07 PM, Guozhang Wang wrote:
> Hello Tommy,
> 
> Which version of Kafka are you using?
> 
> Guozhang
> 
> On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q  wrote:
> 
>> I cleaned up all the zookeeper & kafka states and run the WordCountDemo
>> again, the results in wc-out is still wrong:
>>
>> a 1
>>> b 1
>>> a 1
>>> b 1
>>> c 1
>>
>>
>>
>> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll 
>> wrote:
>>
>>> Can you double-check whether the results in wc-out are not rather:
>>>
>>> a 1
>>> b 1
>>> a 2
>>> b 2
>>> c 1
>>>
>>> ?
>>>
>>> On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q  wrote:
>>>
 Tried the word count example as discussed, the result in wc-out is
>> wrong:

 a 1
> b 1
> a 1
> b 1
> c 1


 The expected result should be:

 a 2
> b 2
> c 1


 Kafka version is 0.10.0.1


 On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <
>> matth...@confluent.io

 wrote:

> No. It does not support hidden topics.
>
> The only explanation might be, that there is no repartitioning step.
>>> But
> than the question would be, if there is a bug in Kafka Streams,
>> because
> between map() and countByKey() repartitioning is required.
>
> Can you verify that the result is correct?
>
> -Matthias
>
> On 08/30/2016 03:24 PM, Tommy Q wrote:
>> Does Kafka support hidden topics ? (Since all the topics infos are
 stored
>> in ZK, this probably not the case )
>>
>> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
 matth...@confluent.io>
>> wrote:
>>
>>> Hi Tommy,
>>>
>>> yes, you do understand Kafka Streams correctly. And yes, for
 shuffling,
>>> na internal topic will be created under the hood. It should be
>> named
>>> "-something-repartition". I am not sure, why it
>> is
 not
>>> listed via bin/kafka-topics.sh
>>>
>>> The internal topic "-counts-changelog" you see is
>>> created to back the state of countByKey() operator.
>>>
>>> See
>>> https://cwiki.apache.org/confluence/display/KAFKA/
>>> Kafka+Streams%3A+Internal+Data+Management
>>>
>>> and
>>>
>>> http://www.confluent.io/blog/data-reprocessing-with-kafka-
>>> streams-resetting-a-streams-application
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 08/30/2016 06:55 AM, Tommy Q wrote:
 Michael, Thanks for your help.

 Take the word count example, I am trying to walk through the code
 based
>>> on
 your explanation:

 val textLines: KStream[String, String] =
>>> builder.stream("input-topic")
 val wordCounts: KStream[String, JLong] = textLines
   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.
>>> asJava)
   .map((key: String, word: String) => new KeyValue(word,
>> word))
   .countByKey("counts")
   .toStream

 wordCounts.to(stringSerde, longSerde, "wc-out")

 Suppose the input-topic has two partitions and each partition
>> has a
>>> string
 record produced into:

 input-topic_0 : "a b"
> input-topic_1 : "a b c"


 Suppose we started two instance of the stream topology ( task_0
>> and
 task_1). So after flatMapValues & map executed, they should have
>>> the
 following task state:

 task_0 :  [ (a, "a"), (b, "b") ]
> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]


 Before the execution of  countByKey, the kafka-stream framework
 should
 insert a invisible shuffle phase internally:

 shuffled across the network :
>


> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]


 countByKey (reduce) :

 task_0 (counts-changelog_0) :  [ (a, 2) ]

 task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]


 And after the execu

Re: Monitoring the max lag of a kafka streams application.

2016-08-31 Thread Rohit Valsakumar
I will take a look at it.

Thanks,
Rohit

On 8/30/16, 6:10 PM, "Guozhang Wang"  wrote:

>Hi Rohit,
>
>Just for clarification, as stated in the java doc, metricChange "is called
>whenever a metric is updated or added". It is not the function when a
>metric is recorded; in fact, the metrics collection is in the "pull"
>model,
>where the implemented reporters can fetch the current calculated values
>based on its metric type (rate, min, max, histogram, etc). You may want to
>first take a look at the default JMXReporter implementation with the "
>getAttribute" function which does the pulling to make sure your customized
>reporter does the right behavior.
>
>
>
>Guozhang
>
>On Tue, Aug 30, 2016 at 6:01 PM, Guozhang Wang  wrote:
>
>> Hello Rohit,
>>
>> As you are already aware, monitoring kafka streams is no difference than
>> monitoring kafka producers / consumers. So you can just monitor on its
>> embedded consumer's "records-lag-max" metric, which gets recorded
>> whenever the consumer received the fetch response.
>>
>> As for your application, your way of passing the class name of your
>> implemented MetricsReporter through the StreamsConfig is correct. Have
>> you seen records being processing by your streams application, meaning
>>that
>> there are indeed some fetched records from the fetch response?
>>
>> Guozhang
>>
>>
>> On Mon, Aug 29, 2016 at 10:48 AM, Rohit Valsakumar
>>
>> wrote:
>>
>>> Hi all,
>>>
>>> Any opinions about monitoring the records-lag-max for a kafka streams
>>>job?
>>>
>>> Thanks,
>>> Rohit
>>>
>>> On 8/26/16, 2:53 PM, "Rohit Valsakumar"  wrote:
>>>
>>> >Hi all,
>>> >
>>> >I want to monitor the max lag of a kafka streams job which is
>>>consuming
>>> >from three topics and to do that I have implemented the
>>>MetricsReporter
>>> >interface which I pass through the Streams Configuration to the
>>> >KafkaStreams object. In the implementation's metricChange() method I
>>> >have logging for the metric with group consumer-fetch-manager-metrics
>>>and
>>> >name records-lag-max. However, the implementation never receives a
>>> >metricChange notification for 'records-lag-max'.
>>> >
>>> >I would like to know if what I am doing makes sense, is the correct
>>> >approach and whether the implementation should be getting
>>>notifications
>>> >for 'records-lag-max' periodically or only when the max lag is
>>>changing
>>> >i.e. Increasing/decreasing.
>>> >
>>> >Thanks,
>>> >Rohit
>>> >
>>> >
>>> >
>>> >
>>> >This email and any attachments may contain confidential and privileged
>>> >material for the sole use of the intended recipient. Any review,
>>>copying,
>>> >or distribution of this email (or any attachments) by others is
>>> >prohibited. If you are not the intended recipient, please contact the
>>> >sender immediately and permanently delete this email and any
>>>attachments.
>>> >No employee or agent of TiVo Inc. is authorized to conclude any
>>>binding
>>> >agreement on behalf of TiVo Inc. by email. Binding agreements with
>>>TiVo
>>> >Inc. may only be made by a signed written agreement.
>>>
>>>
>>> 
>>>
>>> This email and any attachments may contain confidential and privileged
>>> material for the sole use of the intended recipient. Any review,
>>>copying,
>>> or distribution of this email (or any attachments) by others is
>>>prohibited.
>>> If you are not the intended recipient, please contact the sender
>>> immediately and permanently delete this email and any attachments. No
>>> employee or agent of TiVo Inc. is authorized to conclude any binding
>>> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>>> Inc. may only be made by a signed written agreement.
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
>--
>-- Guozhang






Re: Monitoring the max lag of a kafka streams application.

2016-08-31 Thread Rohit Valsakumar
Hi Guozhang,

Thanks for confirming the approach. Yes, I have a Processor node to track the
flow of records through the job, and it shows that the job is processing the
records. I am trying to simulate a slow processing job so that I can verify
whether the metricChange callback is called for the records-lag-max metric,
but I haven't been successful yet.

Thanks,
Rohit

On 8/30/16, 6:01 PM, "Guozhang Wang"  wrote:

>Hello Rohit,
>
>As you are already aware, monitoring kafka streams is no difference than
>monitoring kafka producers / consumers. So you can just monitor on its
>embedded consumer's "records-lag-max" metric, which gets recorded whenever
>the consumer received the fetch response.
>
>As for your application, your way of passing the class name of your
>implemented MetricsReporter through the StreamsConfig is correct. Have you
>seen records being processing by your streams application, meaning that
>there are indeed some fetched records from the fetch response?
>
>Guozhang
>
>
>On Mon, Aug 29, 2016 at 10:48 AM, Rohit Valsakumar 
>wrote:
>
>> Hi all,
>>
>> Any opinions about monitoring the records-lag-max for a kafka streams
>>job?
>>
>> Thanks,
>> Rohit
>>
>> On 8/26/16, 2:53 PM, "Rohit Valsakumar"  wrote:
>>
>> >Hi all,
>> >
>> >I want to monitor the max lag of a kafka streams job which is consuming
>> >from three topics and to do that I have implemented the MetricsReporter
>> >interface which I pass through the Streams Configuration to the
>> >KafkaStreams object. In the implementation's metricChange() method I
>> >have logging for the metric with group consumer-fetch-manager-metrics
>>and
>> >name records-lag-max. However, the implementation never receives a
>> >metricChange notification for 'records-lag-max'.
>> >
>> >I would like to know if what I am doing makes sense, is the correct
>> >approach and whether the implementation should be getting notifications
>> >for 'records-lag-max' periodically or only when the max lag is changing
>> >i.e. Increasing/decreasing.
>> >
>> >Thanks,
>> >Rohit
>> >
>> >
>> >
>> >
>> >This email and any attachments may contain confidential and privileged
>> >material for the sole use of the intended recipient. Any review,
>>copying,
>> >or distribution of this email (or any attachments) by others is
>> >prohibited. If you are not the intended recipient, please contact the
>> >sender immediately and permanently delete this email and any
>>attachments.
>> >No employee or agent of TiVo Inc. is authorized to conclude any binding
>> >agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> >Inc. may only be made by a signed written agreement.
>>
>>
>> 
>>
>> This email and any attachments may contain confidential and privileged
>> material for the sole use of the intended recipient. Any review,
>>copying,
>> or distribution of this email (or any attachments) by others is
>>prohibited.
>> If you are not the intended recipient, please contact the sender
>> immediately and permanently delete this email and any attachments. No
>> employee or agent of TiVo Inc. is authorized to conclude any binding
>> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>> Inc. may only be made by a signed written agreement.
>>
>
>
>
>--
>-- Guozhang






Re: Kafka consumers unable to process message

2016-08-31 Thread Jason Gustafson
Hi Achintya,

We have a JIRA for this problem: https://issues.
apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an
exception in this case or do you just want to keep it from blocking
indefinitely? If the latter, you could escape the poll from another thread
using wakeup().

Thanks,
Jason
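
A rough sketch of that wakeup() pattern, for illustration (the class and the
timeout handling are simplified, not production code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedPoll {
    // Bound a potentially-blocking poll() by calling wakeup() from a second thread.
    public static ConsumerRecords<String, String> poll(
            KafkaConsumer<String, String> consumer, long pollMs, long deadlineMs) {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.schedule(consumer::wakeup, deadlineMs, TimeUnit.MILLISECONDS);
        try {
            return consumer.poll(pollMs);
        } catch (WakeupException e) {
            // poll() was still blocked when the deadline fired (e.g. no reachable brokers)
            return ConsumerRecords.empty();
        } finally {
            watchdog.shutdownNow();
        }
    }
}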

On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi there,
>
> Kafka consumer gets stuck at consumer.poll() method if my current
> datacenter is down and replicated messages are in remote datacenter.
>
> How to solve that issue?
>
> Thanks
> Achintya
>


OffsetsLoadInProgressException and Consumer Rewind

2016-08-31 Thread Thomas Norden
Hi,

I am running kafka 0.8.2.1 and we have some consumers that are using kafka to
store their offsets (offsets.storage=kafka). About once a week a consumer will
see the exception OffsetsLoadInProgressException, and when the exception goes
away the consumer is reset to the earliest offset. It does not affect all of
the consumers at the same time.

What causes OffsetsLoadInProgressException to happen?

Did the offset manager change?

Would the ConsumerConnector start reading from the earliest offset if
OffsetsLoadInProgressException is thrown?

Thanks,
Tom


Kafka consumers unable to process message

2016-08-31 Thread Ghosh, Achintya (Contractor)
Hi there,

Kafka consumer gets stuck at consumer.poll() method if my current datacenter is 
down and replicated messages are in remote datacenter.

How to solve that issue?

Thanks
Achintya


Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Guozhang Wang
Hello Tommy,

Which version of Kafka are you using?

Guozhang

On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q  wrote:

> I cleaned up all the zookeeper & kafka state and ran the WordCountDemo
> again; the results in wc-out are still wrong:
>
> a 1
> > b 1
> > a 1
> > b 1
> > c 1
>
>
>
> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll 
> wrote:
>
> > Can you double-check whether the results in wc-out are not rather:
> >
> > a 1
> > b 1
> > a 2
> > b 2
> > c 1
> >
> > ?
> >
> > On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q  wrote:
> >
> > > Tried the word count example as discussed, the result in wc-out is
> wrong:
> > >
> > > a 1
> > > > b 1
> > > > a 1
> > > > b 1
> > > > c 1
> > >
> > >
> > > The expected result should be:
> > >
> > > a 2
> > > > b 2
> > > > c 1
> > >
> > >
> > > Kafka version is 0.10.0.1
> > >
> > >
> > > On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > > > No. It does not support hidden topics.
> > > >
> > > > The only explanation might be, that there is no repartitioning step.
> > But
> > > > than the question would be, if there is a bug in Kafka Streams,
> because
> > > > between map() and countByKey() repartitioning is required.
> > > >
> > > > Can you verify that the result is correct?
> > > >
> > > > -Matthias
> > > >
> > > > On 08/30/2016 03:24 PM, Tommy Q wrote:
> > > > > Does Kafka support hidden topics ? (Since all the topics infos are
> > > stored
> > > > > in ZK, this probably not the case )
> > > > >
> > > > > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Hi Tommy,
> > > > >>
> > > > >> yes, you do understand Kafka Streams correctly. And yes, for
> > > shuffling,
> > > > >> na internal topic will be created under the hood. It should be
> named
> > > > >> "-something-repartition". I am not sure, why it
> is
> > > not
> > > > >> listed via bin/kafka-topics.sh
> > > > >>
> > > > >> The internal topic "-counts-changelog" you see is
> > > > >> created to back the state of countByKey() operator.
> > > > >>
> > > > >> See
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > > > >> Kafka+Streams%3A+Internal+Data+Management
> > > > >>
> > > > >> and
> > > > >>
> > > > >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > > > >> streams-resetting-a-streams-application
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > > > >>> Michael, Thanks for your help.
> > > > >>>
> > > > >>> Take the word count example, I am trying to walk through the code
> > > based
> > > > >> on
> > > > >>> your explanation:
> > > > >>>
> > > > >>> val textLines: KStream[String, String] =
> > > > >> builder.stream("input-topic")
> > > > >>> val wordCounts: KStream[String, JLong] = textLines
> > > > >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.
> > asJava)
> > > > >>>   .map((key: String, word: String) => new KeyValue(word,
> word))
> > > > >>>   .countByKey("counts")
> > > > >>>   .toStream
> > > > >>>
> > > > >>> wordCounts.to(stringSerde, longSerde, "wc-out")
> > > > >>>
> > > > >>> Suppose the input-topic has two partitions and each partition
> has a
> > > > >> string
> > > > >>> record produced into:
> > > > >>>
> > > > >>> input-topic_0 : "a b"
> > > >  input-topic_1 : "a b c"
> > > > >>>
> > > > >>>
> > > > >>> Suppose we started two instance of the stream topology ( task_0
> and
> > > > >>> task_1). So after flatMapValues & map executed, they should have
> > the
> > > > >>> following task state:
> > > > >>>
> > > > >>> task_0 :  [ (a, "a"), (b, "b") ]
> > > >  task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> > > > >>>
> > > > >>>
> > > > >>> Before the execution of  countByKey, the kafka-stream framework
> > > should
> > > > >>> insert a invisible shuffle phase internally:
> > > > >>>
> > > > >>> shuffled across the network :
> > > > 
> > > > >>>
> > > > >>>
> > > >  _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> > > >  _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> > > > >>>
> > > > >>>
> > > > >>> countByKey (reduce) :
> > > > >>>
> > > > >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> > > > >>>
> > > > >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> > > > >>>
> > > > >>>
> > > > >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> > > > >>> "wc-out")`, we get the word count output in wc-out topic:
> > > > >>>
> > > > >>> task_0 (wc-out_0) :  [ (a, 2) ]
> > > > >>>
> > > > >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> According the steps list above, do I understand the internals of
> > > > kstream
> > > > >>> word count correctly ?
> > > > >>> Another question is does the shuffle across the network work by
> > > > creating
> > > > >>> intermediate topics ? If so, why can't I find the intermediate
> > topics
> > > > >> using
> > > > >>> `bin/k

Re: Auto offset commit failed

2016-08-31 Thread Guozhang Wang
Hello Yuanjia,

1) You can take a look at this wiki for details on how consumers (re-)join a group.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

In short, the coordinator detects whether a consumer has failed based on the
heartbeat protocol, and if so it tells the other consumers to re-join the
group in order to rebalance.

2) That is possible. If the consumer that owns the partition does not commit
its offsets before the assigned partition is revoked, for example because it
was kicked out of the group by the coordinator (the scenario above), then the
newly assigned consumer of that partition will resume from the previously
committed offsets, and hence some of the records already consumed by the
previous owner may be re-consumed.

Guozhang


On Tue, Aug 30, 2016 at 1:26 AM, yuanjia8...@163.com 
wrote:

> Hi All,
> My kafka cluster is kafka0.10.0.
> I have found two reasons for Auto offset commit failed in the log
> file. One is Commit cannot be completed since the group has already
> rebalanced and assigned the partitions to another member, and another is
> Commit offsets failed with retriable exception. You should retry committing
> offsets.
> My questions are:
>1. When the consumer rejoin group?
>2. Could it be consuming the message repeatly during rebalancing?
>
> Thanks.
>
>
>
> Yuanjia Li
>



-- 
-- Guozhang


Re: handling generics in Kafka Scala

2016-08-31 Thread Dean Wampler
Okay, the type parameters with their bounds need to go after the method
name, like this:

private def createNewConsumer[K <: java.util.ArrayList[Byte],V <:
java.util.ArrayList[Byte]](): KafkaConsumer[K,V] = {...}
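
For comparison, a hypothetical Java sketch (not from the thread) showing the
same idea: in Java the bounded type parameters go before the return type
rather than after the method name; the properties setup is omitted here:

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

class ConsumerFactory {
    // Bounds are declared before the return type in Java.
    public <K extends ArrayList<Byte>, V extends ArrayList<Byte>>
            KafkaConsumer<K, V> createNewConsumer(Properties props) {
        return new KafkaConsumer<>(props);
    }
}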

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Lightbend 
@deanwampler 
http://polyglotprogramming.com

On Wed, Aug 31, 2016 at 8:08 AM, Martin Gainty  wrote:

> supposedly gmail wont strip \n so we'll try again with non-html mail
> composer
>
>  noob with Scala so Im looking for an experienced answer
>
>  ConsumerGroupCommand.scala
>
>  //private def createNewConsumer(): KafkaConsumer[String, String] = {
>  //private def createNewConsumer(): KafkaConsumer[K extends
> // java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
>
>  private def createNewConsumer(): KafkaConsumer[K <:
>  java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {
>
>val properties = new java.util.Properties()
>
>val deserializer = (new StringDeserializer).getClass.getName
>
>val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
>
>  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
>
>  properties.put(ConsumerConfig.GROUP_ID_CONFIG,opts.options.
> valueOf(opts.groupOpt))
>
>properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
>
>properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
>
>properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>  deserializer)
>
>   properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>  deserializer)
>
>if (opts.options.has(opts.commandConfigOpt))
>  properties.putAll(Utils.loadProps(opts.options.
> valueOf(opts.commandConfigOpt)))
>
>new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]
>  }
>
> scala-compiler displays:
>  [ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:309:
>  error: ']' expected but '<:' found.
>
> [ERROR] private def createNewConsumer(): KafkaConsumer[? <:
>  java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
>  [ERROR]  ^
>
>  [ERROR]
>  \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:309:
>  error: '=' expected but ',' found.
>
>  [ERROR] private def createNewConsumer(): KafkaConsumer[? <:
>  java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
>  [ERROR]
>^
>
>  [ERROR]
>  \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:322:
>  error: illegal start of simple expression
>
>  i need 2 datatype parameter types extending java.util.ArrayList
>  in regular java this would be:
>
> public KafkaConsumer<K extends java.util.ArrayList<Byte>,V extends
>  java.util.ArrayList<Byte>>  createNewConsumer() { }
>
>  this works in java but
>  how do I setup a function or class declaration K, V whose parameter
>  datatype extends java.util.ArrayList ?
>
>  Martin
>
> >>
> >> From: mgai...@hotmail.com
> >> To: mathieu.fenn...@replicon.com; users@kafka.apache.org
> >> Subject: RE: handling generics in Kafka Scala
> >> Date: Tue, 30 Aug 2016 23:00:29 -0400
> >>
> >>
> >>
> >>
> >> noob with Scala so Im looking for an experienced answer
> >> ConsumerGroupCommand.scala
> >> //private def createNewConsumer(): KafkaConsumer[String, String] =
> >> {//private def createNewConsumer(): KafkaConsumer[K extends
> >> java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
> >> private def createNewConsumer(): KafkaConsumer[K <:
> >> java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {  val
> >> properties = new java.util.Properties()  val deserializer = (new
> >> StringDeserializer).getClass.getName  val brokerUrl =
> >> opts.options.valueOf(opts.bootstrapServerOpt)
> >> properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
> >> properties.put(ConsumerConfig.GROUP_ID_CONFIG,
> >> opts.options.valueOf(opts.groupOpt))
> >> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
> >> properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
> >> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >> deserializer)
> >>properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >> deserializer)  if (opts.options.has(opts.commandConfigOpt))
> >> properties.putAll(Utils.loadProps(opts.options.
> valueOf(opts.commandConfigOpt)))
> >>  new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]
> }
> >> scala-compiler displays:
> >> [ERROR]
> >> \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:309:
> >> error: ']' expected but '<:' found.[ERROR] private def
> >> createNewConsumer(): KafkaConsumer[? <: java.util.ArrayList[Byte],? <:
> >> java.util.ArrayList[Byte]] = {[ERROR]
> >>   ^[ERROR]
> >> \kafka\kafka-trunk

Kafka Source Connector tasks run only in same node

2016-08-31 Thread dhanuka ranasinghe
Hi All,

I'm new to Kafka. I created a TCP Kafka Source connector [1] and deployed it
in a Kafka Connect cluster in distributed mode. There are two nodes/instances
of connectors in the Kafka Connect cluster.

But *tasks* run on only one of those nodes, even though the connector shows
as active and running on both nodes.

Here I have used an RxNetty server, so what happens is that the netty server
is up on one node/host while the tasks are running on the other node/host.

Could you please let me know the issue with this? Thanks in advance.

node==host


[1] https://github.com/dhanuka84/kafka-connect-tcp

cheers,
Dhanuka

-- 
Nothing Impossible,Creativity is more important than knowledge.
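
One general Kafka Connect point worth checking in a situation like this (not
something confirmed in the thread): in distributed mode the framework can
only spread work across workers if the connector hands back more than one
task configuration from taskConfigs(), and tasks.max in the connector config
allows it; the connector instance itself always runs on a single worker. A
hypothetical sketch of that method for a TCP-style source connector:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Declared abstract so the unrelated methods (version, taskClass, config, ...)
// can be left out of the sketch.
public abstract class TcpSourceConnectorSketch extends SourceConnector {

    private Map<String, String> connectorProps;

    @Override
    public void start(Map<String, String> props) {
        this.connectorProps = props;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Returning a single map here means only one task exists cluster-wide,
        // so only one worker ever runs it, no matter how many workers join.
        // Returning up to maxTasks maps lets the framework spread tasks out.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskProps = new HashMap<>(connectorProps);
            taskProps.put("task.id", Integer.toString(i));  // hypothetical setting
            configs.add(taskProps);
        }
        return configs;
    }

    @Override
    public void stop() {
    }
}

For a TCP listener each task would also need its own port or host to bind to,
which is connector-specific and not covered by this sketch.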


Re: handling generics in Kafka Scala

2016-08-31 Thread Martin Gainty
supposedly gmail won't strip \n so we'll try again with non-html mail composer

 noob with Scala so I'm looking for an experienced answer

 ConsumerGroupCommand.scala

 //private def createNewConsumer(): KafkaConsumer[String, String] = {
 //private def createNewConsumer(): KafkaConsumer[K extends
// java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {

 private def createNewConsumer(): KafkaConsumer[K <:
 java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {

   val properties = new java.util.Properties()

   val deserializer = (new StringDeserializer).getClass.getName

   val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)

 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)

 
properties.put(ConsumerConfig.GROUP_ID_CONFIG,opts.options.valueOf(opts.groupOpt))

   properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")

   properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 deserializer)

  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 deserializer)

   if (opts.options.has(opts.commandConfigOpt))
 properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

   new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]
 }

scala-compiler displays:
 [ERROR] 
\kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
 error: ']' expected but '<:' found.

[ERROR] private def createNewConsumer(): KafkaConsumer[? <:
 java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
 [ERROR]  ^

 [ERROR]
 
\kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
 error: '=' expected but ',' found.

 [ERROR] private def createNewConsumer(): KafkaConsumer[? <:
 java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {
 [ERROR]
   ^

 [ERROR]
 
\kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:322:
 error: illegal start of simple expression

 i need 2 datatype parameter types extending java.util.ArrayList
 in regular java this would be:

public KafkaConsumer<K extends java.util.ArrayList<Byte>,V extends
 java.util.ArrayList<Byte>>  createNewConsumer() { }

 this works in java, but
 how do I set up a function or class declaration with K, V type parameters
 whose datatype extends java.util.ArrayList?

 Martin

>>
>> From: mgai...@hotmail.com
>> To: mathieu.fenn...@replicon.com; users@kafka.apache.org
>> Subject: RE: handling generics in Kafka Scala
>> Date: Tue, 30 Aug 2016 23:00:29 -0400
>>
>>
>>
>>
>> noob with Scala so Im looking for an experienced answer
>> ConsumerGroupCommand.scala
>> //private def createNewConsumer(): KafkaConsumer[String, String] =
>> {//private def createNewConsumer(): KafkaConsumer[K extends
>> java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
>> private def createNewConsumer(): KafkaConsumer[K <:
>> java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {  val
>> properties = new java.util.Properties()  val deserializer = (new
>> StringDeserializer).getClass.getName  val brokerUrl =
>> opts.options.valueOf(opts.bootstrapServerOpt)
>> properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
>> properties.put(ConsumerConfig.GROUP_ID_CONFIG,
>> opts.options.valueOf(opts.groupOpt))
>> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
>> properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
>> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> deserializer)
>>properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> deserializer)  if (opts.options.has(opts.commandConfigOpt))
>> properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
>>  new KafkaConsumer(properties).asInstanceOf[KafkaConsumer[K,V]]}
>> scala-compiler displays:
>> [ERROR]
>> \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
>> error: ']' expected but '<:' found.[ERROR] private def
>> createNewConsumer(): KafkaConsumer[? <: java.util.ArrayList[Byte],? <:
>> java.util.ArrayList[Byte]] = {[ERROR]
>>   ^[ERROR]
>> \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:309:
>> error: '=' expected but ',' found.[ERROR] private def
>> createNewConsumer(): KafkaConsumer[? <: java.util.ArrayList[Byte],? <:
>> java.util.ArrayList[Byte]] = {[ERROR]
>>   ^[ERROR]
>> \kafka\kafka-trunk\core\src\main\scala\kafka\admin\ConsumerGroupCommand.scala:322:
>> error: illegal start of simple expression
>> i want 2 datatype parameter types extending java.util.ArrayList in
>> regular java this would be:
>> public KafkaConsumer,V extends
>> java.util.ArrayList>  createNewConsumer() {}
>> how do I setup a function or class declaration K, V whose parameter
>> datatype
>> extends java.util.Arr

Re: handling generics in Kafka Scala

2016-08-31 Thread Dean Wampler
Could you format that so it's readable?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Lightbend 
@deanwampler 
http://polyglotprogramming.com

On Tue, Aug 30, 2016 at 10:00 PM, Martin Gainty  wrote:

> noob with Scala so Im looking for an experienced answer
> ConsumerGroupCommand.scala
> //private def createNewConsumer(): KafkaConsumer[String, String] =
> {//private def createNewConsumer(): KafkaConsumer[K extends
> java.util.ArrayList[Byte],V extends java.util.ArrayList[Byte]] = {
> private def createNewConsumer(): KafkaConsumer[K <:
> java.util.ArrayList[Byte],V <: java.util.ArrayList[Byte]] = {  val
> properties = new java.util.Properties()  val deserializer = (new
> StringDeserializer).getClass.getName  val brokerUrl =
> opts.options.valueOf(opts.bootstrapServerOpt)
> properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
> properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
> opts.options.valueOf(opts.groupOpt))
> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3")
> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> deserializer)  
> properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> deserializer)  if (opts.options.has(opts.commandConfigOpt))
> properties.putAll(Utils.loadProps(opts.options.
> valueOf(opts.commandConfigOpt)))  new KafkaConsumer(properties).
> asInstanceOf[KafkaConsumer[K,V]]}
> scala-compiler displays:
> [ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:309: error: ']' expected but '<:'
> found.[ERROR] private def createNewConsumer(): KafkaConsumer[? <:
> java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {[ERROR]
>   ^[ERROR]
> \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:309: error: '=' expected but ','
> found.[ERROR] private def createNewConsumer(): KafkaConsumer[? <:
> java.util.ArrayList[Byte],? <: java.util.ArrayList[Byte]] = {[ERROR]
>
> ^[ERROR] \kafka\kafka-trunk\core\src\main\scala\kafka\admin\
> ConsumerGroupCommand.scala:322: error: illegal start of simple expression
> i want 2 datatype parameter types extending java.util.ArrayList in
> regular java this would be:
> public KafkaConsumer,V extends
> java.util.ArrayList>  createNewConsumer() {}
> how do I setup a function or class declaration K, V whose parameter
> datatype extends java.util.ArrayList ?
> Martin
> __
>
>
>
> > From: mathieu.fenn...@replicon.com
> > Date: Wed, 17 Aug 2016 18:06:38 -0600
> > Subject: Re: DLL Hell
> > To: mgai...@hotmail.com
> >
> > Hi Martin,
> >
> > I'm sorry, this is way outside my Kafka knowledge.  I'm just a new
> > Kafka user who wanted to help with your Windows questions because I
> > had just faced the same hurdle. :-)  Wish I could help, but I wouldn't
> > know where to start with this.
> >
> > Mathieu
> >
> >
> > On Wed, Aug 17, 2016 at 6:00 PM, Martin Gainty 
> wrote:
> > > Hi Matthieu
> > > Many Thanks for attaching the binary
> > >
> > > running scala->java generator plugin I see:
> > >
> > > [ERROR]
> > > C:\Maven-plugin\kafka\kafka-trunk\core\src\main\scala\
> kafka\admin\AdminUtils.scala:639:
> > > error: type PartitionMetadata is not a member of object
> > > org.apache.kafka.common.requests.MetadataResponse
> > >
> > > yet when I look at org.apache.kafka.common.requests.MetadataResponse.java
> I
> > > see inner class
> > >
> > > public static class PartitionMetadata {
> > >
> > > inner static java classes are not visible to the converter for some
> reason
> > > the workaround seems to be birth inner static classes (e.g.
> > > PartitionMetadata)
> > > treating inner class as standalone works
> > >
> > > Advice?
> > > Martin
> > > __
> > >
> > >
> > >
> > >
> > > 
> > > From: mathieu.fenn...@replicon.com
> > > Date: Tue, 16 Aug 2016 08:04:52 -0600
> > > Subject: Re: DLL Hell
> > > To: mgai...@hotmail.com
> > >
> > >
> > > Hey Martin,
> > >
> > > Attached is the native .dll that I was able to build for rocksdb.  If
> you
> > > unzip this, and include the contained .dll into your
> rocksdbjni-4.8.0.jar at
> > > the root, it should be possible to use Kafka Streams in Windows.  But
> this
> > > is just a minimal debug build; wouldn't be appropriate for production
> use.
> > > Might save you some time if you're just trying to get a dev environment
> > > working though.
> > >
> > > Mathieu
> > >
> > >
> > > On Tue, Aug 16, 2016 at 7:40 AM, Martin Gainty 
> wrote:
> > >
> > >
> > >
> > >
> > >> From: mathieu.fenn...@replicon.com
> > >> Date: Tue, 16 Aug 2016 06:57:16 -0600
> > >> Subject: Re: DLL Hell
> > >> To: users@kafka.apache.org
> > >>
> > >> Hey Martin

Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Tommy Q
I cleaned up all the zookeeper & kafka states and ran the WordCountDemo
again, but the results in wc-out are still wrong:

a 1
> b 1
> a 1
> b 1
> c 1



On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll  wrote:

> Can you double-check whether the results in wc-out are not rather:
>
> a 1
> b 1
> a 2
> b 2
> c 1
>
> ?
>
> On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q  wrote:
>
> > Tried the word count example as discussed, the result in wc-out is wrong:
> >
> > a 1
> > > b 1
> > > a 1
> > > b 1
> > > c 1
> >
> >
> > The expected result should be:
> >
> > a 2
> > > b 2
> > > c 1
> >
> >
> > Kafka version is 0.10.0.1
> >
> >
> > On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax  >
> > wrote:
> >
> > > No. It does not support hidden topics.
> > >
> > > The only explanation might be, that there is no repartitioning step.
> But
> > > than the question would be, if there is a bug in Kafka Streams, because
> > > between map() and countByKey() repartitioning is required.
> > >
> > > Can you verify that the result is correct?
> > >
> > > -Matthias
> > >
> > > On 08/30/2016 03:24 PM, Tommy Q wrote:
> > > > Does Kafka support hidden topics ? (Since all the topics infos are
> > stored
> > > > in ZK, this probably not the case )
> > > >
> > > > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Hi Tommy,
> > > >>
> > > >> yes, you do understand Kafka Streams correctly. And yes, for
> > shuffling,
> > > >> na internal topic will be created under the hood. It should be named
> > > >> "-something-repartition". I am not sure, why it is
> > not
> > > >> listed via bin/kafka-topics.sh
> > > >>
> > > >> The internal topic "-counts-changelog" you see is
> > > >> created to back the state of countByKey() operator.
> > > >>
> > > >> See
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > > >> Kafka+Streams%3A+Internal+Data+Management
> > > >>
> > > >> and
> > > >>
> > > >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > > >> streams-resetting-a-streams-application
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > > >>> Michael, Thanks for your help.
> > > >>>
> > > >>> Take the word count example, I am trying to walk through the code
> > based
> > > >> on
> > > >>> your explanation:
> > > >>>
> > > >>> val textLines: KStream[String, String] =
> > > >> builder.stream("input-topic")
> > > >>> val wordCounts: KStream[String, JLong] = textLines
> > > >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.
> asJava)
> > > >>>   .map((key: String, word: String) => new KeyValue(word, word))
> > > >>>   .countByKey("counts")
> > > >>>   .toStream
> > > >>>
> > > >>> wordCounts.to(stringSerde, longSerde, "wc-out")
> > > >>>
> > > >>> Suppose the input-topic has two partitions and each partition has a
> > > >> string
> > > >>> record produced into:
> > > >>>
> > > >>> input-topic_0 : "a b"
> > >  input-topic_1 : "a b c"
> > > >>>
> > > >>>
> > > >>> Suppose we started two instance of the stream topology ( task_0 and
> > > >>> task_1). So after flatMapValues & map executed, they should have
> the
> > > >>> following task state:
> > > >>>
> > > >>> task_0 :  [ (a, "a"), (b, "b") ]
> > >  task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> > > >>>
> > > >>>
> > > >>> Before the execution of  countByKey, the kafka-stream framework
> > should
> > > >>> insert a invisible shuffle phase internally:
> > > >>>
> > > >>> shuffled across the network :
> > > 
> > > >>>
> > > >>>
> > >  _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> > >  _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> > > >>>
> > > >>>
> > > >>> countByKey (reduce) :
> > > >>>
> > > >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> > > >>>
> > > >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> > > >>>
> > > >>>
> > > >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> > > >>> "wc-out")`, we get the word count output in wc-out topic:
> > > >>>
> > > >>> task_0 (wc-out_0) :  [ (a, 2) ]
> > > >>>
> > > >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> > > >>>
> > > >>>
> > > >>>
> > > >>> According the steps list above, do I understand the internals of
> > > kstream
> > > >>> word count correctly ?
> > > >>> Another question is does the shuffle across the network work by
> > > creating
> > > >>> intermediate topics ? If so, why can't I find the intermediate
> topics
> > > >> using
> > > >>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can
> only
> > > see
> > > >>> the counts-changelog got created by the kstream framework.
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <
> mich...@confluent.io>
> > > >> wrote:
> > > >>>
> > >  In Kafka Streams, data is partitioned according to the keys of the
> > >  key-value records, and operations such as countByKey operate on
> > these
> > >  stream partitions.  When r

Kafka network metrics with Ambari Metrics System

2016-08-31 Thread Jahn Roux
Hi users,

We recently moved from Ambari 1.7 (kafka 0.10) to Ambari 2.2 (kafka 0.10).
We have also moved from a Ganglia metrics server to Ambari Metrics Collector
+ grafana.

With Ganglia we tracked a crucial metric called
kafka.network.RequestMetrics.LocalTimeMs.request.Produce. However since
moving to AMS it does not seem like there are any kafka.network.* metrics
being reported. I have checked that these metrics are not being excluded in
the Kafka configs. Anyone have any idea how I could track down these metrics
with AMS? Thanks!

Kind regards,

Jahn Roux 
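
Not an AMS answer, but one way to confirm the broker itself still exposes the
metric is to read the MBean directly over JMX. A minimal sketch, assuming the
broker was started with a JMX port (e.g. JMX_PORT=9999); host, port and the
attributes read here are illustrative:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ProduceLocalTimeCheck {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");  // placeholder
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                "kafka.network:type=RequestMetrics,name=LocalTimeMs,request=Produce");
            // The histogram MBean exposes Count, Mean, percentiles, etc.
            System.out.println("count = " + mbs.getAttribute(name, "Count"));
            System.out.println("mean  = " + mbs.getAttribute(name, "Mean"));
        }
    }
}

If the MBean is there but nothing reaches AMS, the gap is on the collection
side rather than in the broker configuration.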

 



Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Michael Noll
Can you double-check whether the results in wc-out are not rather:

a 1
b 1
a 2
b 2
c 1

?
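
The expected sequence above reflects changelog semantics: countByKey()
produces a KTable, so wc-out receives one update record per processed word
rather than only the final totals. A quick way to check what is actually in
wc-out is to read it back with deserializers matching the output serdes
(String keys, Long values). A minimal sketch; the broker address is a
placeholder:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WcOutDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "wc-out-dump");
        props.put("auto.offset.reset", "earliest");  // read the topic from the start
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.LongDeserializer");

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("wc-out"));
            while (true) {
                ConsumerRecords<String, Long> records = consumer.poll(1000);
                for (ConsumerRecord<String, Long> record : records) {
                    // each record is one update of the running count for that word
                    System.out.println(record.key() + " " + record.value());
                }
            }
        }
    }
}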

On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q  wrote:

> Tried the word count example as discussed, the result in wc-out is wrong:
>
> a 1
> > b 1
> > a 1
> > b 1
> > c 1
>
>
> The expected result should be:
>
> a 2
> > b 2
> > c 1
>
>
> Kafka version is 0.10.0.1
>
>
> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax 
> wrote:
>
> > No. It does not support hidden topics.
> >
> > The only explanation might be, that there is no repartitioning step. But
> > than the question would be, if there is a bug in Kafka Streams, because
> > between map() and countByKey() repartitioning is required.
> >
> > Can you verify that the result is correct?
> >
> > -Matthias
> >
> > On 08/30/2016 03:24 PM, Tommy Q wrote:
> > > Does Kafka support hidden topics ? (Since all the topics infos are
> stored
> > > in ZK, this probably not the case )
> > >
> > > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >> Hi Tommy,
> > >>
> > >> yes, you do understand Kafka Streams correctly. And yes, for
> shuffling,
> > >> na internal topic will be created under the hood. It should be named
> > >> "-something-repartition". I am not sure, why it is
> not
> > >> listed via bin/kafka-topics.sh
> > >>
> > >> The internal topic "-counts-changelog" you see is
> > >> created to back the state of countByKey() operator.
> > >>
> > >> See
> > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > >> Kafka+Streams%3A+Internal+Data+Management
> > >>
> > >> and
> > >>
> > >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > >> streams-resetting-a-streams-application
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > >>> Michael, Thanks for your help.
> > >>>
> > >>> Take the word count example, I am trying to walk through the code
> based
> > >> on
> > >>> your explanation:
> > >>>
> > >>> val textLines: KStream[String, String] =
> > >> builder.stream("input-topic")
> > >>> val wordCounts: KStream[String, JLong] = textLines
> > >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> > >>>   .map((key: String, word: String) => new KeyValue(word, word))
> > >>>   .countByKey("counts")
> > >>>   .toStream
> > >>>
> > >>> wordCounts.to(stringSerde, longSerde, "wc-out")
> > >>>
> > >>> Suppose the input-topic has two partitions and each partition has a
> > >> string
> > >>> record produced into:
> > >>>
> > >>> input-topic_0 : "a b"
> >  input-topic_1 : "a b c"
> > >>>
> > >>>
> > >>> Suppose we started two instance of the stream topology ( task_0 and
> > >>> task_1). So after flatMapValues & map executed, they should have the
> > >>> following task state:
> > >>>
> > >>> task_0 :  [ (a, "a"), (b, "b") ]
> >  task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> > >>>
> > >>>
> > >>> Before the execution of  countByKey, the kafka-stream framework
> should
> > >>> insert a invisible shuffle phase internally:
> > >>>
> > >>> shuffled across the network :
> > 
> > >>>
> > >>>
> >  _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> >  _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> > >>>
> > >>>
> > >>> countByKey (reduce) :
> > >>>
> > >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> > >>>
> > >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> > >>>
> > >>>
> > >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> > >>> "wc-out")`, we get the word count output in wc-out topic:
> > >>>
> > >>> task_0 (wc-out_0) :  [ (a, 2) ]
> > >>>
> > >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
> > >>>
> > >>>
> > >>>
> > >>> According the steps list above, do I understand the internals of
> > kstream
> > >>> word count correctly ?
> > >>> Another question is does the shuffle across the network work by
> > creating
> > >>> intermediate topics ? If so, why can't I find the intermediate topics
> > >> using
> > >>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only
> > see
> > >>> the counts-changelog got created by the kstream framework.
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll 
> > >> wrote:
> > >>>
> >  In Kafka Streams, data is partitioned according to the keys of the
> >  key-value records, and operations such as countByKey operate on
> these
> >  stream partitions.  When reading data from Kafka, these stream
> > >> partitions
> >  map to the partitions of the Kafka input topic(s), but these may
> > change
> >  once you add processing operations.
> > 
> >  To your question:  The first step, if the data isn't already keyed
> as
> >  needed, is to select the key you want to count by, which results in
> 1+
> >  output stream partitions.  Here, data may get shuffled across the
> > >> network
> >  (but if won't if there's no need to, e.g. when the data is already
> > >> keyed as
> >  needed).  

Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Tommy Q
Hi @Matthias & @Michael, I pushed the WordCountDemo code to GitHub:
https://github.com/deeplambda/kstream-debug. Can you help debug the demo?

Thanks,
Tommy

On Wed, Aug 31, 2016 at 11:47 AM, Tommy Q  wrote:

> Tried the word count example as discussed, the result in wc-out is wrong:
>
> a 1
>> b 1
>> a 1
>> b 1
>> c 1
>
>
> The expected result should be:
>
> a 2
>> b 2
>> c 1
>
>
> Kafka version is 0.10.0.1
>
>
> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax 
> wrote:
>
>> No. It does not support hidden topics.
>>
>> The only explanation might be, that there is no repartitioning step. But
>> than the question would be, if there is a bug in Kafka Streams, because
>> between map() and countByKey() repartitioning is required.
>>
>> Can you verify that the result is correct?
>>
>> -Matthias
>>
>> On 08/30/2016 03:24 PM, Tommy Q wrote:
>> > Does Kafka support hidden topics ? (Since all the topics infos are
>> stored
>> > in ZK, this probably not the case )
>> >
>> > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax > >
>> > wrote:
>> >
>> >> Hi Tommy,
>> >>
>> >> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>> >> na internal topic will be created under the hood. It should be named
>> >> "-something-repartition". I am not sure, why it is not
>> >> listed via bin/kafka-topics.sh
>> >>
>> >> The internal topic "-counts-changelog" you see is
>> >> created to back the state of countByKey() operator.
>> >>
>> >> See
>> >> https://cwiki.apache.org/confluence/display/KAFKA/
>> >> Kafka+Streams%3A+Internal+Data+Management
>> >>
>> >> and
>> >>
>> >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
>> >> streams-resetting-a-streams-application
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 08/30/2016 06:55 AM, Tommy Q wrote:
>> >>> Michael, Thanks for your help.
>> >>>
>> >>> Take the word count example, I am trying to walk through the code
>> based
>> >> on
>> >>> your explanation:
>> >>>
>> >>> val textLines: KStream[String, String] =
>> >> builder.stream("input-topic")
>> >>> val wordCounts: KStream[String, JLong] = textLines
>> >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>> >>>   .map((key: String, word: String) => new KeyValue(word, word))
>> >>>   .countByKey("counts")
>> >>>   .toStream
>> >>>
>> >>> wordCounts.to(stringSerde, longSerde, "wc-out")
>> >>>
>> >>> Suppose the input-topic has two partitions and each partition has a
>> >> string
>> >>> record produced into:
>> >>>
>> >>> input-topic_0 : "a b"
>>  input-topic_1 : "a b c"
>> >>>
>> >>>
>> >>> Suppose we started two instance of the stream topology ( task_0 and
>> >>> task_1). So after flatMapValues & map executed, they should have the
>> >>> following task state:
>> >>>
>> >>> task_0 :  [ (a, "a"), (b, "b") ]
>>  task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
>> >>>
>> >>>
>> >>> Before the execution of  countByKey, the kafka-stream framework should
>> >>> insert a invisible shuffle phase internally:
>> >>>
>> >>> shuffled across the network :
>> 
>> >>>
>> >>>
>>  _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
>>  _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
>> >>>
>> >>>
>> >>> countByKey (reduce) :
>> >>>
>> >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
>> >>>
>> >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
>> >>>
>> >>>
>> >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>> >>> "wc-out")`, we get the word count output in wc-out topic:
>> >>>
>> >>> task_0 (wc-out_0) :  [ (a, 2) ]
>> >>>
>> >>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
>> >>>
>> >>>
>> >>>
>> >>> According the steps list above, do I understand the internals of
>> kstream
>> >>> word count correctly ?
>> >>> Another question is does the shuffle across the network work by
>> creating
>> >>> intermediate topics ? If so, why can't I find the intermediate topics
>> >> using
>> >>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only
>> see
>> >>> the counts-changelog got created by the kstream framework.
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll 
>> >> wrote:
>> >>>
>>  In Kafka Streams, data is partitioned according to the keys of the
>>  key-value records, and operations such as countByKey operate on these
>>  stream partitions.  When reading data from Kafka, these stream
>> >> partitions
>>  map to the partitions of the Kafka input topic(s), but these may
>> change
>>  once you add processing operations.
>> 
>>  To your question:  The first step, if the data isn't already keyed as
>>  needed, is to select the key you want to count by, which results in
>> 1+
>>  output stream partitions.  Here, data may get shuffled across the
>> >> network
>>  (but if won't if there's no need to, e.g. when the data is already
>> >> keyed as
>>  needed).  Then the count operation is performed for each stream
>> >> partition,
>>  which is similar to