This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b4952ba  KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
b4952ba is described below

commit b4952ba7860235dedf7255d750e1b7418b23e46a
Author: Manikumar Reddy <manikumar.re...@gmail.com>
AuthorDate: Fri Mar 8 03:58:21 2019 +0530

    KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
    
    In KAFKA-5503, we added a check of the `running` flag in the loop inside
    maybeWaitForProducerId. This handles a concurrent call to Sender close()
    while we attempt to get the ProducerId, and avoids blocking indefinitely
    when the producer is shutting down.
    
    This created a corner case where the Sender thread gets blocked if a
    producerId reset happens concurrently with a call to Sender close. The
    fix is to check the `forceClose` flag in the loop inside
    maybeWaitForProducerId instead of the `running` flag.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../clients/producer/internals/SenderTest.java     | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1879d6f..c1fc40b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -494,7 +494,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5bef134..ffc0763 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -81,6 +81,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -1201,6 +1202,80 @@ public class SenderTest {
     }
 
     @Test
+    public void testCloseWithProducerIdReset() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.initiateClose(); // initiate close
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                prepareInitPidResponse(Errors.NONE, producerId + 1, (short) 1);
+                sender.run(time.milliseconds());
+                return !accumulator.hasUndrained();
+            }
+        }, 5000, "Failed to drain batches");
+    }
+
+    @Test
+    public void testForceCloseWithProducerIdReset() throws Exception {
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(1L, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        sender.forceClose(); // initiate force close
+        sender.run(time.milliseconds()); // this should not block
+        sender.run(); // run main loop to test forceClose flag
+        assertTrue("Pending batches are not aborted.", !accumulator.hasUndrained());
+        assertTrue(successfulResponse.isDone());
+    }
+
+    @Test
     public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
