[ https://issues.apache.org/jira/browse/KAFKA-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712189#comment-16712189 ]

ASF GitHub Bot commented on KAFKA-7709:
---------------------------------------

guozhangwang closed pull request #6005: KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches on multiple partitions.
URL: https://github.com/apache/kafka/pull/6005

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2e7a0..644f45697a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -174,8 +174,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
      */
     private List<ProducerBatch> getExpiredInflightBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
-        for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : inFlightBatches.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
+
+        for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
+            Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
             List<ProducerBatch> partitionInFlightBatches = entry.getValue();
             if (partitionInFlightBatches != null) {
                 Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
@@ -197,8 +198,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
                         break;
                     }
                 }
-                if (partitionInFlightBatches.isEmpty())
-                    inFlightBatches.remove(topicPartition);
+                if (partitionInFlightBatches.isEmpty()) {
+                    batchIt.remove();
+                }
             }
         }
         return expiredBatches;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 606637e4898..b3146ddc6e5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2079,6 +2079,44 @@ public void testResetNextBatchExpiry() throws Exception {
 
     }
 
+    @Test
+    public void testExpiredBatchesInMultiplePartitions() throws Exception {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send multiple ProduceRequest across multiple partitions.
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        // Send request.
+        sender.run(time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+        responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(new ProduceResponse(responseMap));
+
+        // Successfully expire both batches.
+        time.sleep(deliveryTimeoutMs);
+        sender.run(time.milliseconds());
+        assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
+
+        try {
+            request1.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+
+        try {
+            request2.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
     private class MatchingBufferPool extends BufferPool {
         IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
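
For readers skimming the patch: the idiom it switches to is shown below as a standalone, hedged sketch. Plain String keys and Long deadlines stand in for TopicPartition and ProducerBatch, and the class and method names are made up for illustration; this is not the actual Sender code. Expired items are drained through the list iterator, and partitions whose lists become empty are dropped through the entry-set iterator rather than via HashMap.remove(), which is what previously tripped the map's fail-fast check.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class IteratorRemovalSketch {

    // Drain expired deadlines and drop partitions whose lists become empty.
    // Removal goes through entryIt.remove() rather than inFlight.remove(key),
    // so the HashMap is never structurally modified behind its own iterator.
    static List<Long> expire(Map<String, List<Long>> inFlight, long now) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<String, List<Long>>> entryIt = inFlight.entrySet().iterator();
        while (entryIt.hasNext()) {
            Map.Entry<String, List<Long>> entry = entryIt.next();
            Iterator<Long> deadlineIt = entry.getValue().iterator();
            while (deadlineIt.hasNext()) {
                Long deadline = deadlineIt.next();
                if (deadline <= now) {
                    expired.add(deadline);
                    deadlineIt.remove();
                }
            }
            if (entry.getValue().isEmpty()) {
                entryIt.remove(); // safe; inFlight.remove(entry.getKey()) here could throw
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> inFlight = new HashMap<>();
        inFlight.put("tp0", new ArrayList<>(Arrays.asList(10L)));
        inFlight.put("tp1", new ArrayList<>(Arrays.asList(20L, 2000L)));

        System.out.println(expire(inFlight, 100L)); // expired deadlines 10 and 20, in map order
        System.out.println(inFlight);               // {tp1=[2000]}; tp0 was removed safely
    }
}

Iterator.remove() is the only safe way to drop a mapping while iterating a HashMap view: it keeps the iterator's expected modification count in step with the map's, which a plain map.remove() during iteration does not.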
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConcurrentModificationException occurs when iterating through multiple 
> partitions in Sender.getExpiredInflightBatches
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7709
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7709
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.0
>            Reporter: Mark Cho
>            Priority: Major
>
> In the Sender.getExpiredInflightBatches method, a delivery.timeout.ms expiry
> across multiple partitions causes a ConcurrentModificationException, because
> the underlying Java collection is mutated while it is being iterated.
> A Java HashMap must not be structurally modified while it is being iterated;
> doing so makes its fail-fast iterator throw ConcurrentModificationException.
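
The failure mode described above reproduces outside Kafka in a few lines of plain Java. The sketch below is hypothetical (made-up class and key names, not Kafka code) but mirrors the shape of the pre-patch loop: emptied entries are removed from a HashMap while a for-each loop is iterating its entry set.

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashMapCmeDemo {
    public static void main(String[] args) {
        Map<String, List<String>> inFlightBatches = new HashMap<>();
        inFlightBatches.put("topic-0", new ArrayList<>());
        inFlightBatches.put("topic-1", new ArrayList<>());

        try {
            // Pre-patch shape: iterate the entry set with for-each and remove
            // emptied partitions directly from the map.
            for (Map.Entry<String, List<String>> entry : inFlightBatches.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    inFlightBatches.remove(entry.getKey());
                }
            }
        } catch (ConcurrentModificationException e) {
            // Because the removed entry is not the last one the iterator visits,
            // the next Iterator.next() call detects the structural modification
            // and fails fast.
            System.out.println("Reproduced: " + e);
        }
    }
}

Replacing the for-each loop with an explicit entrySet().iterator() and calling Iterator.remove(), as the patch above does, drops the entry without tripping the fail-fast check.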



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
