This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d086e83 KAFKA-5503; Idempotent producer ignores shutdown while fetching ProducerId (#5881)
d086e83 is described below
commit d086e83fecbf211695868f6aebe6a80b68ff1c50
Author: layfe <[email protected]>
AuthorDate: Thu Jan 3 02:00:40 2019 +0300
KAFKA-5503; Idempotent producer ignores shutdown while fetching ProducerId (#5881)
Check `running` in `Sender.maybeWaitForProducerId` to ensure that the
producer can be closed while awaiting initialization of the producerId.
Reviewers: Jason Gustafson <[email protected]>
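
For illustration only, here is a minimal, self-contained sketch of the
pattern this change applies: a wait loop that also checks a `running`
flag, so that a close request ends the wait even when no producerId
ever arrives. The class `ProducerIdWaitSketch`, the `fetchAttempt`
callback and the `attempts` counter are hypothetical stand-ins and are
not part of Kafka's `Sender`; the real loop also checks
`transactionManager.hasError()`, which is omitted here.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the sender thread's state; not Kafka code.
public class ProducerIdWaitSketch {
    // Set to false by initiateClose(); volatile so another thread's write is visible.
    private volatile boolean running = true;
    private boolean hasProducerId = false;
    private int attempts = 0;

    public void initiateClose() {
        running = false;
    }

    public int attempts() {
        return attempts;
    }

    public boolean hasProducerId() {
        return hasProducerId;
    }

    // fetchAttempt is an assumed callback standing in for one request/response
    // round trip; it returns true once a producerId has been obtained.
    public void maybeWaitForProducerId(BooleanSupplier fetchAttempt) {
        // Without the `running` check this loop could only end on success,
        // so a close request would be ignored while the id is unavailable.
        while (running && !hasProducerId) {
            attempts++;
            hasProducerId = fetchAttempt.getAsBoolean();
        }
    }

    public static void main(String[] args) {
        ProducerIdWaitSketch sender = new ProducerIdWaitSketch();
        // Simulate a broker that never answers and a close request that
        // arrives during the third attempt.
        sender.maybeWaitForProducerId(() -> {
            if (sender.attempts() == 3) {
                sender.initiateClose();
            }
            return false;
        });
        System.out.println("attempts=" + sender.attempts()
                + " hasProducerId=" + sender.hasProducerId());
    }
}
```

In this sketch the loop exits after the third attempt because `running`
has become false, even though no producerId was obtained.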
---
.../main/java/org/apache/kafka/clients/producer/internals/Sender.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 644f456..1879d6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -494,7 +494,7 @@ public class Sender implements Runnable {
     }
     private void maybeWaitForProducerId() {
-        while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);