cadonna commented on code in PR #17795:
URL: https://github.com/apache/kafka/pull/17795#discussion_r1858311885
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -281,17 +300,173 @@ public String toString() {
}
}
+ private final BlockingQueue<BackgroundEvent> onCallbackRequests = new
LinkedBlockingQueue<>();
+
+ private ApplicationEventHandler applicationEventHandler = null;
+
+ private Optional<Function<Set<StreamsAssignmentInterface.TaskId>,
Optional<Exception>>> onTasksRevokedCallback = null;
+ private Optional<Function<Assignment, Optional<Exception>>>
onTasksAssignedCallback = null;
+ private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback =
null;
+
+ private final StreamsRebalanceEventProcessor
streamsRebalanceEventProcessor;
+
+ private class StreamsRebalanceEventProcessor implements
EventProcessor<BackgroundEvent> {
+
+ @Override
+ public void process(final BackgroundEvent event) {
+ switch (event.type()) {
+ case ERROR:
+ process((ErrorEvent) event);
+ break;
+
+ case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+ process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+ break;
+
+ case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
+ process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+ break;
+
+ case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
+ process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Background event type
" + event.type() + " was not expected");
+
+ }
+ }
+
+ private void process(final ErrorEvent event) {
+ throw event.error();
+ }
+
+ private void process(final StreamsOnTasksRevokedCallbackNeededEvent
event) {
+ StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent =
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
+ applicationEventHandler.add(invokedEvent);
+ if (invokedEvent.error().isPresent()) {
+ throw invokedEvent.error().get();
+ }
+ }
+
+ private void process(final StreamsOnTasksAssignedCallbackNeededEvent
event) {
+ StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent =
invokeOnTasksAssignedCallback(event.assignment(), event.future());
+ applicationEventHandler.add(invokedEvent);
+ if (invokedEvent.error().isPresent()) {
+ throw invokedEvent.error().get();
+ }
+ }
+
+ private void process(final StreamsOnAllTasksLostCallbackNeededEvent
event) {
+ StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent =
invokeOnAllTasksLostCallback(event.future());
+ applicationEventHandler.add(invokedEvent);
+ if (invokedEvent.error().isPresent()) {
+ throw invokedEvent.error().get();
+ }
+ }
+
+ private StreamsOnTasksRevokedCallbackCompletedEvent
invokeOnTasksRevokedCallback(final Set<StreamsAssignmentInterface.TaskId>
activeTasksToRevoke,
+
final CompletableFuture<Void> future) {
+ final Optional<Exception> exceptionFromCallback =
onTasksRevokedCallback
+ .orElseThrow(() -> new IllegalStateException("No tasks
assignment callback set!")).apply(activeTasksToRevoke);
+
+ return exceptionFromCallback
+ .map(exception ->
+ new StreamsOnTasksRevokedCallbackCompletedEvent(
+ future,
+
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation
callback throws an error"))
+ ))
+ .orElseGet(() -> new
StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
+ }
+
+ private StreamsOnTasksAssignedCallbackCompletedEvent
invokeOnTasksAssignedCallback(final StreamsAssignmentInterface.Assignment
assignment,
+
final CompletableFuture<Void> future) {
+ Optional<KafkaException> error = Optional.empty();
+ // ToDo: Can we avoid the following check?
+ if (!assignment.equals(reconciledAssignment.get())) {
+
+ final Optional<Exception> exceptionFromCallback =
onTasksAssignedCallback
+ .orElseThrow(() -> new IllegalStateException("No tasks
assignment callback set!")).apply(assignment);
+
+ if (exceptionFromCallback.isPresent()) {
+ error =
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
"Task assignment callback throws an error"));
+ } else {
+ reconciledAssignment.set(assignment);
+ }
+ }
+ return new StreamsOnTasksAssignedCallbackCompletedEvent(future,
error);
+ }
+
+ private StreamsOnAllTasksLostCallbackCompletedEvent
invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
+ final Optional<Exception> exceptionFromCallback =
onAllTasksLostCallback
+ .orElseThrow(() -> new IllegalStateException("No tasks
assignment callback set!")).get();
+
+ final Optional<KafkaException> error;
+
+ if (exceptionFromCallback.isPresent()) {
+ error =
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(),
"Task assignment callback throws an error"));
+ } else {
+ error = Optional.empty();
+ reconciledAssignment.set(Assignment.EMPTY);
+ }
+
+ return new StreamsOnAllTasksLostCallbackCompletedEvent(future,
error);
+ }
+ }
+
public StreamsAssignmentInterface(UUID processId,
Optional<HostInfo> endpoint,
Map<String, Subtopology> subtopologyMap,
- Map<String, String> clientTags
- ) {
+ Map<String, String> clientTags) {
this.processId = processId;
this.endpoint = endpoint;
this.subtopologyMap = subtopologyMap;
this.taskLags = new HashMap<>();
this.shutdownRequested = new AtomicBoolean(false);
this.clientTags = clientTags;
+ this.streamsRebalanceEventProcessor = new
StreamsRebalanceEventProcessor();
+ }
+
+ public void setOnTasksRevokedCallback(final
Function<Set<StreamsAssignmentInterface.TaskId>, Optional<Exception>>
onTasksRevokedCallback) {
+ this.onTasksRevokedCallback =
Optional.ofNullable(onTasksRevokedCallback);
+ }
+
+ public void setOnTasksAssignedCallback(final Function<Assignment,
Optional<Exception>> onTasksAssignedCallback) {
+ this.onTasksAssignedCallback =
Optional.ofNullable(onTasksAssignedCallback);
+ }
+
+ public void setOnAllTasksLostCallback(final Supplier<Optional<Exception>>
onAllTasksLostCallback) {
+ this.onAllTasksLostCallback =
Optional.ofNullable(onAllTasksLostCallback);
+ }
+
+ public void setApplicationEventHandler(final ApplicationEventHandler
applicationEventHandler) {
+ this.applicationEventHandler = applicationEventHandler;
+ }
+
+ public CompletableFuture<Void>
requestOnTasksAssignedCallbackInvocation(final Assignment assignment) {
+ final StreamsOnTasksAssignedCallbackNeededEvent
onTasksAssignedCallbackNeededEvent = new
StreamsOnTasksAssignedCallbackNeededEvent(assignment);
+ onCallbackRequests.add(onTasksAssignedCallbackNeededEvent);
+ return onTasksAssignedCallbackNeededEvent.future();
+ }
+
+ public CompletableFuture<Void>
requestOnTasksRevokedCallbackInvocation(final
Set<StreamsAssignmentInterface.TaskId> activeTasksToRevoke) {
+ final StreamsOnTasksRevokedCallbackNeededEvent
onTasksRevokedCallbackNeededEvent = new
StreamsOnTasksRevokedCallbackNeededEvent(activeTasksToRevoke);
+ onCallbackRequests.add(onTasksRevokedCallbackNeededEvent);
+ return onTasksRevokedCallbackNeededEvent.future();
+ }
+
+ public CompletableFuture<Void> requestOnAllTasksLostCallbackInvocation() {
Review Comment:
Basically, we use the futures only to attach code to them, so that this code
is executed when they complete. We pass the future from the background thread
to the application thread, which does some work (here, invoking the callback).
When that work is done, we pass the future back to the background thread, where
it is completed and the attached code is executed. It is really just a way to
pass code around and execute it at the right moment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]