This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43f603cfb7e KAFKA-19351: AsyncConsumer#commitAsync should copy the input offsets (#19855)
43f603cfb7e is described below

commit 43f603cfb7ef52d9bb84c2d3c580e8020bc4577a
Author: Lan Ding <[email protected]>
AuthorDate: Fri May 30 16:36:38 2025 +0800

    KAFKA-19351: AsyncConsumer#commitAsync should copy the input offsets (#19855)
    
    `AsyncConsumer#commitAsync` and `AsyncConsumer#commitSync` should copy
    the input offsets.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 12 ++++---
 .../consumer/internals/AsyncKafkaConsumer.java     |  8 ++---
 .../consumer/internals/AsyncKafkaConsumerTest.java | 40 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a06e71335d..0e7d3d65b1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1003,7 +1003,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
-     * @param offsets A map of offsets by partition with associated metadata
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+     *                is safe to mutate the map after returning.
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
      *             or if there is an active group with the same <code>group.id</code> which is using group management. In such cases,
@@ -1054,7 +1055,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
-     * @param offsets A map of offsets by partition with associated metadata
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+     *                is safe to mutate the map after returning.
      * @param timeout The maximum amount of time to await completion of the offset commit
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
@@ -1143,7 +1145,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
      * (and variants) returns.
      *
-     * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
+     * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
      *                is safe to mutate the map after returning.
      * @param callback Callback to invoke when the commit completes
      * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
@@ -1563,7 +1565,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timestampsToSearch the mapping from partition to the timestamp to look up.
      *
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within 
+     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
      *         the default timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
@@ -1590,7 +1592,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timeout The maximum amount of time to await retrieval of the offsets
      *
      * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within 
+     *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
      *         timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e301b6855c6..29843c765c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -385,7 +385,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             setGroupAssignmentSnapshot(partitions);
         }
     };
-    
+
     public AsyncKafkaConsumer(final ConsumerConfig config,
                               final Deserializer<K> keyDeserializer,
                               final Deserializer<V> valueDeserializer,
@@ -927,7 +927,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        commitAsync(Optional.of(offsets), callback);
+        commitAsync(Optional.of(new HashMap<>(offsets)), callback);
     }
 
     private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
@@ -1599,12 +1599,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(Optional.of(offsets), defaultApiTimeoutMs);
+        commitSync(Optional.of(new HashMap<>(offsets)), defaultApiTimeoutMs);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        commitSync(Optional.of(offsets), timeout);
+        commitSync(Optional.of(new HashMap<>(offsets)), timeout);
     }
 
     private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index bb9482aa636..ec5b6b1f6f8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -357,6 +357,26 @@ public class AsyncKafkaConsumerTest {
         assertSame(exception.getClass(), callback.exception.getClass());
     }
 
+    @Test
+    public void testCommitAsyncShouldCopyOffsets() {
+        consumer = newConsumer();
+
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+
+        markOffsetsReadyForCommitEvent();
+        consumer.commitAsync(offsets, null);
+
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
     private static Stream<Exception> commitExceptionSupplier() {
         return Stream.of(
                 new KafkaException("Test exception"),
@@ -590,6 +610,26 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100)));
     }
 
+    @Test
+    public void testCommitSyncShouldCopyOffsets() {
+        consumer = newConsumer();
+
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+
+        completeCommitSyncApplicationEventSuccessfully();
+        consumer.commitSync(offsets);
+
+        final ArgumentCaptor<SyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final SyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
     private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) {
         time = new MockTime(1);
         consumer = newConsumer();

Reply via email to