This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new e63f23718fd KAFKA-15174: Ensure CommitAsync propagate the exception to the user (#14680)
e63f23718fd is described below

commit e63f23718fdec514582fe1107536c1d2bacf72a1
Author: Philip Nee <p...@confluent.io>
AuthorDate: Mon Nov 20 00:15:48 2023 -0800

    KAFKA-15174: Ensure CommitAsync propagate the exception to the user (#14680)

    The commit covers a few important points:

    - Exception handling: We should throw RetriableCommitFailedException when the
      commit exception is retriable. We should throw FencedInstanceIdException on
      commit and poll, similar to the current implementation. Other errors are
      thrown as is.
    - Callback invocation: The callbacks need to be invoked on the main/application
      thread; however, the future is completed in the background thread. To achieve
      this, I created an Invoker class with a queue, so that these callbacks can be
      invoked during consumer.poll().

    Note: One change I made is to remove the DefaultOffsetCommitCallback. Since the
    callback is purely for logging, I think it is reasonable to move the logging to
    the background thread instead of relying on the application thread to trigger it.

    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
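The queue-based hand-off described above can be pictured with a minimal,
self-contained sketch. The class and method names below are illustrative only,
not the ones in this patch: a background thread completes the commit future and
only enqueues the user callback, and the application thread drains the queue
from its own calls:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    public class CallbackHandOffSketch {
        // Callbacks enqueued by the background thread, drained by the application thread.
        private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

        // Application thread (e.g. inside commitAsync()): register the callback
        // without ever running it on the completing thread.
        public void commitAsync(CompletableFuture<Void> commitResult, Runnable userCallback) {
            // whenComplete runs on whichever thread completes the future (the
            // background thread here), so it only enqueues the callback.
            commitResult.whenComplete((r, t) -> callbackQueue.offer(userCallback));
        }

        // Application thread, called from poll()/commitSync()/close().
        public void invokePendingCallbacks() {
            Runnable cb;
            while ((cb = callbackQueue.poll()) != null) {
                cb.run(); // executes on the caller's (application) thread
            }
        }
    }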
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 123 +++++++++++++++---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 137 +++++++++++++++++++--
 .../kafka/api/BaseAsyncConsumerTest.scala          |  10 +-
 4 files changed, 237 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6746485a7ed..848806e2395 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -38,12 +38,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -58,8 +59,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
@@ -156,6 +159,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+    private boolean isFenced = false;
+    private final Optional<String> groupInstanceId;
+
+    private final OffsetCommitCallbackInvoker invoker = new OffsetCommitCallbackInvoker();

     // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
     // and is used to prevent multithreaded access
@@ -168,7 +175,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                 GroupRebalanceConfig.ProtocolType.CONSUMER);
-
+            this.groupInstanceId = groupRebalanceConfig.groupInstanceId;
             this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
             this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             LogContext logContext = createLogContext(config, groupRebalanceConfig);
@@ -306,6 +313,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.applicationEventHandler = applicationEventHandler;
         this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+        this.groupInstanceId = Optional.empty();
     }

     // Visible for testing
@@ -332,6 +340,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
         this.assignors = assignors;
+        this.groupInstanceId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));

         ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
         FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
@@ -457,15 +466,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             CompletableFuture<Void> future = commit(offsets, false);
-            final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
             future.whenComplete((r, t) -> {
-                if (t != null) {
-                    commitCallback.onComplete(offsets, new KafkaException(t));
-                } else {
-                    commitCallback.onComplete(offsets, null);
+                if (callback == null) {
+                    if (t != null) {
+                        log.error("Offset commit with offsets {} failed", offsets, t);
+                    }
+                    return;
                 }
-            }).exceptionally(e -> {
-                throw new KafkaException(e);
+
+                invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t));
             });
         } finally {
             release();
@@ -473,8 +482,15 @@
     }

     // Visible for testing
-    CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) {
+    CompletableFuture<Void> commit(final Map<TopicPartition, OffsetAndMetadata> offsets, final boolean isWakeupable) {
+        maybeInvokeCommitCallbacks();
+        maybeThrowFencedInstanceException();
         maybeThrowInvalidGroupIdException();
+
+        if (offsets.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
         if (isWakeupable) {
             // the task can only be woken up if the top level API call is commitSync
@@ -852,6 +868,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException);

+        // Invoke all callbacks after the background thread exits in case there are unsent async
+        // commits
+        maybeInvokeCommitCallbacks();
+
         closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
         closeQuietly(interceptors, "consumer interceptors", firstException);
         closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
@@ -1158,16 +1178,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }

-    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-            if (exception != null)
-                log.error("Offset commit with offsets {} failed", offsets, exception);
-        }
-    }
-
     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
+        maybeInvokeCommitCallbacks();
+        maybeThrowFencedInstanceException();
         backgroundEventProcessor.process();

         // Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
@@ -1307,4 +1321,75 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     public KafkaConsumerMetrics kafkaConsumerMetrics() {
         return kafkaConsumerMetrics;
     }
-}
\ No newline at end of file
+
+    private void maybeThrowFencedInstanceException() {
+        if (isFenced) {
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
+                groupInstanceId.orElse("null"));
+        }
+    }
+
+    // Visible for testing
+    void maybeInvokeCommitCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s. This is
+     * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+     * future completion, and executing the callbacks when the user polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Unexpected error encountered when adding offset commit callback to the invocation queue", e);
+            }
+        }
+
+        public void executeCallbacks() {
+            while (!callbackQueue.isEmpty()) {
+                OffsetCommitCallbackTask callback = callbackQueue.poll();
+                if (callback != null) {
+                    callback.invoke();
+                }
+            }
+        }
+    }
+
+    private class OffsetCommitCallbackTask {
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private final Exception exception;
+        private final OffsetCommitCallback callback;
+
+        public OffsetCommitCallbackTask(final OffsetCommitCallback callback,
+                                        final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                        final Exception e) {
+            this.offsets = offsets;
+            this.exception = e;
+            this.callback = callback;
+        }
+
+        public void invoke() {
+            if (exception instanceof RetriableException) {
+                callback.onComplete(offsets, new RetriableCommitFailedException(exception));
+                return;
+            }
+
+            if (exception instanceof FencedInstanceIdException)
+                isFenced = true;
+            callback.onComplete(offsets, exception);
+        }
+    }
+}
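From the caller's side, the semantics introduced above can be sketched roughly
as follows. This is a hypothetical usage example, not code from the patch;
the topic name, partition, offset, and timeout are made up:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
    import org.apache.kafka.common.TopicPartition;

    public class CommitAsyncCallerSketch {
        static void commitWithCallback(Consumer<String, String> consumer) {
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition("my-topic", 0), new OffsetAndMetadata(42L));

            consumer.commitAsync(offsets, (committed, exception) -> {
                if (exception == null) {
                    // commit succeeded
                } else if (exception instanceof RetriableCommitFailedException) {
                    // a retriable cause was wrapped; retrying the commit is reasonable
                } else {
                    // fatal, e.g. FencedInstanceIdException, which later commit/poll
                    // calls will also raise on the application thread
                }
            });

            // With this change the callback runs here (or in commitSync()/close()),
            // on the application thread.
            consumer.poll(Duration.ZERO);
        }
    }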
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b07ce529c36..f176cb57c4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -711,7 +711,7 @@ public class KafkaConsumerTest {
     }

     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     public void verifyPollTimesOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
@@ -1258,7 +1258,7 @@ public class KafkaConsumerTest {
     }

     @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
+    @EnumSource(value = GroupProtocol.class, names = "GENERIC")
     public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
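For reference, the names attribute of JUnit 5's @EnumSource restricts a
parameterized test to the listed enum constants, so the two tests above now run
only for the GENERIC protocol. A standalone sketch with a stand-in enum:

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class EnumSourceFilterSketch {
        enum Protocol { GENERIC, CONSUMER } // stand-in for GroupProtocol

        @ParameterizedTest
        @EnumSource(value = Protocol.class, names = "GENERIC")
        void runsOnlyForGeneric(Protocol protocol) {
            assertEquals(Protocol.GENERIC, protocol); // invoked exactly once
        }
    }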
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c3b1e9fa50a..ae7ccd35323 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
@@ -29,15 +30,22 @@ import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentMatchers;
 import org.mockito.MockedConstruction;
 import org.mockito.stubbing.Answer;
@@ -50,20 +58,25 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static java.util.Collections.singleton;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -136,19 +149,42 @@ public class AsyncKafkaConsumerTest {
         assertFalse(future.isCompletedExceptionally());
     }

-    @Test
-    public void testCommitAsync_UserSuppliedCallback() {
+    @ParameterizedTest
+    @MethodSource("commitExceptionSupplier")
+    public void testCommitAsync_UserSuppliedCallback(Exception exception) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L));
         offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));

         doReturn(future).when(consumer).commit(offsets, false);
-        OffsetCommitCallback customCallback = mock(OffsetCommitCallback.class);
-        consumer.commitAsync(offsets, customCallback);
-        future.complete(null);
-        verify(customCallback).onComplete(offsets, null);
+        MockCommitCallback callback = new MockCommitCallback();
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+
+        if (exception == null) {
+            future.complete(null);
+            consumer.maybeInvokeCommitCallbacks();
+            assertNull(callback.exception);
+        } else {
+            future.completeExceptionally(exception);
+            consumer.maybeInvokeCommitCallbacks();
+            assertSame(exception.getClass(), callback.exception.getClass());
+        }
+    }
+
+    private static Stream<Exception> commitExceptionSupplier() {
+        return Stream.of(
+            null,  // For the successful completion scenario
+            new KafkaException("Test exception"),
+            new GroupAuthorizationException("Group authorization exception"));
+    }
+
+    @Test
+    public void testFencedInstanceException() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync());
+        future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
     }

     @Test
@@ -175,6 +211,91 @@ public class AsyncKafkaConsumerTest {
         }
     }

+    @Test
+    public void testEnsureCallbackExecutedByApplicationThread() {
+        final String currentThread = Thread.currentThread().getName();
+        ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();
+        MockCommitCallback callback = new MockCommitCallback();
+        CountDownLatch latch = new CountDownLatch(1); // Initialize the latch with a count of 1
+        try {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            doReturn(future).when(consumer).commit(new HashMap<>(), false);
+            assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+            // Simulating some background work
+            backgroundExecutor.submit(() -> {
+                future.complete(null);
+                latch.countDown();
+            });
+            latch.await();
+            assertEquals(1, consumer.callbacks());
+            consumer.maybeInvokeCommitCallbacks();
+            assertEquals(currentThread, callback.completionThread);
+        } catch (Exception e) {
+            fail("Not expecting an exception");
+        } finally {
+            backgroundExecutor.shutdown();
+        }
+    }
+
+    @Test
+    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.completeExceptionally(new NetworkException("Test exception"));
+        assertMockCommitCallbackInvoked(() -> consumer.commitSync(),
+            callback,
+            Errors.NETWORK_EXCEPTION);
+    }
+
+    @Test
+    public void testEnsurePollExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.complete(null);
+        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
+            callback,
+            null);
+    }
+
+    @Test
+    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
+        MockCommitCallback callback = new MockCommitCallback();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        future.complete(null);
+        assertMockCommitCallbackInvoked(() -> consumer.close(Duration.ZERO),
+            callback,
+            null);
+    }
+
+    private void assertMockCommitCallbackInvoked(final Executable task,
+                                                 final MockCommitCallback callback,
+                                                 final Errors exception) {
+        assertDoesNotThrow(task);
+        assertEquals(1, callback.invoked);
+        if (callback.exception instanceof RetriableException)
+            assertEquals(callback.exception.getClass(), RetriableCommitFailedException.class);
+        else
+            assertNull(callback.exception);
+    }
+
+    private static class MockCommitCallback implements OffsetCommitCallback {
+        public int invoked = 0;
+        public Exception exception = null;
+        public String completionThread;
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            invoked++;
+            this.completionThread = Thread.currentThread().getName();
+            this.exception = exception;
+        }
+    }
+
     /**
      * This is a rather ugly bit of code. Not my choice :(
      *
diff --git a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
index 717f1179753..45ce3c5a357 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
@@ -36,18 +36,12 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
     val producer = createProducer()
     val numRecords = 10000
     val startingTimestamp = System.currentTimeMillis()
-    val cb = new CountConsumerCommitCallback
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
     consumer.assign(List(tp).asJava)
-    consumer.commitAsync(cb)
+    consumer.commitAsync()
     waitUntilTrue(() => {
-      cb.successCount == 1
+      consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs)) != null
     }, "wait until commit is completed successfully", defaultBlockingAPITimeoutMs)
-    val committedOffset = consumer.committed(Set(tp).asJava, Duration.ofMillis(defaultBlockingAPITimeoutMs))
-    assertNotNull(committedOffset)
-    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
-    // tp. The committed offset should be null. This is intentional.
-    assertNull(committedOffset.get(tp))
     assertTrue(consumer.assignment.contains(tp))
   }
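The updated Scala test above verifies the async commit through committed()
rather than a callback counter. The equivalent check in Java, as a rough sketch
only (the timeout value and generic types are assumptions, and the condition
mirrors the Scala test's waitUntilTrue predicate):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitVerificationSketch {
        static boolean commitCompleted(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
            consumer.commitAsync();
            // committed() returning a (possibly empty) map without timing out is
            // taken as evidence that the commit round trip finished.
            Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp), Duration.ofSeconds(15));
            return committed != null;
        }
    }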