This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e63f23718fd KAFKA-15174: Ensure CommitAsync propagates the exception to the user (#14680)
e63f23718fd is described below

commit e63f23718fdec514582fe1107536c1d2bacf72a1
Author: Philip Nee <p...@confluent.io>
AuthorDate: Mon Nov 20 00:15:48 2023 -0800

    KAFKA-15174: Ensure CommitAsync propagates the exception to the user (#14680)
    
    The commit covers a few important points:
    
    - Exception handling: We should throw RetriableCommitFailedException when the
      commit exception is retriable. We should throw FencedInstanceIdException on
      commit and poll, similar to the current implementation. Other errors should
      be thrown as they are.
    - Callback invocation: The callbacks need to be invoked on the
      main/application thread; however, the future is completed in the background
      thread. To achieve this, I created an Invoker class with a queue, so that
      the callbacks can be invoked during consumer.poll().
    Note: One change I made is to remove the DefaultOffsetCommitCallback. Since
    the callback is purely for logging, I think it is reasonable to move the
    logging to the background thread instead of relying on the application
    thread to trigger it.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 123 +++++++++++++++---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 137 +++++++++++++++++++--
 .../kafka/api/BaseAsyncConsumerTest.scala          |  10 +-
 4 files changed, 237 insertions(+), 37 deletions(-)
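
As context for the change above, here is a minimal application-side sketch of the
new commitAsync() semantics. It is not part of the patch; the broker address,
group id, and topic name are illustrative. A retriable commit failure now reaches
the user callback as RetriableCommitFailedException, and the callback runs on the
application thread during a later poll(), commitSync(), or close():

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitAsyncExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            consumer.poll(Duration.ofMillis(100));
            consumer.commitAsync((offsets, exception) -> {
                // Invoked on the application thread, not the background thread.
                if (exception instanceof RetriableCommitFailedException) {
                    // Retriable failure: safe to retry the commit.
                    System.err.println("Retriable commit failure: " + exception);
                } else if (exception != null) {
                    // Non-retriable errors are passed through as-is.
                    System.err.println("Fatal commit failure: " + exception);
                }
            });
            // The next consumer call on this thread drains any pending callbacks;
            // a consumer that stops polling only sees them on close().
            consumer.poll(Duration.ofMillis(100));
        }
    }
}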

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6746485a7ed..848806e2395 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -38,12 +38,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -58,8 +59,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
@@ -156,6 +159,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+    private boolean isFenced = false;
+    private final Optional<String> groupInstanceId;
+
+    private final OffsetCommitCallbackInvoker invoker = new OffsetCommitCallbackInvoker();
 
     // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
     // and is used to prevent multithreaded access
@@ -168,7 +175,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                     GroupRebalanceConfig.ProtocolType.CONSUMER);
-
+            this.groupInstanceId = groupRebalanceConfig.groupInstanceId;
             this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
             this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             LogContext logContext = createLogContext(config, groupRebalanceConfig);
@@ -306,6 +313,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.applicationEventHandler = applicationEventHandler;
         this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+        this.groupInstanceId = Optional.empty();
     }
 
     // Visible for testing
@@ -332,6 +340,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
         this.assignors = assignors;
+        this.groupInstanceId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
 
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
         FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
@@ -457,15 +466,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             CompletableFuture<Void> future = commit(offsets, false);
-            final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
             future.whenComplete((r, t) -> {
-                if (t != null) {
-                    commitCallback.onComplete(offsets, new KafkaException(t));
-                } else {
-                    commitCallback.onComplete(offsets, null);
+                if (callback == null) {
+                    if (t != null) {
+                        log.error("Offset commit with offsets {} failed", offsets, t);
+                    }
+                    return;
                 }
-            }).exceptionally(e -> {
-                throw new KafkaException(e);
+
+                invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t));
             });
         } finally {
             release();
@@ -473,8 +482,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     }
 
     // Visible for testing
-    CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) {
+    CompletableFuture<Void> commit(final Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) {
+        maybeInvokeCommitCallbacks();
+        maybeThrowFencedInstanceException();
         maybeThrowInvalidGroupIdException();
+
+        if (offsets.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
         if (isWakeupable) {
             // the task can only be woken up if the top level API call is commitSync
@@ -852,6 +868,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException);
 
+        // Invoke all callbacks after the background thread exits, in case there
+        // are unsent async commits
+        maybeInvokeCommitCallbacks();
+
         closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
         closeQuietly(interceptors, "consumer interceptors", firstException);
         closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
@@ -1158,16 +1178,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
-    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-            if (exception != null)
-                log.error("Offset commit with offsets {} failed", offsets, exception);
-        }
-    }
-
     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
+        maybeInvokeCommitCallbacks();
+        maybeThrowFencedInstanceException();
         backgroundEventProcessor.process();
 
         // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
@@ -1307,4 +1321,75 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     public KafkaConsumerMetrics kafkaConsumerMetrics() {
         return kafkaConsumerMetrics;
     }
-}
\ No newline at end of file
+
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCommitCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s. This is
+     * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+     * future completion, and then executing the callbacks when the user polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Unexpected error encountered when adding offset commit callback to the invocation queue", e);
+            }
+        }
+
+        public void executeCallbacks() {
+            while (!callbackQueue.isEmpty()) {
+                OffsetCommitCallbackTask callback = callbackQueue.poll();
+                if (callback != null) {
+                    callback.invoke();
+                }
+            }
+        }
+    }
+
+    private class OffsetCommitCallbackTask {
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private final Exception exception;
+        private final OffsetCommitCallback callback;
+
+        public OffsetCommitCallbackTask(final OffsetCommitCallback callback,
+                                        final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                        final Exception e) {
+            this.offsets = offsets;
+            this.exception = e;
+            this.callback = callback;
+        }
+
+        public void invoke() {
+            if (exception instanceof RetriableException) {
+                callback.onComplete(offsets, new RetriableCommitFailedException(exception));
+                return;
+            }
+
+            if (exception instanceof FencedInstanceIdException)
+                isFenced = true;
+            callback.onComplete(offsets, exception);
+        }
+    }
+}
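
The OffsetCommitCallbackInvoker added above is an instance of a common
cross-thread handoff pattern: the background thread enqueues completed work, and
the application thread drains the queue at well-defined points (poll, commit,
close), so user code never runs on the background thread. A standalone sketch of
the idea follows; the names are illustrative, and while the patch uses a
LinkedBlockingQueue and a dedicated task type, any thread-safe queue works:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class CallbackHandoff {
    // Thread-safe queue shared between the background and application threads.
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    // Called from the background thread when an async result completes.
    void submit(Runnable callback) {
        pending.offer(callback);
    }

    // Called from the application thread; callbacks run on the caller's thread.
    void drain() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }
}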
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b07ce529c36..f176cb57c4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -711,7 +711,7 @@ public class KafkaConsumerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     public void verifyPollTimesOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
@@ -1258,7 +1258,7 @@ public class KafkaConsumerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c3b1e9fa50a..ae7ccd35323 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
@@ -29,15 +30,22 @@ import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicat
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentMatchers;
 import org.mockito.MockedConstruction;
 import org.mockito.stubbing.Answer;
@@ -50,20 +58,25 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.singleton;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -136,19 +149,42 @@ public class AsyncKafkaConsumerTest {
         assertFalse(future.isCompletedExceptionally());
     }
 
-    @Test
-    public void testCommitAsync_UserSuppliedCallback() {
+    @ParameterizedTest
+    @MethodSource("commitExceptionSupplier")
+    public void testCommitAsync_UserSuppliedCallback(Exception exception) {
         CompletableFuture<Void> future = new CompletableFuture<>();
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L));
         offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
 
         doReturn(future).when(consumer).commit(offsets, false);
-        OffsetCommitCallback customCallback = mock(OffsetCommitCallback.class);
-        consumer.commitAsync(offsets, customCallback);
-        future.complete(null);
-        verify(customCallback).onComplete(offsets, null);
+        MockCommitCallback callback = new MockCommitCallback();
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+
+        if (exception == null) {
+            future.complete(null);
+            consumer.maybeInvokeCommitCallbacks();
+            assertNull(callback.exception);
+        } else {
+            future.completeExceptionally(exception);
+            consumer.maybeInvokeCommitCallbacks();
+            assertSame(exception.getClass(), callback.exception.getClass());
+        }
+    }
+
+    private static Stream<Exception> commitExceptionSupplier() {
+        return Stream.of(
+                null,  // For the successful completion scenario
+                new KafkaException("Test exception"),
+                new GroupAuthorizationException("Group authorization exception"));
+    }
+
+    @Test
+    public void testFencedInstanceException() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync());
+        future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
     }
 
     @Test
@@ -175,6 +211,91 @@ public class AsyncKafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testEnsureCallbackExecutedByApplicationThread() {
+        final String currentThread = Thread.currentThread().getName();
+        ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();
+        MockCommitCallback callback = new MockCommitCallback();
+        CountDownLatch latch = new CountDownLatch(1);  // Initialize the latch with a count of 1
+        try {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            doReturn(future).when(consumer).commit(new HashMap<>(), false);
+            assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+            // Simulating some background work
+            backgroundExecutor.submit(() -> {
+                future.complete(null);
+                latch.countDown();
+            });
+            latch.await();
+            assertEquals(1, consumer.callbacks());
+            consumer.maybeInvokeCommitCallbacks();
+            assertEquals(currentThread, callback.completionThread);
+        } catch (Exception e) {
+            fail("Not expecting an exception");
+        } finally {
+            backgroundExecutor.shutdown();
+        }
+    }
+
+    @Test
+    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.completeExceptionally(new NetworkException("Test exception"));
+        assertMockCommitCallbackInvoked(() -> consumer.commitSync(),
+            callback,
+            Errors.NETWORK_EXCEPTION);
+    }
+
+    @Test
+    public void testEnsurePollExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.complete(null);
+        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
+            callback,
+            null);
+    }
+
+    @Test
+    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.complete(null);
+        assertMockCommitCallbackInvoked(() -> consumer.close(Duration.ZERO),
+            callback,
+            null);
+    }
+
+    private void assertMockCommitCallbackInvoked(final Executable task, final MockCommitCallback callback,
+                                                 final Errors exception) {
+        assertDoesNotThrow(task);
+        assertEquals(1, callback.invoked);
+        if (callback.exception instanceof RetriableException)
+            assertEquals(callback.exception.getClass(), RetriableCommitFailedException.class);
+        else
+            assertNull(callback.exception);
+    }
+
+    private static class MockCommitCallback implements OffsetCommitCallback {
+        public int invoked = 0;
+        public Exception exception = null;
+        public String completionThread;
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            invoked++;
+            this.completionThread = Thread.currentThread().getName();
+            this.exception = exception;
+        }
+    }
     /**
      * This is a rather ugly bit of code. Not my choice :(
      *
diff --git a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
index 717f1179753..45ce3c5a357 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
@@ -36,18 +36,12 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
     val producer = createProducer()
     val numRecords = 10000
     val startingTimestamp = System.currentTimeMillis()
-    val cb = new CountConsumerCommitCallback
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
     consumer.assign(List(tp).asJava)
-    consumer.commitAsync(cb)
+    consumer.commitAsync()
     waitUntilTrue(() => {
-      cb.successCount == 1
+      consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs)) != null
     }, "wait until commit is completed successfully", defaultBlockingAPITimeoutMs)
-    val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs))
-    assertNotNull(committedOffset)
-    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
-    // tp. The committed offset should be null. This is intentional.
-    assertNull(committedOffset.get(tp))
     assertTrue(consumer.assignment.contains(tp))
   }
 
