[https://issues.apache.org/jira/browse/KAFKA-4951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
cuiyang updated KAFKA-4951:
---------------------------
Description:
I found that KafkaProducer may sometimes send duplicate messages. This happens
as follows:
In the Sender thread:
NetworkClient::poll()
  -> this.selector.poll()
     // sends the message, e.g. "abc", to the broker successfully
  -> handleTimedOutRequests(responses, updatedNow)
     // judges whether the request carrying "abc" has expired (timed out),
     // based on this.requestTimeoutMs and updatedNow
  -> response.request().callback().onComplete()
     -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now)
        // if the message was judged as expired, its batch is re-enqueued
        // and sent again in the next loop
        -> this.accumulator.reenqueue(batch, now)
The problem: if the message "abc" has already been delivered to the broker
successfully but the request is still judged as expired, the batch is sent
again in the next loop, so the message is duplicated.
I can reproduce this scenario reliably.
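The trace above can be modelled with a toy, self-contained Java program. This is a minimal sketch under stated assumptions, not Kafka's code: the class TimeoutRetryDemo, the method run and its parameters are hypothetical; the broker is a plain list that persists every send immediately; the client clock advances one millisecond per loop iteration; and a successful response is checked before the timeout. It shows that when the broker's successful response takes longer than the request timeout, each re-enqueue stores another copy of "abc".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, heavily simplified model of the Sender loop traced above.
// Names mirror the trace for readability only; this is NOT Kafka's code.
public class TimeoutRetryDemo {

    /**
     * Sends the message "abc" and returns everything the simulated broker persisted.
     *
     * @param requestTimeoutMs stand-in for this.requestTimeoutMs
     * @param responseDelayMs  how long the broker's successful response takes to arrive
     * @param maxRetries       how many times a batch may be re-enqueued after a failure
     */
    public static List<String> run(long requestTimeoutMs, long responseDelayMs, int maxRetries) {
        List<String> brokerLog = new ArrayList<>();   // what the broker has stored
        Deque<String> accumulator = new ArrayDeque<>();
        accumulator.add("abc");
        int retries = 0;
        long now = 0;

        while (!accumulator.isEmpty()) {
            // "selector.poll()": the batch is sent and the broker persists it.
            String batch = accumulator.poll();
            long sentAtMs = now;
            brokerLog.add(batch);

            // Advance the clock 1 ms at a time until the response arrives
            // or the request is judged as timed out.
            while (true) {
                long updatedNow = ++now;
                if (updatedNow - sentAtMs >= responseDelayMs) {
                    break; // success response handled; the batch completes normally
                }
                // "handleTimedOutRequests(responses, updatedNow)"
                if (updatedNow - sentAtMs > requestTimeoutMs) {
                    // "completeBatch(batch, Errors.NETWORK_EXCEPTION, ...)": the client
                    // treats the send as failed, although the broker already stored it.
                    if (retries < maxRetries) {
                        retries++;
                        accumulator.add(batch); // "this.accumulator.reenqueue(batch, now)"
                    }
                    break;
                }
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        // Response faster than the timeout: stored once.
        System.out.println(run(30_000, 100, 3));     // prints [abc]
        // Response slower than the timeout: every retry stores another copy.
        System.out.println(run(30_000, 40_000, 3));  // prints [abc, abc, abc, abc]
    }
}
```

With maxRetries set to 0 the log holds a single copy, but in this model the client then treats a message that the broker did store as failed.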
In my opinion, NetworkClient::handleTimedOutRequests() is not very useful here:
the broker's response to the send request is successful and carries no error,
which means the broker has persisted the message. Yet this function causes the
duplicate-message problem.
> KafkaProducer may send duplicated message sometimes
> ---------------------------------------------------
>
> Key: KAFKA-4951
> URL: https://issues.apache.org/jira/browse/KAFKA-4951
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.1
> Reporter: cuiyang
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)