RE: New consumer API waits indefinitely

2016-04-13 Thread Lohith Samaga M
Dear All,
After a system restart, the new consumer is working as expected.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga





RE: New consumer API waits indefinitely

2016-04-12 Thread Lohith Samaga M
Dear All,
I installed Kafka on a Linux VM.
Here too:
1. The producer is able to store messages in the topic (sent from the Windows host).
2. The consumer is unable to read them, either from the Windows host or via kafka-console-consumer on the Linux VM console.

In the logs, I see:
[2016-04-12 16:51:00,672] INFO [GroupCoordinator 0]: Stabilized group 
console-consumer-39913 generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:00,676] INFO [GroupCoordinator 0]: Assignment received from 
leader for group console-consumer-39913 for generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,638] INFO [GroupCoordinator 0]: Preparing to restabilize 
group console-consumer-39913 with old generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,640] INFO [GroupCoordinator 0]: Group 
console-consumer-39913 generation 1 is dead and removed 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:53:08,489] INFO [Group Metadata Manager on Broker 0]: Removed 0 
expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
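(For reference, the two console-consumer variants available on 0.9.x are invoked roughly as follows; host, port, and topic name are placeholders, and the thread does not say which variant was used. Both require a running broker, so this is an illustration only:)

```
# Old (ZooKeeper-based) console consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

# New consumer (0.9+), the one that hangs in this thread:
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
```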

When I run my Java code, I still get the exception - 
org.apache.kafka.clients.consumer.internals.SendFailedException


So, is it advisable to use the old consumer on Kafka 0.9.0.1?

Please help.

Thanks in advance.


Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Hi Ismael, Niko,
After cleaning up the zookeeper and kafka logs, I do not get the below 
server exception anymore. I think Kafka did not like me opening the .log file 
in notepad.

The only exception that I now get is 
org.apache.kafka.clients.consumer.internals.SendFailedException in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler.
After that, the consumer goes into a loop.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com] 
Sent: Tuesday, April 05, 2016 12.38
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with 
a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetI
ndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:265)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
at kafka.log.LogSegment.recover(LogSegment.scala:199)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
TraversableLike.scala:778)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimize
d.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.s
cala:777)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at kafka.log.LogManager.createLog(LogManager.scala:357)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.s
cala:173)
at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.s
cala:173)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:165)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
at kafka.cluster.Partition.makeLeader(Partition.scala:165)
at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManag
er.scala:692)
at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManag
er.scala:691)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala
:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:691)
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.sca
la:637)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:131)

at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:724)




Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-Original Message-
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: Monday, April 04, 2016 17.21
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Hi Lohith,

Are there any errors in your broker logs? I think there may be some issues with 
compacted topics on Windows and the new consumer uses a compacted topic to 
store offsets.

Ismael

On Mon, Apr 4, 2016 at 12:20 PM, Lohith Samaga M <lohith.sam...@mphasis.com>
wrote:

> Dear All,
> The error seems to be NOT_COORDINATOR_FOR_GROUP.
> The exception thrown in
> org.apache.kafka.clients.consumer.inter

RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
at kafka.log.LogSegment.recover(LogSegment.scala:199)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at kafka.log.LogManager.createLog(LogManager.scala:357)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:165)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
at kafka.cluster.Partition.makeLeader(Partition.scala:165)
at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:692)
at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:691)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:691)
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:637)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:131)
at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:724)
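(On the data-directory question above: Kafka calls its on-disk message data "logs" as well, so there is no separate data-directory setting. The location comes from `log.dirs` in `config/server.properties`; the shipped default below is why, on Windows, the data can end up in a `\tmp\kafka-logs` folder on the current drive rather than somewhere obvious:)

```
# config/server.properties -- where the broker stores partition data (its "log" segments)
log.dirs=/tmp/kafka-logs
```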




Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Thanks Niko!

I think I missed an org.apache.kafka.clients.consumer.internals.SendFailedException at the very beginning (or at least it is throwing that exception today).

Even after a fresh install of Kafka, I get the same errors. Strangely, all topics are re-created in the logs, yet I cannot find the data directory anywhere on my drive.
How can I clean up and start again?

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
These are the same logs I get with my local Kafka server, which works fine.


Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
Hi Niko,
I face this issue on Linux systems too.
I changed the logging level to DEBUG, and when I start and stop my consumer
(by stopping the program) I get the same exception. What is the cause here?

[2016-04-05 00:01:08,784] DEBUG Connection with /192.xx.xx.248 disconnected (org.apache.kafka.common.network.Selector)
kafka_1 | java.io.EOFException
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
kafka_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
kafka_1 | at java.lang.Thread.run(Thread.java:745)
kafka_1 | [2016-04-05 00:01:09,236] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:11,236] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:13,238] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:14,078] DEBUG Connection with /192.168.0.248 disconnected (org.apache.kafka.common.network.Selector)
kafka_1 | java.io.EOFException
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
kafka_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
kafka_1 | at java.lang.Thread.run(Thread.java:745)
kafka_1 | [2016-04-05 00:01:15,240] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:17,240] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:19,242] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2016-04-05 00:01:19,558] DEBUG Connection with /192.xx.xx.248 disconnected (org.apache.kafka.common.network.Selector)
kafka_1 | java.io.EOFException
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
kafka_1 | at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
kafka_1 | at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
kafka_1 | at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
kafka_1 | at java.lang.Thread.run(Thread.java:745)
kafka_1 | [2016-04-05 00:01:21,242] DEBUG Got ping response for sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnx





-- 
-Ratha
http://vvratha.blogspot.com/


RE: New consumer API waits indefinitely

2016-04-04 Thread Niko Davor
M. Lohith Samaga,

Your Java code looks fine.

Usually, if consumer.poll(100); doesn't return, there is probably a basic
connection error. If Kafka can't connect, it will internally go into an
infinite loop. To me, that doesn't seem like a good design, but that's a
separate tangent.

Turn SLF4J root logging up to debug and you will probably see the
connection error messages.
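(If the client uses the slf4j-log4j12 binding — an assumption, since the thread never names the binding — a minimal log4j.properties on the classpath that raises the root logger to DEBUG looks like this:)

```
# log4j.properties on the consumer's classpath (assumes the slf4j-log4j12 binding)
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```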

A second thought: it might be worth trying Kafka on a small Linux VM. The docs say, "Windows is not currently a well supported platform though we would be happy to change that." Even if you want to use Windows as a server in the long run, at least as a development test option I'd want to be able to test with a Linux VM.

FYI, I'm a Kafka newbie, and I've had no problems getting working code
samples up and running with Kafka 0.9.0.1 and the new Producer/Consumer
APIs. I've gotten code samples running in Java, Scala, and Python, and
everything works, including cross language tests.

Lastly, as a mailing list question, how do I reply to a question like this
if I see the original question in the web archives but it is not in my mail
client? I suspect that this reply will show up as a different thread which
is not what I want.


Re: New consumer API waits indefinitely

2016-04-04 Thread Ismael Juma
Hi Lohith,

Are there any errors in your broker logs? I think there may be some issues
with compacted topics on Windows, and the new consumer stores its offsets in
a compacted internal topic.
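If compacted-topic trouble on Windows is the suspect, one way to check (a
sketch, assuming a standard 0.9 installation with the scripts under bin/ and
ZooKeeper on localhost) is to describe the internal offsets topic:

```shell
# Describe the internal compacted topic where the new consumer stores offsets.
# The paths and ZooKeeper address are assumptions; adjust for your setup.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets
```

If this topic is missing or its partitions have no leader, offset commits and
group coordination will not work.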

Ismael

On Mon, Apr 4, 2016 at 12:20 PM, Lohith Samaga M <lohith.sam...@mphasis.com>
wrote:

> Dear All,
> The error seems to be NOT_COORDINATOR_FOR_GROUP.
> The exception thrown in
> org.apache.kafka.clients.consumer.internals.RequestFuture is:
> org.apache.kafka.common.errors.NotCoordinatorForGroupException:
> This is not the correct coordinator for this group.
>
> However, this exception is treated as a RetriableException in
> org.apache.kafka.clients.consumer.internals.RequestFuture,
> so the retry goes on in a loop.
>
> It also happens that the coordinator reference becomes null in the
> AbstractCoordinator class.
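Because the client retries retriable errors internally, application code that
wants to avoid waiting forever has to bound its own waiting. A generic,
deadline-bounded retry sketch (plain Java; the names are illustrative and
this is not Kafka's internal mechanism):

```java
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative helper, NOT part of the Kafka API: poll a source that may
// return nothing, but give up after a deadline instead of looping forever.
class BoundedRetry {
    static <T> Optional<T> pollUntil(Supplier<Optional<T>> attempt,
                                     long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;              // got a value: stop retrying
            }
            try {
                Thread.sleep(backoffMs);    // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();    // give up if interrupted
            }
        }
        return Optional.empty();            // deadline passed with no value
    }
}
```

The same idea applies to a consumer loop: track a deadline around the poll()
calls and bail out (with a log line) instead of spinning silently.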
>
> Can somebody please help?
>
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 12.22
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> Still struggling :)
> Check the following threads:
>
>- If my producer producing, then why the consumer couldn't consume? it
>stuck @ poll()
>- Consumer thread is waiting forever, not returning any objects
>
>
> I think the new APIs are recommended.
>
>
> On 4 April 2016 at 16:37, Lohith Samaga M <lohith.sam...@mphasis.com>
> wrote:
>
> > Thanks for letting me know.
> >
> > Is there any work around? A fix?
> >
> > Which set of API is recommended for production use?
> >
> > Best regards / Mit freundlichen Grüßen / Sincères salutations M.
> > Lohith Samaga
> >
> >
> >
> >
> > -Original Message-
> > From: Ratha v [mailto:vijayara...@gmail.com]
> > Sent: Monday, April 04, 2016 11.27
> > To: users@kafka.apache.org
> > Subject: Re: New consumer API waits indefinitely
> >
> > I too face the same issue :(
> >
> > On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com>
> > wrote:
> >
> > > Hi,
> > > Good morning.
> > >
> > > I am new to Kafka. So, please bear with me.
> > > I am using the new Producer and Consumer API with Kafka
> > > 0.9.0.1 running on a Windows 7 laptop with ZooKeeper.
> > >
> > > I was able to send messages using the new Producer
> > > API. I can see the messages in the Kafka data directory.
> > >
> > > However, when I run the consumer, it does not
> > > retrieve the messages. It keeps waiting for the messages indefinitely.
> > > My code (taken from Javadoc and modified)  is as below:
> > >
> > > props.put("bootstrap.servers", "localhost:9092");
> > > props.put("group.id", "new01");
> > > props.put("enable.auto.commit", "true");
> > > props.put("auto.commit.interval.ms", "1000");
> > > props.put("session.timeout.ms", "3");
> > > props.put("key.deserializer",
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > > props.put("value.deserializer",
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > >
> > > KafkaConsumer<String, String> consumer = new
> > > KafkaConsumer<>(props);
> > > consumer.subscribe(Arrays.asList("new-producer"));
> > > while (true) {
> > > ConsumerRecords<String, String> records =
> > > consumer.poll(100);
> > > for (ConsumerRecord<String, String> record : records)
> > > System.out.printf("offset = %d, key = %s, value
> > > = %s", record.offset(), record.key(), record.value());
> > > }
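One concrete oddity in the listing above: session.timeout.ms is set to "3"
(possibly truncated in the archive), and 3 ms is almost certainly below the
broker's minimum allowed session timeout. A hedged sketch of the same
configuration with a conventional 30000 ms value, using only
java.util.Properties so it compiles without the Kafka client jar:

```java
import java.util.Properties;

// Sketch of the consumer configuration from the listing above, with a
// plausible session timeout instead of "3" (which a 0.9 broker would reject).
class ConsumerProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "new01");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000"); // not "3": 3 ms is too short
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Pass the result to `new KafkaConsumer<String, String>(ConsumerProps.build())`
exactly as in the listing; only the timeout value differs.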
> > >
> > > Can anybody please tell me what went wrong?
> > >
> > > Thanks & Regards,
> > > M. Lohith Samaga
> > >

RE: New consumer API waits indefinitely

2016-04-04 Thread Lohith Samaga M
Thanks Ratha.

I am trying to understand the code...

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga


-Original Message-
From: Ratha v [mailto:vijayara...@gmail.com] 
Sent: Monday, April 04, 2016 12.22
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Still struggling :)
Check the following threads:

   - If my producer producing, then why the consumer couldn't consume? it
   stuck @ poll()
   - Consumer thread is waiting forever, not returning any objects


I think the new APIs are recommended.


On 4 April 2016 at 16:37, Lohith Samaga M <lohith.sam...@mphasis.com> wrote:

> Thanks for letting me know.
>
> Is there any work around? A fix?
>
> Which set of API is recommended for production use?
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 11.27
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> I too face the same issue :(
>
> On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com>
> wrote:
>
> > Hi,
> > Good morning.
> >
> > I am new to Kafka. So, please bear with me.
> > I am using the new Producer and Consumer API with Kafka
> > 0.9.0.1 running on a Windows 7 laptop with ZooKeeper.
> >
> > I was able to send messages using the new Producer 
> > API. I can see the messages in the Kafka data directory.
> >
> > However, when I run the consumer, it does not 
> > retrieve the messages. It keeps waiting for the messages indefinitely.
> > My code (taken from Javadoc and modified)  is as below:
> >
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "new01");
> > props.put("enable.auto.commit", "true");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("session.timeout.ms", "3");
> > props.put("key.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> > KafkaConsumer<String, String> consumer = new 
> > KafkaConsumer<>(props);
> > consumer.subscribe(Arrays.asList("new-producer"));
> > while (true) {
> > ConsumerRecords<String, String> records = 
> > consumer.poll(100);
> > for (ConsumerRecord<String, String> record : records)
> > System.out.printf("offset = %d, key = %s, value 
> > = %s", record.offset(), record.key(), record.value());
> > }
> >
> > Can anybody please tell me what went wrong?
> >
> > Thanks & Regards,
> > M. Lohith Samaga
> >
> > Information transmitted by this e-mail is proprietary to Mphasis, 
> > its associated companies and/ or its customers and is intended for 
> > use only by the individual or entity to which it is addressed, and 
> > may contain information that is privileged, confidential or exempt 
> > from disclosure under applicable law. If you are not the intended 
> > recipient or it appears that this mail has been forwarded to you 
> > without proper authority, you are notified that any use or 
> > dissemination of this information in any manner is strictly 
> > prohibited. In such cases, please notify us immediately at 
> > mailmas...@mphasis.com and delete this mail from your records.
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



--
-Ratha
http://vvratha.blogspot.com/

Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
Still struggling :)
Check the following threads:

   - If my producer producing, then why the consumer couldn't consume? it
   stuck @ poll()
   - Consumer thread is waiting forever, not returning any objects


I think the new APIs are recommended.


On 4 April 2016 at 16:37, Lohith Samaga M <lohith.sam...@mphasis.com> wrote:

> Thanks for letting me know.
>
> Is there any work around? A fix?
>
> Which set of API is recommended for production use?
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 11.27
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> I too face the same issue :(
>
> On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com>
> wrote:
>
> > Hi,
> > Good morning.
> >
> > I am new to Kafka. So, please bear with me.
> > I am using the new Producer and Consumer API with Kafka
> > 0.9.0.1 running on a Windows 7 laptop with ZooKeeper.
> >
> > I was able to send messages using the new Producer
> > API. I can see the messages in the Kafka data directory.
> >
> > However, when I run the consumer, it does not retrieve
> > the messages. It keeps waiting for the messages indefinitely.
> > My code (taken from Javadoc and modified)  is as below:
> >
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "new01");
> > props.put("enable.auto.commit", "true");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("session.timeout.ms", "3");
> > props.put("key.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> > KafkaConsumer<String, String> consumer = new
> > KafkaConsumer<>(props);
> > consumer.subscribe(Arrays.asList("new-producer"));
> > while (true) {
> > ConsumerRecords<String, String> records =
> > consumer.poll(100);
> > for (ConsumerRecord<String, String> record : records)
> > System.out.printf("offset = %d, key = %s, value =
> > %s", record.offset(), record.key(), record.value());
> > }
> >
> > Can anybody please tell me what went wrong?
> >
> > Thanks & Regards,
> > M. Lohith Samaga
> >
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


RE: New consumer API waits indefinitely

2016-04-04 Thread Lohith Samaga M
Thanks for letting me know.

Is there any work around? A fix?

Which set of API is recommended for production use?

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




-Original Message-
From: Ratha v [mailto:vijayara...@gmail.com] 
Sent: Monday, April 04, 2016 11.27
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

I too face the same issue :(

On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com> wrote:

> Hi,
> Good morning.
>
> I am new to Kafka. So, please bear with me.
> I am using the new Producer and Consumer API with Kafka
> 0.9.0.1 running on a Windows 7 laptop with ZooKeeper.
>
> I was able to send messages using the new Producer 
> API. I can see the messages in the Kafka data directory.
>
> However, when I run the consumer, it does not retrieve 
> the messages. It keeps waiting for the messages indefinitely.
> My code (taken from Javadoc and modified)  is as below:
>
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "new01");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("session.timeout.ms", "3");
> props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer<String, String> consumer = new 
> KafkaConsumer<>(props);
> consumer.subscribe(Arrays.asList("new-producer"));
> while (true) {
> ConsumerRecords<String, String> records = 
> consumer.poll(100);
> for (ConsumerRecord<String, String> record : records)
> System.out.printf("offset = %d, key = %s, value = 
> %s", record.offset(), record.key(), record.value());
> }
>
> Can anybody please tell me what went wrong?
>
> Thanks & Regards,
> M. Lohith Samaga
>
>



--
-Ratha
http://vvratha.blogspot.com/


Re: New consumer API waits indefinitely

2016-04-03 Thread Ratha v
I too face the same issue :(

On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com> wrote:

> Hi,
> Good morning.
>
> I am new to Kafka. So, please bear with me.
> I am using the new Producer and Consumer API with Kafka
> 0.9.0.1 running on a Windows 7 laptop with ZooKeeper.
>
> I was able to send messages using the new Producer API. I
> can see the messages in the Kafka data directory.
>
> However, when I run the consumer, it does not retrieve the
> messages. It keeps waiting for the messages indefinitely.
> My code (taken from Javadoc and modified)  is as below:
>
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "new01");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("session.timeout.ms", "3");
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer<String, String> consumer = new
> KafkaConsumer<>(props);
> consumer.subscribe(Arrays.asList("new-producer"));
> while (true) {
> ConsumerRecords<String, String> records =
> consumer.poll(100);
> for (ConsumerRecord<String, String> record : records)
> System.out.printf("offset = %d, key = %s, value = %s",
> record.offset(), record.key(), record.value());
> }
>
> Can anybody please tell me what went wrong?
>
> Thanks & Regards,
> M. Lohith Samaga
>
>



-- 
-Ratha
http://vvratha.blogspot.com/