[
https://issues.apache.org/jira/browse/KAFKA-4951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
cuiyang updated KAFKA-4951:
---------------------------
Description:
I found that KafkaProducer may sometimes send duplicate messages. This
happens as follows:
In the Sender thread:
NetworkClient::poll()
-> this.selector.poll() // writes the message, e.g. "abc"
-> handleTimedOutRequests(responses, updatedNow); // decides whether the
request carrying "abc", sent above, has timed out, based on
this.requestTimeoutMs and updatedNow
-> response.request().callback().onComplete()
-> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
// if the request was judged to have timed out, the batch is re-enqueued and
sent again in the next loop
-> this.accumulator.reenqueue(batch, now);
The problem: the message "abc" may already have been sent to the broker
successfully, yet the request can still be judged as timed out on the client.
The batch is then sent again in the next loop, which duplicates the message.
I can reproduce this scenario consistently.
In my opinion, NetworkClient::handleTimedOutRequests() is not very useful
here, because the broker's response to the send request is successful and
carries no error. This function is what introduces the duplicate messages.
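The sequence described above can be illustrated with a toy model. This is a minimal sketch and not Kafka code: the class DuplicateOnTimeoutSketch, the method run, and the ackDelayMs parameter are hypothetical names introduced for illustration, and the model assumes the broker appends the batch as soon as it receives the write while only the acknowledgement is delayed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these names are Kafka classes or Kafka APIs.
public class DuplicateOnTimeoutSketch {

    /**
     * Simulates a producer loop against a broker whose first acknowledgement
     * is delayed by ackDelayMs. Returns the broker's log once the batch is
     * no longer pending on the client.
     */
    public static List<String> run(String message, long requestTimeoutMs, long ackDelayMs) {
        List<String> brokerLog = new ArrayList<>();
        Deque<String> accumulator = new ArrayDeque<>();
        accumulator.add(message);

        boolean firstAttempt = true;
        while (!accumulator.isEmpty()) {
            String batch = accumulator.poll();

            // Assumption: the broker appends the batch as soon as the write arrives.
            brokerLog.add(batch);

            // Assumption: only the first acknowledgement is slow.
            long delay = firstAttempt ? ackDelayMs : 0L;
            firstAttempt = false;

            if (delay > requestTimeoutMs) {
                // The client judges the request as timed out and re-enqueues
                // the batch, although the broker already stored it.
                accumulator.addFirst(batch);
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        System.out.println(run("abc", 30_000L, 100L));    // prints [abc]
        System.out.println(run("abc", 30_000L, 45_000L)); // prints [abc, abc]
    }
}
```

In the second call the acknowledgement is slower than the request timeout, so the batch is written twice even though the first write succeeded on the broker side.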
> KafkaProducer may send duplicated message sometimes
> ---------------------------------------------------
>
> Key: KAFKA-4951
> URL: https://issues.apache.org/jira/browse/KAFKA-4951
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.1
> Reporter: cuiyang