[ https://issues.apache.org/jira/browse/KAFKA-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712189#comment-16712189 ]
ASF GitHub Bot commented on KAFKA-7709: --------------------------------------- guozhangwang closed pull request #6005: KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches on multiple partitions. URL: https://github.com/apache/kafka/pull/6005 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 19d7af2e7a0..644f45697a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -174,8 +174,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) { */ private List<ProducerBatch> getExpiredInflightBatches(long now) { List<ProducerBatch> expiredBatches = new ArrayList<>(); - for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : inFlightBatches.entrySet()) { - TopicPartition topicPartition = entry.getKey(); + + for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) { + Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next(); List<ProducerBatch> partitionInFlightBatches = entry.getValue(); if (partitionInFlightBatches != null) { Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator(); @@ -197,8 +198,9 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) { break; } } - if (partitionInFlightBatches.isEmpty()) - inFlightBatches.remove(topicPartition); + if (partitionInFlightBatches.isEmpty()) { + batchIt.remove(); + } } } return expiredBatches; diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 606637e4898..b3146ddc6e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -2079,6 +2079,44 @@ public void testResetNextBatchExpiry() throws Exception { } + @Test + public void testExpiredBatchesInMultiplePartitions() throws Exception { + long deliveryTimeoutMs = 1500L; + setupWithTransactionState(null, true, null); + + // Send multiple ProduceRequest across multiple partitions. + Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + + // Send request. + sender.run(time.milliseconds()); + assertEquals(1, client.inFlightRequestCount()); + assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size()); + + Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); + responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); + client.respond(new ProduceResponse(responseMap)); + + // Successfully expire both batches. 
+ time.sleep(deliveryTimeoutMs); + sender.run(time.milliseconds()); + assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size()); + + try { + request1.get(); + fail("The expired batch should throw a TimeoutException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + try { + request2.get(); + fail("The expired batch should throw a TimeoutException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } + private class MatchingBufferPool extends BufferPool { IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ConcurrentModificationException occurs when iterating through multiple > partitions in Sender.getExpiredInflightBatches > --------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-7709 > URL: https://issues.apache.org/jira/browse/KAFKA-7709 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.1.0 > Reporter: Mark Cho > Priority: Major > > In the Sender.getExpiredInflightBatches method, in-flight batches that exceed > delivery.timeout.ms on multiple partitions cause a > ConcurrentModificationException, because the underlying Java collection is > mutated while it is being iterated over. > In a Java HashMap, you cannot structurally modify the map while iterating > over it (other than through the iterator's own remove method), as this will > cause a ConcurrentModificationException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)