Re: Idempotent Producers and Exactly Once Consumers

2019-09-29 Thread christopher palm
Does a Kafka Streams consumer also have that same limitation of possible
duplicates?

Thanks,
Chris

On Fri, Sep 27, 2019 at 11:56 AM Matthias J. Sax 
wrote:

> Enabling "read_committed" only ensures that a consumer does not return
> uncommitted data.
>
> However, on failure, a consumer might still read committed messages
> multiple times (if you commit offsets after processing). If you commit
> offsets before you process messages, and a failure happens before
> processing finishes, you may "lose" those messages, as they won't be
> consumed again on restart.
>
> Hence, if you have a "consumer only" application, not much changed and
> you still need to take care in your application code about potential
> duplicate processing of records.
>
> -Matthias
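
A minimal sketch (plain Java consumer, newer kafka-clients) of the
commit-after-processing variant described above: isolation.level=read_committed
skips uncommitted data and offsets are committed only after processing. Broker
address, topic, and group id are placeholders, and as noted above, processing
still has to tolerate replays after a crash.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "orders-app");                 // placeholder
        props.put("isolation.level", "read_committed");      // skip uncommitted/aborted records
        props.put("enable.auto.commit", "false");            // commit manually
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Processing must be idempotent: a crash before commitSync()
                    // means these records are delivered again on restart.
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // commit after processing => at-least-once
            }
        }
    }
}
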
>
>
> On 9/27/19 7:34 AM, Alessandro Tagliapietra wrote:
> > You can achieve exactly once on a consumer by enabling read committed and
> > manually committing the offset as soon as you receive a message. That way
> > you know that at the next poll you won't get the old messages again.
> >
> > On Fri, Sep 27, 2019, 6:24 AM christopher palm  wrote:
> >
> >> I had a similar question, and just watched the video on the
> confluent.io
> >> site about this.
> >> From what I understand idempotence and transactions are there to solve
> the
> >> duplicate writes and exactly once processing, respectively.
> >>
> >> Is what you are stating below that this only works if we produce
> into a
> >> kafka topic and consume from it via a kafka stream, but a regular
> >> kafka consumer won't get the guarantee of exactly once processing?
> >>
> >> Thanks,
> >> Chris
> >>
> >>
> >> On Sat, Aug 31, 2019 at 12:29 AM Matthias J. Sax  >
> >> wrote:
> >>
> >>> Exactly-once on the producer will only ensure that no duplicate writes
> >>> happen. If a downstream consumer fails, you might still read messages
> >>> multiple times for all cases (ie, without idempotence, with idempotence
> >>> enabled, or if you use transactions).
> >>>
> >>> Note, that exactly-once is designed for a read-process-write pattern,
> >>> but not for a write-read pattern.
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 8/30/19 1:00 PM, Peter Groesbeck wrote:
> >>>> For a producer that emits messages to a single topic (i.e. no single
> >>>> message is sent to multiple topics), will enabling idempotency but not
> >>>> transactions provide exactly once guarantees for downstream consumers
> >> of
> >>>> said topic?
> >>>>
> >>>> Ordering is not important; I just want to make sure consumers only
> >>> consume
> >>>> messages sent once.
> >>>>
> >>>
> >>>
> >>
> >
>
>


Re: Idempotent Producers and Exactly Once Consumers

2019-09-27 Thread christopher palm
I had a similar question, and just watched the video on the confluent.io
site about this.
From what I understand, idempotence and transactions are there to solve
duplicate writes and exactly-once processing, respectively.

Is what you are stating below that this only works if we produce into a
Kafka topic and consume from it via Kafka Streams, but a regular
Kafka consumer won't get the exactly-once processing guarantee?

Thanks,
Chris


On Sat, Aug 31, 2019 at 12:29 AM Matthias J. Sax 
wrote:

> Exactly-once on the producer will only ensure that no duplicate writes
> happen. If a downstream consumer fails, you might still read messages
> multiple times for all cases (ie, without idempotence, with idempotence
> enabled, or if you use transactions).
>
> Note, that exactly-once is designed for a read-process-write pattern,
> but not for a write-read pattern.
>
> -Matthias
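
For reference, a minimal sketch (not from this thread) of enabling producer
idempotence, which gives duplicate-free writes only, as described above. Broker
address, topic, key, and value are placeholders; a downstream consumer can
still see the record more than once if it fails between processing and
committing offsets.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("enable.idempotence", "true"); // deduplicates retried writes on the broker
        props.put("acks", "all");                // required by idempotence
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "key-1", "value-1")); // placeholders
        }
    }
}
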
>
>
>
> On 8/30/19 1:00 PM, Peter Groesbeck wrote:
> > For a producer that emits messages to a single topic (i.e. no single
> > message is sent to multiple topics), will enabling idempotency but not
> > transactions provide exactly once guarantees for downstream consumers of
> > said topic?
> >
> > Ordering is not important; I just want to make sure consumers only
> consume
> > messages sent once.
> >
>
>


Re: KafkaProducer Retries in .9.0.1

2016-04-21 Thread christopher palm
Hi Nicolas,
I was able to get this working by shutting down one of the broker nodes; at
that point you can see the producer retry and then deliver the message.

Thanks,
Chris
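
For anyone reproducing this, a rough sketch of the kind of test loop involved
(broker list and topic are placeholders): run it, bounce one broker, and the
callback should show sends failing and then being acknowledged again; depending
on how long the broker stays down you may need more retries or a longer
back-off. Retries only apply to broker-side and network errors, not client-side
validation failures.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryDuringBrokerRestartSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholders
        props.put("acks", "all");
        props.put("retries", 3);             // only broker/network errors are retried
        props.put("retry.backoff.ms", 100);  // wait between retry attempts
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                final int n = i;
                producer.send(new ProducerRecord<>("test-topic", "key-" + n, "msg-" + n),
                        (metadata, exception) -> {
                            if (exception != null) {
                                System.err.println("send " + n + " failed: " + exception);
                            } else {
                                System.out.println("send " + n + " acked, offset " + metadata.offset());
                            }
                        });
                Thread.sleep(1000); // slow producer so the broker bounce overlaps some sends
            }
        }
    }
}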

On Wed, Apr 20, 2016 at 3:00 PM, Nicolas Phung 
wrote:

> Hello,
>
> Have you solved this? I'm encountering the same issue with the new
> producer on the 0.9.0.1 client with a 0.9.0.1 Kafka broker. We tried the
> same breakdowns (Kafka brokers, ZooKeeper) with the 0.8.2.2 client and
> 0.8.2.2 broker, and the retries work as expected on the older version. I'm
> going to check whether someone else has filed a related issue about it.
>
> Regards,
> Nicolas PHUNG
>
> On Thu, Apr 7, 2016 at 5:15 AM, christopher palm  wrote:
>
> > Hi, thanks for the suggestion.
> > I lowered the broker message.max.bytes to be smaller than my payload so
> > that I now receive an
> > org.apache.kafka.common.errors.RecordTooLargeException.
> >
> > I still don't see the retries happening; the default back-off is 100ms,
> and
> > my producer loops for a few seconds, long enough to trigger the retry.
> >
> > Is there something else I need to set?
> >
> > I have tried this with both a sync and an async producer, with the same results.
> >
> > Thanks,
> >
> > Chris
> >
> > On Wed, Apr 6, 2016 at 12:01 AM, Manikumar Reddy <
> > manikumar.re...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > >  Producer message size validation checks ("buffer.memory",
> > > "max.request.size") happen before batching and sending messages.
> > > The retry mechanism applies only to broker-side and network errors.
> > > Try changing the "message.max.bytes" broker config property to simulate
> > > a broker-side error.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Apr 6, 2016 at 9:53 AM, christopher palm 
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am working with the KafkaProducer using the properties below,
> > > > so that the producer keeps trying to send upon failure on Kafka
> 0.9.0.1.
> > > > I am forcing a failure by setting my buffer size smaller than my
> > > > payload, which causes the expected exception below.
> > > >
> > > > I don't see the producer retry to send on receiving this failure.
> > > >
> > > > Am I  missing something in the configuration to allow the producer to
> > > retry
> > > > on failed sends?
> > > >
> > > > Thanks,
> > > > Chris
> > > >
> > > > java.util.concurrent.ExecutionException:
> > > > org.apache.kafka.common.errors.RecordTooLargeException: The message
> is
> > > 8027
> > > > bytes when serialized which is larger than the total memory buffer
> you
> > > have
> > > > configured with the buffer.memory configuration.
> > > >
> > > >  props.put("bootstrap.servers", bootStrapServers);
> > > >
> > > > props.put("acks", "all");
> > > >
> > > > props.put("retries", 3);//Try for 3 strikes
> > > >
> > > > props.put("batch.size", batchSize);//Need to see if this number
> should
> > > > increase under load
> > > >
> > > > props.put("linger.ms", 1);//After 1 ms fire the batch even if the
> > batch
> > > > isn't full.
> > > >
> > > > props.put("buffer.memory", buffMemorySize);
> > > >
> > > > props.put("max.block.ms",500);
> > > >
> > > > props.put("max.in.flight.requests.per.connection", 1);
> > > >
> > > > props.put("key.serializer",
> > > > "org.apache.kafka.common.serialization.StringSerializer");
> > > >
> > > > props.put("value.serializer",
> > > > "org.apache.kafka.common.serialization.ByteArraySerializer");
> > > >
> > >
> >
>


Re: KafkaProducer Retries in .9.0.1

2016-04-06 Thread christopher palm
Hi, thanks for the suggestion.
I lowered the broker message.max.bytes to be smaller than my payload so
that I now receive an org.apache.kafka.common.errors.RecordTooLargeException.

I still don't see the retries happening; the default back-off is 100ms, and
my producer loops for a few seconds, long enough to trigger the retry.

Is there something else I need to set?

I have tried this with both a sync and an async producer, with the same results.

Thanks,

Chris

On Wed, Apr 6, 2016 at 12:01 AM, Manikumar Reddy 
wrote:

> Hi,
>
>  Producer message size validation checks ("buffer.memory",
> "max.request.size") happen before batching and sending messages.
> The retry mechanism applies only to broker-side and network errors.
> Try changing the "message.max.bytes" broker config property to simulate
> a broker-side error.
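
A rough sketch of how the failure in this thread surfaces and of checking
whether an error is even retriable (broker address, topic, and the oversized
payload are placeholders): RecordTooLargeException is not a RetriableException,
so the "retries" setting never applies to it.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

public class RetriableOrNotSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("retries", 3);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        byte[] payload = new byte[8 * 1024]; // assumed larger than the configured limit
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            try {
                RecordMetadata md =
                        producer.send(new ProducerRecord<String, byte[]>("test-topic", "key", payload)).get();
                System.out.println("acked at offset " + md.offset());
            } catch (ExecutionException e) {
                // RecordTooLargeException ends up here; it is not retriable,
                // so no retry attempts are made before the future fails.
                boolean retriable = e.getCause() instanceof RetriableException;
                System.err.println("send failed (retriable=" + retriable + "): " + e.getCause());
            }
        }
    }
}
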
>
>
>
>
>
>
> On Wed, Apr 6, 2016 at 9:53 AM, christopher palm  wrote:
>
> > Hi All,
> >
> > I am working with the KafkaProducer using the properties below,
> > so that the producer keeps trying to send upon failure on Kafka 0.9.0.1.
> > I am forcing a failure by setting my buffer size smaller than my
> > payload, which causes the expected exception below.
> >
> > I don't see the producer retry to send on receiving this failure.
> >
> > Am I  missing something in the configuration to allow the producer to
> retry
> > on failed sends?
> >
> > Thanks,
> > Chris
> >
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 8027
> > bytes when serialized which is larger than the total memory buffer you
> have
> > configured with the buffer.memory configuration.
> >
> >  props.put("bootstrap.servers", bootStrapServers);
> >
> > props.put("acks", "all");
> >
> > props.put("retries", 3);//Try for 3 strikes
> >
> > props.put("batch.size", batchSize);//Need to see if this number should
> > increase under load
> >
> > props.put("linger.ms", 1);//After 1 ms fire the batch even if the batch
> > isn't full.
> >
> > props.put("buffer.memory", buffMemorySize);
> >
> > props.put("max.block.ms",500);
> >
> > props.put("max.in.flight.requests.per.connection", 1);
> >
> > props.put("key.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> >
> > props.put("value.serializer",
> > "org.apache.kafka.common.serialization.ByteArraySerializer");
> >
>


KafkaProducer Retries in .9.0.1

2016-04-05 Thread christopher palm
Hi All,

I am working with the KafkaProducer using the properties below,
so that the producer keeps trying to send upon failure on Kafka 0.9.0.1.
I am forcing a failure by setting my buffer size smaller than my
payload, which causes the expected exception below.

I don't see the producer retry to send on receiving this failure.

Am I missing something in the configuration to allow the producer to retry
on failed sends?

Thanks,
Chris

java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8027
bytes when serialized which is larger than the total memory buffer you have
configured with the buffer.memory configuration.

 props.put("bootstrap.servers", bootStrapServers);

props.put("acks", "all");

props.put("retries", 3);//Try for 3 strikes

props.put("batch.size", batchSize);//Need to see if this number should
increase under load

props.put("linger.ms", 1);//After 1 ms fire the batch even if the batch
isn't full.

props.put("buffer.memory", buffMemorySize);

props.put("max.block.ms",500);

props.put("max.in.flight.requests.per.connection", 1);

props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");


Re: Security with SSL and not Kerberos?

2016-03-22 Thread christopher palm
Hi Ismael,

OK, I got the basic authentication/ACL authorization for SSL working with
the principal Kafka.example.com.

If that principal wasn't listed in server.properties as a super user, I was
seeing errors on broker startup.

In order to add new principals, does server.properties have to be updated and
that principal added to the super users list?

How do I run the Kafka producer/consumer as a principal other than
Kafka.example.com?

Thanks,
Chris
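
For context, a rough sketch of the broker-side lines this refers to (all values
are placeholders; the DN must match the broker's own certificate). Regular
client principals should not normally need to be super users; granting them
ACLs with kafka-acls, as in the commands quoted below, is usually enough.

# server.properties (sketch)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Principals listed here bypass ACL checks entirely, e.g. the broker's own
# certificate principal so that inter-broker requests are authorized:
super.users=User:CN=kafka.example.com,OU=Client,O=Confluent,L=London,ST=London,C=GB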

On Mon, Mar 21, 2016 at 6:54 PM, Ismael Juma  wrote:

> Hi Gopal,
>
> As you suspected, you have to set the appropriate ACLs for it to work. The
> following will make the producer work:
>
> kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
> --add --allow-principal
> "User:CN=kafka.example.com,OU=Client,O=Confluent,L=London,ST=London,C=GB"
> \
> --producer --topic securing-kafka
>
> The following will make the consumer work:
>
> kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
> --add --allow-principal
> "User:CN=kafka.example.com,OU=Client,O=Confluent,L=London,ST=London,C=GB"
> \
> --consumer --topic securing-kafka --group securing-kafka-group
>
> Enabling the authorizer log is a good way to figure out the principal if
> you don't know it.
>
> Hope this helps,
> Ismael
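
A sketch of what a per-client SSL config can look like (paths and passwords are
placeholders). The authenticated principal is taken from the client
certificate's DN, so a client presenting a different certificate shows up as a
different "User:" principal for the kafka-acls commands above.

# producer_ssl.properties / consumer_ssl.properties (sketch)
security.protocol=SSL
ssl.truststore.location=/etc/security/kafka.client.truststore.jks
ssl.truststore.password=changeit
# The keystore holds this client's certificate; use a different keystore
# (different CN) per client to get distinct principals:
ssl.keystore.location=/etc/security/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
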
>
> On Mon, Mar 21, 2016 at 10:27 PM, Raghavan, Gopal  >
> wrote:
>
> > >Hi Christopher,
> >
> > >On Mon, Mar 21, 2016 at 3:53 PM, christopher palm 
> > wrote:
> >
> > >> Does Kafka support SSL authentication and ACL authorization without
> > >> Kerberos?
> > >>
> >
> > >Yes. The following branch modifies the blog example slightly to only
> allow
> > >SSL authentication.
> >
> > >https://github.com/confluentinc/securing-kafka-blog/tree/ssl-only
> >
> > >If so, can different clients have their own SSL certificate on the same
> > >> broker?
> > >>
> >
> > >Yes.
> >
> >
> >
> > I tried the “ssl-only” branch but am getting the following error:
> >
> > [vagrant@kafka ~]$ kafka-console-producer --broker-list
> > kafka.example.com:9093 --topic securing-kafka --producer.config
> > /etc/kafka/producer_ssl.properties
> >
> >
> >
> >
> > test
> >
> >
> >
> >
> > [2016-03-21 22:08:46,744] WARN Error while fetching metadata with
> > correlation id 0 : {securing-kafka=TOPIC_AUTHORIZATION_FAILED}
> > (org.apache.kafka.clients.NetworkClient)
> >
> >
> >
> >
> > [2016-03-21 22:08:46,748] ERROR Error when sending message to topic
> > securing-kafka with key: null, value: 4 bytes with error: Not authorized
> to
> > access topics: [securing-kafka]
> > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> >
> >
> >
> >
> > I did not set topic-level ACLs, since I do not know the principal name to
> > use for the --allow-principal parameter of kafka-acls
> >
> >
> > Any suggestions?
> >
> >
> > >In reading the following security article, it seems that Kerberos is an
> > >> option but not required if SSL is used.
> > >>
> >
> > >That's right.
> >
> > >Ismael
> >
>


Security with SSL and not Kerberos?

2016-03-21 Thread christopher palm
Hi All,

Does Kafka support SSL authentication and ACL authorization without
Kerberos?

If so, can different clients have their own SSL certificate on the same
broker?

In reading the following security article, it seems that Kerberos is an
option but not required if SSL is used.

Thanks,
Chris

http://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption

"Administrators can require client authentication using either Kerberos or
Transport Layer Security (TLS) client certificates, so that Kafka brokers
know who is making each request"


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-30 Thread christopher palm
What I found was two problems.
1. The producer wasn't passing in a partition key, so not all partitions
were getting data.
2. After fixing the producer, I could see all threads getting data
consistently; then the shutdown method was clearly killing the threads.
I have removed the shutdown, and with the producer changed to send a key,
this looks like it is running correctly now.

Thanks!
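
For anyone hitting the same symptom: with this producer API, messages sent
without a key can stick to a single partition for long stretches, so consumer
threads on the other partitions see nothing. A rough sketch of sending with an
explicit key (broker list, topic, and key scheme are placeholders), so the
default partitioner hashes the key across all partitions:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholders
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 100; i++) {
            // Explicit key: the default partitioner hashes it, spreading
            // messages across all four partitions instead of just one.
            String key = "truck-" + (i % 4);
            String event = System.currentTimeMillis() + "|event-" + i;
            producer.send(new KeyedMessage<String, String>("kafkatopicRep4Part4", key, event));
            Thread.sleep(1000);
        }
        producer.close();
    }
}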

On Wed, Apr 29, 2015 at 10:59 PM, tao xiao  wrote:

> The log suggests that the shutdown method was still called
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> Please ensure consumer.shutdown() and executor.shutdown() are not called
> during the course of your program
>
> On Thu, Apr 30, 2015 at 2:23 AM, christopher palm 
> wrote:
>
> > Commenting out Example shutdown did not seem to make a difference, I
> added
> > the print statement below to highlight the fact.
> >
> > The other threads still shut down, and only one thread lives on,
> eventually
> > that dies after a few minutes as well
> >
> > Could this be because the producer's default partitioner isn't balancing
> data
> > across all partitions?
> >
> > Thanks,
> > Chris
> >
> > Thread 0: 2015-04-29
> > 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
> >
> > Last Shutdown via example.shutDown called!
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> > ZKConsumerConnector shutting down
> >
> > 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> > scheduler
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutting down
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Stopped
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping all fetchers
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-consumergroup], Shutting down
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Stopped
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-] All connections stopped
> >
> > 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> > thread.
> >
> > Shutting down Thread: 2
> >
> > Shutting down Thread: 1
> >
> > Shutting down Thread: 3
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], ZKConsumerConnector shut down completed
> >
> > Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> > distance|-73.99021500035|40.6636611
> >
> > 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], stopping watcher executor thread for consumer
> > consumergroup
> >
> > Thread 0: 2015-04-29
> > 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009
> >
> > On Wed, Apr 29, 2015 at 10:11 AM, tao xiao  wrote:
> >
> > > example.shutdown(); in ConsumerGroupExample closes all consumer
> > connections
> > > to Kafka. remove this line the consumer threads will run forever
> > >
> > > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am trying to get a multi threaded HL consumer working against a 2
> > > broker
> > > > Kafka cluster with a 4 partition 2 replica  topic.
> > > >
> > > > The consumer code is set to run with 4 threads, one for each
> partition.
> > > >
> > > > The producer code uses the default partitioner and loops indefinitely
> > > > feeding events into the topic.(I excluded the while loop in the paste
> > > > below)
> > > >
> > > > What I see is the threads eventually all exit, even t

Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Commenting out Example shutdown did not seem to make a difference, I added
the print statement below to highlight the fact.

The other threads still shut down, and only one thread lives on; eventually
that dies after a few minutes as well.

Could this be because the producer's default partitioner isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.99021500035|40.6636611

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao  wrote:

> example.shutdown(); in ConsumerGroupExample closes all consumer connections
> to Kafka. Remove this line and the consumer threads will run forever
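
If a clean shutdown path is still wanted, one option is to trigger it only on
JVM exit instead of right after run(). A rough sketch of an alternative main
method, assuming the ConsumerGroupExample class from the original post (the
args[3] thread-count argument is an assumption):

public static void main(String[] args) {
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    final ConsumerGroupExample example =
            new ConsumerGroupExample(zooKeeper, groupId, topic);
    // Only shut the consumer down when the JVM terminates (Ctrl-C / SIGTERM),
    // so the consumer threads keep polling until then.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            example.shutdown();
        }
    });
    example.run(threads);
}
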
>
> On Wed, Apr 29, 2015 at 9:42 PM, christopher palm 
> wrote:
>
> > Hi All,
> >
> > I am trying to get a multi threaded HL consumer working against a 2
> broker
> > Kafka cluster with a 4 partition 2 replica  topic.
> >
> > The consumer code is set to run with 4 threads, one for each partition.
> >
> > The producer code uses the default partitioner and loops indefinitely
> > feeding events into the topic.(I excluded the while loop in the paste
> > below)
> >
> > What I see is the threads eventually all exit, even though the producer
> is
> > still sending events into the topic.
> >
> > My understanding is that one consumer thread per partition is the correct
> > setup.
> >
> > Any ideas why this code doesn't continue to consume events as they are
> > pushed to the topic?
> >
> > I suspect I am configuring something wrong here, but am not sure what.
> >
> > Thanks,
> >
> > Chris
> >
> >
> > *Topic Configuration*
> >
> > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> >
> > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> >  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > *Producer Code:*
> >
> >  Properties props = new Properties();
> >
> > props.put("metadata.broker.list", args[0]);
> >
> > props.put("zk.connect", args[1]);
> >
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> >
> > props.put("request.required.acks", "1");
> >
> > String TOPIC = args[2];
> >
> > ProducerConfig config = new ProducerConfig(props);
> >
> > Producer producer = new Producer(
> > config);
> >
> > finalEvent = new Timestamp(new Date().getTime()) + "|"
> >
> > + truckIds[0] + "|" + driverIds[0] + "|" +
> > events[random
> > .nextInt(evtCnt)]
> >
> >   

MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Hi All,

I am trying to get a multi threaded HL consumer working against a 2 broker
Kafka cluster with a 4 partition 2 replica  topic.

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely
feeding events into the topic. (I excluded the while loop in the paste below.)

What I see is the threads eventually all exit, even though the producer is
still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are
pushed to the topic?

I suspect I am configuring something wrong here, but am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*

 Properties props = new Properties();

props.put("metadata.broker.list", args[0]);

props.put("zk.connect", args[1]);

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("request.required.acks", "1");

String TOPIC = args[2];

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

finalEvent = new Timestamp(new Date().getTime()) + "|"

+ truckIds[0] + "|" + driverIds[0] + "|" + events[random
.nextInt(evtCnt)]

+ "|" + getLatLong(arrayroute17[i]);

try {

KeyedMessage<String, String> data = new
KeyedMessage<String, String>(TOPIC, finalEvent);

LOG.info("Sending Messge #: " + routeName[0] + ": " + i +",
msg:" + finalEvent);

producer.send(data);

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

}


*Consumer Code:*

public class ConsumerTest implements Runnable {

    private KafkaStream<byte[], byte[]> m_stream;

   private int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {

   m_threadNumber = a_threadNumber;

   m_stream = a_stream;

   }

   public void run() {

    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

   while (it.hasNext()){

   System.out.println("Thread " + m_threadNumber + ": " + new
String(it.next().message()));

   try {

 Thread.sleep(1000);

}catch (InterruptedException e) {

 e.printStackTrace();

 }

   }

   System.out.println("Shutting down Thread: " + m_threadNumber);

   }

}

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private  ExecutorService executor;



public ConsumerGroupExample(String a_zookeeper, String a_groupId,
String a_topic) {

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig(a_zookeeper, a_groupId));

this.topic = a_topic;

}



public void shutdown() {

if (consumer != null) consumer.shutdown();

if (executor != null) executor.shutdown();

try {

if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

System.out.println("Timed out waiting for consumer threads
to shut down, exiting uncleanly");

}

} catch (InterruptedException e) {

System.out.println("Interrupted during shutdown, exiting
uncleanly");

}

   }



public void run(int a_numThreads) {

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topic, new Integer(a_numThreads));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

executor = Executors.newFixedThreadPool(a_numThreads);

int threadNumber = 0;

for (final KafkaStream<byte[], byte[]> stream : streams) {

executor.submit(new ConsumerTest(stream, threadNumber));

threadNumber++;

}

}



private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {

Properties props = new Properties();

props.put("zookeeper.connect", a_zookeeper);

props.put("group.id", a_groupId);

props.put("zookeeper.session.timeout.ms", "400");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("consumer.timeout.ms", "-1");

 return new ConsumerConfig(props);

}



public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = Integer.parseInt(