RE: Kafka usecase

2016-09-18 Thread Lohith Samaga M
Hi Achintya,
1. Kafka can be used as a message store.
2. What is the message arrival rate per second?
3. What is the SLA for the messages to be processed?
4. If your messages arrive faster than they are consumed, you will get 
a backlog of messages. In that case, you may need to grow your cluster so that 
more messages are processed in parallel.
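
For illustration, a minimal sketch of the scale-out Lohith describes (not from the original thread; the topic name, group id and worker count are hypothetical). Consumers that share a group.id split the topic's partitions among themselves, so effective parallelism is capped by the partition count:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ParallelWorkers {
        public static void main(String[] args) {
            // One consumer thread per partition; extra threads beyond the
            // partition count would sit idle.
            for (int i = 0; i < 4; i++) {
                new Thread(ParallelWorkers::runWorker).start();
            }
        }

        static void runWorker() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "order-processors"); // same group => partitions are shared
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("orders"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record.value()); // the 1-2 s of business logic per message
                    }
                }
            }
        }

        static void process(String message) { /* business logic */ }
    }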

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Ghosh, Achintya (Contractor) [mailto:achintya_gh...@comcast.com] 
Sent: Monday, September 19, 2016 08.39
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Kafka usecase

Hi there,

We have a use case where we run a lot of business logic to process each message, 
and it sometimes takes 1-2 seconds per message. Will Kafka fit our use case?

Thanks
Achintya



RE: Building API to make Kafka reactive

2016-06-28 Thread Lohith Samaga M
Hi Shekar,
Alternatively, you could have each stage of your pipeline write its status to 
Cassandra (or another DB) and have your API read from it. With a Cassandra TTL, the 
row is deleted automatically once the TTL expires, so no manual cleanup is required.
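
A minimal sketch of that status-row idea, assuming the DataStax Java driver 3.x and a pre-created keyspace/table (all names here are hypothetical). USING TTL makes Cassandra expire the row on its own:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;

    public class StatusWriter {
        public static void main(String[] args) {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect("pipeline")) {
                // TTL is in seconds; after 86400 s the row disappears without
                // any manual cleanup.
                session.execute(
                    "INSERT INTO stage_status (session_id, stage, status) "
                        + "VALUES (?, ?, ?) USING TTL 86400",
                    "abc-123", "enrichment", "in progress");
            }
        }
    }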

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Shekar Tippur [mailto:ctip...@gmail.com] 
Sent: Wednesday, June 29, 2016 12.10
To: users
Subject: Building API to make Kafka reactive

I am looking at building a reactive API on top of Kafka.
The API produces events to a Kafka topic. I want to add a unique session id
into the payload.
The data gets transformed as it goes through the different stages of a pipeline.
I want to specify a final topic from which the API can tell that the processing
was successful.
The API should report a different status at each part of the pipeline:
At ingestion, the API responds with "submitted"
While the message progresses, the API returns "in progress"
After successful completion, the API returns "Success"

Couple of questions:
1. Is this feasible?
2. I was looking at Project Reactor (https://projectreactor.io), where the docs 
talk about an event bus. I wanted to see if I can implement a consumer that 
points to the "end" topic and throws an event onto the event bus.
Since I would know the session ID, I can process the request accordingly.

Appreciate your inputs.

- Shekar
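
A hedged sketch of the correlation Shekar describes: a background consumer on the final topic completes a per-session future that the API handler can report on. Everything here (topic and group names, carrying the session id in the record key) is an assumption for illustration, not the thread's actual design:

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CompletionListener implements Runnable {
        // session id -> future completed when the message reaches the end topic
        static final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

        // The API calls this at ingestion, replies "submitted", and can later
        // report "Success" once the future completes.
        public static CompletableFuture<String> register(String sessionId) {
            return pending.computeIfAbsent(sessionId, id -> new CompletableFuture<>());
        }

        @Override
        public void run() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "completion-listener");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("pipeline-done")); // the "end" topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        CompletableFuture<String> f = pending.remove(record.key());
                        if (f != null) f.complete("Success");
                    }
                }
            }
        }
    }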


RE: Kafka HDFS Connector

2016-06-22 Thread Lohith Samaga M
Hi,
You can also use Storm. There you have the option of rotating the output 
file, and you can also write to Hive directly.
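
A sketch of the Storm option, using the storm-hdfs module's HdfsBolt (the filesystem URL, path and thresholds are hypothetical; the bolt would be wired into a topology behind a Kafka spout):

    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

    public class SensorHdfsBolt {
        static HdfsBolt build() {
            return new HdfsBolt()
                .withFsUrl("hdfs://namenode:8020")
                .withFileNameFormat(new DefaultFileNameFormat().withPath("/data/sensors/"))
                .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
                // Rotate the output file every 64 MB...
                .withRotationPolicy(new FileSizeRotationPolicy(64.0f, Units.MB))
                // ...and sync to HDFS every 1000 tuples.
                .withSyncPolicy(new CountSyncPolicy(1000));
        }
    }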

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




-Original Message-
From: Mudit Kumar [mailto:mudit.ku...@askme.in] 
Sent: Wednesday, June 22, 2016 12.32
To: users@kafka.apache.org; d...@kafka.apache.org
Subject: Re: Kafka HDFS Connector

I think you can use Flume as well.

Thanks,
Mudit




On 6/22/16, 12:29 PM, "Pariksheet Barapatre"  wrote:

>Anybody have any idea on this?
>
>Thanks
>Pari
>
>On 20 June 2016 at 14:36, Pariksheet Barapatre 
>wrote:
>
>> Hello All,
>>
>> I have data coming from sensors into a Kafka cluster in text format, 
>> delimited by commas.
>>
>> How do I periodically offload this data from Kafka to Hive? I guess 
>> Kafka Connect should solve my problem, but when I checked the 
>> documentation, the examples only cover Avro-formatted data. Can you 
>> please share some knowledge on this?
>>
>> Many Thanks
>> Pari
>>
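
For reference, a hedged sketch of a Kafka Connect HDFS sink configuration for plain text like Pari's comma-delimited records (host names and the topic are hypothetical). At the time of this thread the Confluent HDFS connector only shipped Avro and Parquet formats, which matches Pari's observation; a plain StringFormat was added in later connector versions, and Hive integration requires a schema-bearing format such as Avro:

    name=sensor-hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=sensor-readings
    hdfs.url=hdfs://namenode:8020
    flush.size=1000
    # Pass the CSV lines through as plain strings instead of Avro:
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    format.class=io.confluent.connect.hdfs.string.StringFormat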



Re: Filter plugins in Kafka

2016-04-29 Thread Lohith Samaga M
Hi
Using Storm would be another way. This will scale as well.

Spark Streaming would fit as well.

It all depends on the complexity of the filter and any additional processing 
required.

HTH

Lohith

Sent from my Sony Xperia™ smartphone


Gerard Klijs wrote:

Using Kafka Streams is one way. I used Camel with Kafka before, which also
has a nice way of applying filters.
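
A minimal filter sketch with the Streams DSL, along the lines Gerard suggests (topic names and the predicate are hypothetical; this uses the current StreamsBuilder API, while the 0.10-era releases spelled it KStreamBuilder, the shape is the same):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class FilterApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> raw = builder.stream("raw-events");
            // Records failing the predicate are dropped; the rest are forwarded.
            raw.filter((key, value) -> value != null && value.contains("sensor42"))
               .to("filtered-events");

            new KafkaStreams(builder.build(), props).start();
        }
    }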

On Fri, Apr 29, 2016 at 1:51 PM Subramanian Karunanithi 
wrote:

> Hi,
>
> When a stream of data passes through Kafka, I want to apply a filter and
> then let the matching messages pass through to the partitions.
>
> Regards,
> Subramanian. K
> On Apr 26, 2016 12:33, "Marko Bonaći"  wrote:
>
> > Instantly reminded me of the Streams API, where you can use Java 8 stream
> > semantics (filter being one of them) to do the first thing in Guozhang's
> > response (filter messages from one topic into another - I assume that's
> > what you were looking for).
> >
> > Marko Bonaći
> > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > Solr & Elasticsearch Support
> > Sematext | Contact
> >
> > On Tue, Apr 26, 2016 at 6:22 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Subramanian,
> > >
> > > Could you elaborate a bit more on "filtering"? Do you want to read raw
> > data
> > > from Kafka, and send the filtered data back to Kafka as a separate
> topic,
> > > or do you want to read raw data from an external service and send the
> > > filtered data into Kafka?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Apr 26, 2016 at 8:07 AM, Subramanian Karunanithi <
> > > subub...@gmail.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Do we have any plugin which is available, which can be used as a
> > > filtering
> > > > mechanism on the data it's working on?
> > > >
> > > > Regards,
> > > > Subramanian. K
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


RE: How to build strategy for particular setup of kafka

2016-04-22 Thread Lohith Samaga M
Hi,
Please set up a Kafka cluster so that you can get high throughput as 
well as high availability.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga


-Original Message-
From: Gaurav Agarwal [mailto:gaurav130...@gmail.com] 
Sent: Friday, April 22, 2016 12.44
To: users@kafka.apache.org
Subject: Re: How to build strategy for particular setup of kafka

Hi
You can have one or two instances of Kafka, but you can have one or more Kafka 
topics dedicated to each application according to need. Partitions will help 
you increase throughput, and the consumer group id lets you consume a topic 
either as a queue or as publish/subscribe.
On Apr 22, 2016 12:37 PM, "Kuldeep Kamboj" 
wrote:

> Thanks for the reply,
>
> I understand your point, but my whole strategy depends on the first 
> issue, which is how I integrate the apps into the architecture. Partitions 
> / consumer groups have a different purpose. Do I need to set up three 
> Kafka instances, one for each app?
>
> On Fri, Apr 22, 2016 at 12:30 PM, Lohith Samaga M < 
> lohith.sam...@mphasis.com
> > wrote:
>
> > Hi,
> > It is better NOT to share topics among applications. You may 
> > have a wrapper application reading from the queue/topic and routing 
> > it to the correct application, but it is simpler for each 
> > application to read from its own topic.
> >
> > Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> > Lohith Samaga
> >
> >
> >
> > -Original Message-
> > From: Kuldeep Kamboj [mailto:kuldeep.kam...@osscube.com]
> > Sent: Friday, April 22, 2016 12.08
> > To: users@kafka.apache.org
> > Subject: How to build strategy for particular setup of kafka
> >
> > I have three applications, let's name them AppA, AppB and AppC. Each 
> > could have several message queues, named like:
> >
> > AppA
> >MsgQueueA
> >MsgQueueB
> > AppB
> >MsgQueueA
> >MsgQueueB
> >MsgQueueC
> > AppC
> >MsgQueueA
> >MsgQueueB
> >MsgQueueD
> >
> > I have very little exposure to Apache Kafka and even to message 
> > queue concepts.
> >
> > I was just thinking of creating topics for MsgQueueA, MsgQueueB and so 
> > on. But I am confused about how to integrate AppA, AppB and AppC. 
> > Partitions and consumer groups look like the wrong choices for them.
> >
> > Also, is separating queues/topics per app a better idea than re-using 
> > them across apps, for easier debugging? Like below:
> >
> >
> > AppA
> >AppAMsgQueueA
> >AppAMsgQueueB
> > AppB
> >AppBMsgQueueA
> >AppBMsgQueueB
> >AppBMsgQueueC
> > AppC
> >AppCMsgQueueA
> >AppCMsgQueueB
> >AppCMsgQueueD
> >
> >
> >
> >
> > --
> > Kuldeep Kamboj
> >
>
>
>
> --
> Kuldeep Kamboj
> Mob. 9871662849
>


RE: How to build strategy for particular setup of kafka

2016-04-22 Thread Lohith Samaga M
Hi,
It is better NOT to share topics among applications. You may have a 
wrapper application reading from the queue/topic and routing it to the correct 
application, but it is simpler for each application to read from its own topic.
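
To make the per-application topics concrete, a hedged sketch that pre-creates them with the AdminClient (the AdminClient API arrived in Kafka 0.11, after this thread, where kafka-topics.sh would have been used instead; topic names, partition count and replication factor are hypothetical):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TopicSetup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // One topic per (application, queue) pair keeps consumers independent.
            List<NewTopic> topics = Arrays
                .asList("AppA.MsgQueueA", "AppA.MsgQueueB", "AppB.MsgQueueA")
                .stream()
                .map(name -> new NewTopic(name, 3, (short) 1)) // 3 partitions, RF 1
                .collect(Collectors.toList());
            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(topics).all().get(); // block until created
            }
        }
    }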

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Kuldeep Kamboj [mailto:kuldeep.kam...@osscube.com] 
Sent: Friday, April 22, 2016 12.08
To: users@kafka.apache.org
Subject: How to build strategy for particular setup of kafka

I have three applications, let's name them AppA, AppB and AppC. Each could 
have several message queues, named like:

AppA
   MsgQueueA
   MsgQueueB
AppB
   MsgQueueA
   MsgQueueB
   MsgQueueC
AppC
   MsgQueueA
   MsgQueueB
   MsgQueueD

I have very little exposure to Apache Kafka and even to message queue 
concepts.

I was just thinking of creating topics for MsgQueueA, MsgQueueB and so on. But 
I am confused about how to integrate AppA, AppB and AppC. Partitions and 
consumer groups look like the wrong choices for them.

Also, is separating queues/topics per app a better idea than re-using them 
across apps, for easier debugging? Like below:


AppA
   AppAMsgQueueA
   AppAMsgQueueB
AppB
   AppBMsgQueueA
   AppBMsgQueueB
   AppBMsgQueueC
AppC
   AppCMsgQueueA
   AppCMsgQueueB
   AppCMsgQueueD




--
Kuldeep Kamboj


RE: New consumer API waits indefinitely

2016-04-12 Thread Lohith Samaga M
Dear All,
After a system restart, the new consumer is working as expected.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com] 
Sent: Tuesday, April 12, 2016 17.00
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Dear All,
I installed Kafka on a Linux VM.
Here too:
1. The producer is able to store messages in the topic (sent from the 
Windows host).
2. The consumer is unable to read them, either from the Windows host or via 
kafka-console-consumer on the Linux VM console.

In the logs, I see:
[2016-04-12 16:51:00,672] INFO [GroupCoordinator 0]: Stabilized group 
console-consumer-39913 generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:00,676] INFO [GroupCoordinator 0]: Assignment received from 
leader for group console-consumer-39913 for generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,638] INFO [GroupCoordinator 0]: Preparing to restabilize 
group console-consumer-39913 with old generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,640] INFO [GroupCoordinator 0]: Group 
console-consumer-39913 generation 1 is dead and removed 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:53:08,489] INFO [Group Metadata Manager on Broker 0]: Removed 0 
expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

When I run my Java code, I still get the exception - 
org.apache.kafka.clients.consumer.internals.SendFailedException


So, is it advisable to use the old consumer on Kafka 0.9.0.1?

Please help.

Thanks in advance.


Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com]
Sent: Tuesday, April 05, 2016 13.36
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael, Niko,
After cleaning up the zookeeper and kafka logs, I do not get the below 
server exception anymore. I think Kafka did not like me opening the .log file 
in notepad.

The only exception that I now get is 
org.apache.kafka.clients.consumer.internals.SendFailedException in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler.
After that, the consumer goes into a loop.

Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com]
Sent: Tuesday, April 05, 2016 12.38
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open

RE: New consumer API waits indefinitely

2016-04-12 Thread Lohith Samaga M
Dear All,
I installed Kafka on a Linux VM.
Here too:
1. The producer is able to store messages in the topic (sent from the 
Windows host).
2. The consumer is unable to read them, either from the Windows host or via 
kafka-console-consumer on the Linux VM console.

In the logs, I see:
[2016-04-12 16:51:00,672] INFO [GroupCoordinator 0]: Stabilized group 
console-consumer-39913 generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:00,676] INFO [GroupCoordinator 0]: Assignment received from 
leader for group console-consumer-39913 for generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,638] INFO [GroupCoordinator 0]: Preparing to restabilize 
group console-consumer-39913 with old generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,640] INFO [GroupCoordinator 0]: Group 
console-consumer-39913 generation 1 is dead and removed 
(kafka.coordinator.GroupCoordinator)
[2016-04-12 16:53:08,489] INFO [Group Metadata Manager on Broker 0]: Removed 0 
expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

When I run my Java code, I still get the exception - 
org.apache.kafka.clients.consumer.internals.SendFailedException


So, is it advisable to use the old consumer on Kafka 0.9.0.1?

Please help.

Thanks in advance.


Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com] 
Sent: Tuesday, April 05, 2016 13.36
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael, Niko,
After cleaning up the zookeeper and kafka logs, I do not get the below 
server exception anymore. I think Kafka did not like me opening the .log file 
in notepad.

The only exception that I now get is 
org.apache.kafka.clients.consumer.internals.SendFailedException in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler.
After that, the consumer goes into a loop.

Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com]
Sent: Tuesday, April 05, 2016 12.38
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open

RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Hi Ismael, Niko,
After cleaning up the zookeeper and kafka logs, I do not get the below 
server exception anymore. I think Kafka did not like me opening the .log file 
in notepad.

The only exception that I now get is 
org.apache.kafka.clients.consumer.internals.SendFailedException in 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler.
After that, the consumer goes into a loop.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com] 
Sent: Tuesday, April 05, 2016 12.38
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open




Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-Original Message-
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: Monday, April 04, 2016 17.21
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Hi Lohith,

Are there any errors in your broker logs? I think there may be some issues with 
compacted topics on Windows and the new consumer uses a compacted topic to 
store offsets.

Ismael

On Mon, Apr 4, 2016 at 12:20 PM, Lohith Samaga M 
wrote:

> Dear All,
> The error seems to be NOT_COORDINATOR_FOR_GROUP.
> The exception thrown in
> org.apache.kafka.clients.consumer.internals.Requ

RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Hi Ismael,
I see the following exception when I (re)start Kafka (even a fresh 
setup after the previous one). And where is the configuration to set the data 
directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at kafka.log.LogManager.createLog(LogManager.scala:357)
        at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
        at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
        at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:173)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
        at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
        at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:165)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
        at kafka.cluster.Partition.makeLeader(Partition.scala:165)
        at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:692)
        at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:691)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:691)
        at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:637)
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:131)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:724)




Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-Original Message-
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: Monday, April 04, 2016 17.21
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Hi Lohith,

Are there any errors in your broker logs? I think there may be some issues with 
compacted topics on Windows and the new consumer uses a compacted topic to 
store offsets.

Ismael

On Mon, Apr 4, 2016 at 12:20 PM, Lohith Samaga M 
wrote:

> Dear All,
> The error seems to be NOT_COORDINATOR_FOR_GROUP.
> The exception thrown in
> org.apache.kafka.clients.consumer.internals.RequestFuture is:
> org.apache.kafka.common.errors.NotCoordinatorForGroupException:
> This is not the correct coordinator for this group.
>
> However, this exception is considered RetriableException in 
> org.apache.kafka.clients.consumer.internals.RequestFuture.
> So, the retry goes on - in a loop.
>
> It also happens that the Coordinator object becomes null in 
> AbstractCoordinator class.
>
> Can somebody please help?
>
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 12.22
> To: users@kafka.apache.org
> Subjec

RE: New consumer API waits indefinitely

2016-04-05 Thread Lohith Samaga M
Thanks Niko!

I think I missed an 
org.apache.kafka.clients.consumer.internals.SendFailedException at the very 
beginning (or at least it is giving an exception today).

Even after using a fresh install of Kafka, I get the same errors. Strangely, 
all topics are re-created in the logs. I cannot find the data directory on my 
drive. How can I clean up and start again?

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga


-Original Message-
From: Niko Davor [mailto:nikoda...@gmail.com] 
Sent: Monday, April 04, 2016 23.59
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

M. Lohith Samaga,

Your Java code looks fine.

Usually, if consumer.poll(100); doesn't return, there is probably a basic 
connection error. If Kafka can't connect, it will internally go into an 
infinite loop. To me, that doesn't seem like a good design, but that's a 
separate tangent.

Turn SLF4J root logging up to debug and you will probably see the connection 
error messages.
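
As a concrete version of that suggestion, a log4j.properties sketch (assuming the slf4j-log4j12 binding that the 0.9-era clients commonly used; scoping DEBUG to the client packages keeps the output readable):

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %-5p [%t] %c - %m%n
    # Raise just the Kafka client internals to DEBUG to see connection errors:
    log4j.logger.org.apache.kafka.clients=DEBUG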

A second thought is it might be worth trying using Kafka on a small Linux VM. 
The docs say, "Windows is not currently a well supported platform though we 
would be happy to change that.". Even if you want to use Windows as a server in 
the long run, at least as a development test option, I'd want to be able to 
test with a Linux VM.

FYI, I'm a Kafka newbie, and I've had no problems getting working code samples 
up and running with Kafka 0.9.0.1 and the new Producer/Consumer APIs. I've 
gotten code samples running in Java, Scala, and Python, and everything works, 
including cross language tests.

Lastly, as a mailing list question, how do I reply to a question like this if I 
see the original question in the web archives but it is not in my mail client? 
I suspect that this reply will show up as a different thread which is not what 
I want.


RE: New consumer API waits indefinitely

2016-04-04 Thread Lohith Samaga M
Dear All,
The error seems to be NOT_COORDINATOR_FOR_GROUP.
The exception thrown in 
org.apache.kafka.clients.consumer.internals.RequestFuture is:
org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is 
not the correct coordinator for this group.

However, this exception is considered RetriableException in 
org.apache.kafka.clients.consumer.internals.RequestFuture.
So, the retry goes on - in a loop.

It also happens that the Coordinator object becomes null in 
AbstractCoordinator class.

Can somebody please help?


Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




-Original Message-
From: Ratha v [mailto:vijayara...@gmail.com] 
Sent: Monday, April 04, 2016 12.22
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Still struggling :)
Check the following threads:

   - If my producer producing, then why the consumer couldn't consume? it
   stuck @ poll()
   - Consumer thread is waiting forever, not returning any objects


I think new APIs are recommended.


On 4 April 2016 at 16:37, Lohith Samaga M  wrote:

> Thanks for letting me know.
>
> Is there any work around? A fix?
>
> Which set of API is recommended for production use?
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 11.27
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> I too face same issue:(
>
> On 4 April 2016 at 15:51, Lohith Samaga M 
> wrote:
>
> > HI,
> > Good morning.
> >
> > I am new to Kafka. So, please bear with me.
> > I am using the new Producer and Consumer API with 
> > Kafka
> > 0.9.0.1 running on Windows 7 laptop with zookeeper.
> >
> > I was able to send messages using the new Producer 
> > API. I can see the messages in the Kafka data directory.
> >
> > However, when I run the consumer, it does not 
> > retrieve the messages. It keeps waiting for the messages indefinitely.
> > My code (taken from Javadoc and modified)  is as below:
> >
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "new01");
> > props.put("enable.auto.commit", "true");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("session.timeout.ms", "3");
> > props.put("key.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> > KafkaConsumer consumer = new 
> > KafkaConsumer<>(props);
> > consumer.subscribe(Arrays.asList("new-producer"));
> > while (true) {
> > ConsumerRecords records = 
> > consumer.poll(100);
> > for (ConsumerRecord record : records)
> > System.out.printf("offset = %d, key = %s, value 
> > = %s", record.offset(), record.key(), record.value());
> > }
> >
> > Can anybody please tell me what went wrong?
> >
> > Thanks & Regards,
> > M. Lohith Samaga
> >
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/

RE: New consumer API waits indefinitely

2016-04-04 Thread Lohith Samaga M
Thanks Ratha.

I am trying to understand the code...

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga


-Original Message-
From: Ratha v [mailto:vijayara...@gmail.com] 
Sent: Monday, April 04, 2016 12.22
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Still struggling :)
Check the following threads:

   - If my producer producing, then why the consumer couldn't consume? it
   stuck @ poll()
   - Consumer thread is waiting forever, not returning any objects


I think new APIs are recommended.


On 4 April 2016 at 16:37, Lohith Samaga M  wrote:

> Thanks for letting me know.
>
> Is there any work around? A fix?
>
> Which set of API is recommended for production use?
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 11.27
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> I too face same issue:(
>
> On 4 April 2016 at 15:51, Lohith Samaga M 
> wrote:
>
> > HI,
> > Good morning.
> >
> > I am new to Kafka. So, please bear with me.
> > I am using the new Producer and Consumer API with 
> > Kafka
> > 0.9.0.1 running on Windows 7 laptop with zookeeper.
> >
> > I was able to send messages using the new Producer 
> > API. I can see the messages in the Kafka data directory.
> >
> > However, when I run the consumer, it does not 
> > retrieve the messages. It keeps waiting for the messages indefinitely.
> > My code (taken from Javadoc and modified)  is as below:
> >
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "new01");
> > props.put("enable.auto.commit", "true");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("session.timeout.ms", "3");
> > props.put("key.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer", 
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> > KafkaConsumer consumer = new 
> > KafkaConsumer<>(props);
> > consumer.subscribe(Arrays.asList("new-producer"));
> > while (true) {
> > ConsumerRecords records = 
> > consumer.poll(100);
> > for (ConsumerRecord record : records)
> > System.out.printf("offset = %d, key = %s, value 
> > = %s", record.offset(), record.key(), record.value());
> > }
> >
> > Can anybody please tell me what went wrong?
> >
> > Thanks & Regards,
> > M. Lohith Samaga
> >
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



--
-Ratha
http://vvratha.blogspot.com/


RE: New consumer API waits indefinitely

2016-04-03 Thread Lohith Samaga M
Thanks for letting me know.

Is there any work around? A fix?

Which set of API is recommended for production use?

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga




-Original Message-
From: Ratha v [mailto:vijayara...@gmail.com] 
Sent: Monday, April 04, 2016 11.27
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

I too face same issue:(

On 4 April 2016 at 15:51, Lohith Samaga M  wrote:

> HI,
> Good morning.
>
> I am new to Kafka. So, please bear with me.
> I am using the new Producer and Consumer API with 
> Kafka
> 0.9.0.1 running on Windows 7 laptop with zookeeper.
>
> I was able to send messages using the new Producer 
> API. I can see the messages in the Kafka data directory.
>
> However, when I run the consumer, it does not retrieve 
> the messages. It keeps waiting for the messages indefinitely.
> My code (taken from Javadoc and modified)  is as below:
>
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "new01");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("session.timeout.ms", "3");
> props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer consumer = new 
> KafkaConsumer<>(props);
> consumer.subscribe(Arrays.asList("new-producer"));
> while (true) {
> ConsumerRecords records = 
> consumer.poll(100);
> for (ConsumerRecord record : records)
> System.out.printf("offset = %d, key = %s, value = 
> %s", record.offset(), record.key(), record.value());
> }
>
> Can anybody please tell me what went wrong?
>
> Thanks & Regards,
> M. Lohith Samaga
>
>



--
-Ratha
http://vvratha.blogspot.com/


New consumer API waits indefinitely

2016-04-03 Thread Lohith Samaga M
HI,
Good morning.

I am new to Kafka. So, please bear with me.
I am using the new Producer and Consumer API with Kafka 0.9.0.1 
running on a Windows 7 laptop with ZooKeeper.

I was able to send messages using the new Producer API. I can 
see the messages in the Kafka data directory.

However, when I run the consumer, it does not retrieve the 
messages. It keeps waiting for the messages indefinitely.
My code (taken from the Javadoc and modified) is as below:

props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "new01");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "3");
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("new-producer"));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s", 
record.offset(), record.key(), record.value());
}

Can anybody please tell me what went wrong?

Thanks & Regards,
M. Lohith Samaga
