lianetm commented on code in PR #20511:
URL: https://github.com/apache/kafka/pull/20511#discussion_r2348999339
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1527,26 +1495,39 @@ private void autoCommitOnClose(final Timer timer) {
}
private void runRebalanceCallbacksOnClose() {
- if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null)
+ if (groupMetadata.get().isEmpty())
return;
int memberEpoch = groupMetadata.get().get().generationId();
- Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
+ final Exception error;
- if (assignedPartitions.isEmpty())
- // Nothing to revoke.
- return;
+ if (streamsRebalanceListenerInvoker.isPresent()) {
Review Comment:
`streamsRebalanceListenerInvoker` could be null here when `close` is called
from the constructor after a failure building the consumer (ln 509), so maybe
consider adding a null check here? (We've actually been seeing those noisy NPE
logs on close recently.)
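
A minimal sketch of the kind of guard suggested above, assuming the invoker field is an `Optional` whose reference may still be null if the constructor fails before assigning it. `CloseGuardSketch` and its nested `Invoker` interface are hypothetical stand-ins for illustration, not the actual Kafka classes:

```java
import java.util.Optional;

// Hypothetical stand-in for the consumer's close path; not the actual Kafka code.
final class CloseGuardSketch {

    // Hypothetical minimal view of the invoker: returns the callback error, or null on success.
    interface Invoker {
        Exception invokeAllTasksRevoked();
    }

    // The Optional reference itself can be null when close() runs from a constructor
    // that failed before the field was assigned, so check the reference first.
    static Exception runCallbacksOnClose(Optional<Invoker> invoker) {
        if (invoker == null || !invoker.isPresent()) {
            return null; // nothing to invoke, and no NPE on a half-built consumer
        }
        return invoker.get().invokeAllTasksRevoked();
    }
}
```

With a guard like this, calling `runCallbacksOnClose(null)` simply returns null instead of throwing, which is the behaviour one would probably want on the failed-constructor path.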
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+ private final Logger log;
+
+ private final StreamsRebalanceData streamsRebalanceData;
+ private Optional<StreamsRebalanceListener> listener;
+
+ StreamsRebalanceListenerInvoker(LogContext logContext, StreamsRebalanceData streamsRebalanceData) {
+ this.log = logContext.logger(getClass());
+ this.listener = Optional.empty();
+ this.streamsRebalanceData = streamsRebalanceData;
+ }
+
+ public void setRebalanceListener(StreamsRebalanceListener streamsRebalanceListener) {
+ Objects.requireNonNull(streamsRebalanceListener, "StreamsRebalanceListener cannot be null");
+ if (listener.isPresent() && listener.get() != streamsRebalanceListener) {
+ throw new IllegalStateException("StreamsRebalanceListener can only be set once");
Review Comment:
given this validation, should we clear the listener on `unsubscribe`?
(otherwise, I expect that subscribe+unsubscribe+subscribe would fail on the
last subscribe with this error).
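
To illustrate the concern, here is a small sketch assuming a hypothetical `clearRebalanceListener()` that `unsubscribe` would call. `ListenerHolderSketch`, its `Listener` interface, `clearRebalanceListener()` and `hasListener()` are made up for the example; only the set-once validation mirrors the diff above:

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical holder mirroring the set-once validation; not the actual Kafka class.
final class ListenerHolderSketch {

    // Hypothetical marker interface standing in for the rebalance listener.
    interface Listener { }

    private Optional<Listener> listener = Optional.empty();

    void setRebalanceListener(Listener newListener) {
        Objects.requireNonNull(newListener, "StreamsRebalanceListener cannot be null");
        // Same check as in the diff: a different instance is rejected while one is set.
        if (listener.isPresent() && listener.get() != newListener) {
            throw new IllegalStateException("StreamsRebalanceListener can only be set once");
        }
        listener = Optional.of(newListener);
    }

    // Hypothetical hook for unsubscribe: forgetting the listener lets a later subscribe set a new one.
    void clearRebalanceListener() {
        listener = Optional.empty();
    }

    boolean hasListener() {
        return listener.isPresent();
    }
}
```

Without the clear step, setting a second listener instance throws `IllegalStateException`; with it, subscribe+unsubscribe+subscribe goes through.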
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1527,26 +1495,39 @@ private void autoCommitOnClose(final Timer timer) {
}
private void runRebalanceCallbacksOnClose() {
Review Comment:
since this func is being extended to cover streams tasks now too, should we
tweak the error message to make it more generic? (above, `Failed to release
group assignment`). Up to you, but maybe something along the lines of "Failed
running callbacks" would apply better to both cases
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,44 +269,28 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
final CompletableFuture<Void> future) {
- final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+ final Optional<Exception> exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws an error"));
Review Comment:
uhm I see, I'm ok leaving it like this then. I was just trying to make the
Optionals more readable, since it gets heavy, but I see the trade-off with the
existing logic you mentioned.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvoker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in the {@link StreamsRebalanceListener}
+ * interface. When streams group task assignment changes, these methods are invoked. This class wraps those
+ * callback calls with some logging and error handling.
+ */
+public class StreamsRebalanceListenerInvoker {
+
+ private final Logger log;
+
+ private final StreamsRebalanceData streamsRebalanceData;
+ private Optional<StreamsRebalanceListener> listener;
+
+ StreamsRebalanceListenerInvoker(LogContext logContext, StreamsRebalanceData streamsRebalanceData) {
+ this.log = logContext.logger(getClass());
+ this.listener = Optional.empty();
+ this.streamsRebalanceData = streamsRebalanceData;
+ }
+
+ public void setRebalanceListener(StreamsRebalanceListener streamsRebalanceListener) {
+ this.listener = Optional.ofNullable(streamsRebalanceListener);
+ }
+
+ public Exception invokeAllTasksRevoked() {
+ if (listener.isPresent()) {
+ return invokeTasksRevoked(streamsRebalanceData.reconciledAssignment().activeTasks());
+ }
+
+ return null;
+ }
+
+ public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
+ if (listener.isPresent()) {
+ log.info("Adding newly assigned tasks: {}", assignment);
Review Comment:
> The consumer rebalance listener logs Adding newly assigned partitions: in this case

I see, well, that's wrong too :) At that point the partitions have already
been added. I will fix it on the consumer side ;)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1532,21 +1512,34 @@ private void runRebalanceCallbacksOnClose() {
Review Comment:
ack