[ https://issues.apache.org/jira/browse/KAFKA-3569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Zoller updated KAFKA-3569:
-------------------------------
    Description: 
I have a KafkaConsumer instance I've wrapped in a thread, which communicates 
with the outside (multi-threaded) world via a blocking queue.  Code is here:

https://gist.github.com/gzoller/93fe2392fd3606bcb3b879e4ab2f8f6e

I'm not worried about batch commits at this point and want to understand 
single-message commit behavior first.  If I commitSync() a single message, it is 
"slow" but consistent: it doesn't drop commits.

If I use commitAsync(), it's "fast", but the results are flaky: it drops commits, 
even for small numbers of messages.
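For context, the KafkaConsumer Javadoc describes commitSync() as blocking until the commit succeeds or hits an unrecoverable error, while commitAsync() does not block and hands any error to the OffsetCommitCallback (or discards it if no callback is given); it is generally understood not to retry on its own. One commonly suggested way to handle a failed async commit is to retry it only if no newer commit has been sent for the same partition, so a stale retry cannot overwrite a later offset. The sketch below models only that bookkeeping in plain Java; `AsyncCommitGuard`, `beforeCommit` and `shouldRetry` are hypothetical names, not Kafka API, and partitions are simplified to integers.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of the Kafka API): remembers the sequence number
 * of the most recent async commit per partition, so a failed commit is retried
 * only if nothing newer has been sent for that partition.
 */
public class AsyncCommitGuard {
    private final Map<Integer, Long> latestSeq = new HashMap<>();
    private long nextSeq = 0;

    /** Call just before sending an async commit; capture the result for the callback. */
    public synchronized long beforeCommit(int partition) {
        long seq = nextSeq++;
        latestSeq.put(partition, seq);
        return seq;
    }

    /** Call when the callback reports an error: only the newest commit is worth retrying. */
    public synchronized boolean shouldRetry(int partition, long seq) {
        Long latest = latestSeq.get(partition);
        return latest != null && latest == seq;
    }

    public static void main(String[] args) {
        AsyncCommitGuard guard = new AsyncCommitGuard();
        long first = guard.beforeCommit(2);  // e.g. commit for offset 1 on partition 2
        long second = guard.beforeCommit(2); // a later commit on the same partition
        System.out.println(guard.shouldRetry(2, first));  // false: superseded
        System.out.println(guard.shouldRetry(2, second)); // true: still the newest
    }
}
```

In real client code, beforeCommit would be called just before consumer.commitAsync(...), and shouldRetry inside the callback's onComplete(offsets, exception) when the exception is non-null.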

I pre-loaded a 4-partition topic with 12 messages (3 per partition).  Then I ran 
this code across 2 consumers, each with its own instance of this class and hence 
its own thread.  One consumer ends up listening on 2 partitions and the other on 
the remaining 2.

The read logs confirm that poll() behavior and content are as expected for the 2 
consumers: each consumer sees messages from its own assigned partitions, and only 
from those.

Some of the 12 messages committed fine, while others report errors like this 
one in the callback:

ERROR [{lowercaseStrings-2=OffsetAndMetadata{offset=1, metadata=''}}]:  
org.apache.kafka.clients.consumer.internals.SendFailedExceptionERROR

My final offsets after the 12-message test run:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
group1, lowercaseStrings, 0, 2, 3, 1, consumer-1_/192.168.99.1
group1, lowercaseStrings, 1, unknown, 3, unknown, consumer-1_/192.168.99.1
group1, lowercaseStrings, 2, unknown, 3, unknown, consumer-2_/192.168.99.1
group1, lowercaseStrings, 3, 2, 3, 1, consumer-2_/192.168.99.1

The "missing" offsets correspond to the ones that produced errors, so all 
messages are accounted for, either by success or by error.
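The table columns relate by simple arithmetic: LAG = LOG END OFFSET - CURRENT OFFSET (3 - 2 = 1 for partitions 0 and 3), and a partition with no successful commit shows "unknown". By Kafka's documented convention the committed offset is the next offset the consumer will read (last processed offset + 1), so a partition whose messages at offsets 0-2 were all processed and committed that way would show CURRENT OFFSET 3 and LAG 0. The hypothetical sketch below rebuilds the CURRENT OFFSET and LAG columns from a list of commit-callback outcomes; `CommitTally`, `Outcome` and the sample outcomes are illustrative assumptions, not taken from the reporter's code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (no Kafka classes): rebuilds the CURRENT OFFSET and LAG
 * columns of the offsets table from a list of commit-callback outcomes.
 */
public class CommitTally {

    /** One callback outcome: the offset sent for a partition and whether it succeeded. */
    static final class Outcome {
        final int partition;
        final long offset;
        final boolean success;

        Outcome(int partition, long offset, boolean success) {
            this.partition = partition;
            this.offset = offset;
            this.success = success;
        }
    }

    /** Highest successfully committed offset per partition; partitions with no success are absent. */
    static Map<Integer, Long> committed(List<Outcome> outcomes) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Outcome o : outcomes) {
            if (o.success) {
                result.merge(o.partition, o.offset, Long::max);
            }
        }
        return result;
    }

    /** LAG = LOG END OFFSET - CURRENT OFFSET, or "unknown" if nothing was committed. */
    static String lag(Map<Integer, Long> committed, int partition, long logEndOffset) {
        Long current = committed.get(partition);
        return current == null ? "unknown" : Long.toString(logEndOffset - current);
    }

    public static void main(String[] args) {
        // Hypothetical outcomes chosen only to give the same shape as the table:
        // partition 0 has successful commits, partition 2 only a failed one.
        List<Outcome> outcomes = List.of(
                new Outcome(0, 1, true),
                new Outcome(0, 2, true),
                new Outcome(2, 1, false));
        Map<Integer, Long> committed = committed(outcomes);
        for (int p : new int[] {0, 2}) {
            Long current = committed.get(p);
            // prints "0, 2, 3, 1" then "2, unknown, 3, unknown"
            System.out.println(p + ", " + (current == null ? "unknown" : current)
                    + ", 3, " + lag(committed, p, 3));
        }
    }
}
```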

At high volumes the behavior is the same.  Over 1 million messages, I'll drop 
30K-60K commits due to these same kinds of errors, while the others commit 
successfully.  The speed difference is profound, though!  commitSync() takes 
several minutes for 1M messages but drops none; commitAsync() takes maybe 5 
seconds, with losses.
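The reported 30K-60K failed commits out of 1M correspond to a 3%-6% failure rate. A commonly recommended compromise is to use commitAsync() on the hot path and then issue one commitSync() with the latest offset per partition when the consumer shuts down or its partitions are revoked, so that earlier async failures are repaired by the final synchronous commit. The sketch below simulates that pattern without any Kafka classes; the broker map, the 30% failure rate, the fixed random seed and the method names are assumptions made for illustration, and the simulated synchronous commit is assumed always to succeed.

```java
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/**
 * Simulation only (no Kafka classes): async commits may fail, but one final
 * synchronous commit of the latest offset per partition repairs the result.
 */
public class AsyncThenSyncSketch {
    // Stand-in for the group's committed offsets as stored by the broker.
    static final Map<Integer, Long> brokerCommitted = new TreeMap<>();

    // Hypothetical async commit: fails with probability failRate; returning false
    // stands in for an error delivered to the commit callback.
    static boolean simulatedAsyncCommit(int partition, long offset, Random rnd, double failRate) {
        if (rnd.nextDouble() < failRate) {
            return false;
        }
        brokerCommitted.merge(partition, offset, Long::max);
        return true;
    }

    // Hypothetical sync commit: assumed here to always succeed.
    static void simulatedSyncCommit(Map<Integer, Long> offsets) {
        offsets.forEach((p, o) -> brokerCommitted.merge(p, o, Long::max));
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        Map<Integer, Long> latest = new TreeMap<>(); // next offset to read, per partition
        int failures = 0;
        // 4 partitions x 3 messages, committed one message at a time.
        for (int p = 0; p < 4; p++) {
            for (long offset = 0; offset < 3; offset++) {
                long next = offset + 1; // convention: commit last processed offset + 1
                latest.put(p, next);
                if (!simulatedAsyncCommit(p, next, rnd, 0.3)) {
                    failures++;
                }
            }
        }
        simulatedSyncCommit(latest); // e.g. on shutdown or partition revocation
        System.out.println("async failures: " + failures);
        System.out.println("committed after final sync: " + brokerCommitted);
    }
}
```

Whatever the number of simulated async failures, the last line shows every partition at offset 3 once the final synchronous commit has run.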

I noted that some work has been done in this area in 0.10.1.0 (for example, 
SendFailedException no longer seems to be in the code) and was eager to see 
whether the problem persists, but I'm having KafkaProducer problems in 0.10.1.0 
and haven't been able to check whether this behavior remains.

  was:
I have a KafkaConsumer instance I've wrapped in a thread, which communicates 
with the outside (multi-threaded) world via a blocking queue.  Code is here:

https://gist.github.com/gzoller/93fe2392fd3606bcb3b879e4ab2f8f6e

I'm not worried about batch commits at this point; I want to understand 
single-message commit behavior first.  If I commitSync() a single message, it is 
"slow" but consistent: it doesn't drop commits.

If I use commitAsync(), it's "fast", but the results are flaky: it drops commits, 
even for small numbers of messages.

I pre-loaded a 4-partition topic with 12 messages (3 per partition).  Then I ran 
this code across 2 consumers, each with its own instance of this class and hence 
its own thread.  One consumer ends up listening on 2 partitions and the other on 
the remaining 2.

The read logs confirm that poll() behavior and content are as expected for the 2 
consumers: each consumer sees messages from its own assigned partitions, and only 
from those.

Some of the 12 messages committed fine, while others report errors like this 
one in the callback:

ERROR [{lowercaseStrings-2=OffsetAndMetadata{offset=1, metadata=''}}]:  
org.apache.kafka.clients.consumer.internals.SendFailedExceptionERROR

My final offsets after the 12-message test run:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
group1, lowercaseStrings, 0, 2, 3, 1, consumer-1_/192.168.99.1
group1, lowercaseStrings, 1, unknown, 3, unknown, consumer-1_/192.168.99.1
group1, lowercaseStrings, 2, unknown, 3, unknown, consumer-2_/192.168.99.1
group1, lowercaseStrings, 3, 2, 3, 1, consumer-2_/192.168.99.1

The "missing" offsets correspond to the ones that produced errors, so all 
messages are accounted for, either by success or by error.

At high volumes the behavior is the same.  Over 1 million messages, I'll drop 
30K-60K commits due to these same kinds of errors, while the others commit 
successfully.  The speed difference is profound, though!  commitSync() takes 
several minutes for 1M messages but drops none; commitAsync() takes maybe 5 
seconds, with losses.

I noted that some work has been done in this area in 0.10.1.0 (for example, 
SendFailedException no longer seems to be in the code) and was eager to see 
whether the problem persists, but I'm having KafkaProducer problems in 0.10.1.0 
and haven't been able to check whether this behavior remains.


> commitAsync() sometimes fails with errors
> -----------------------------------------
>
>                 Key: KAFKA-3569
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3569
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1
>         Environment: MacOS Docker
>            Reporter: Greg Zoller
>              Labels: clients
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
