cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413910032


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+         * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error}
+         * could occur when processing the events. In such cases, the processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+                    event.groupId(),

Review Comment:
   Yes, we do. We could verify that the group ID and the group instance ID in the
   event equal the current ones, which would catch bugs. Alternatively, we could
   send only the updatable fields to the application thread.
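
   To make the two options concrete, here is a minimal, self-contained sketch.
   It is not code from this PR: `Metadata` is a hypothetical stand-in for
   `ConsumerGroupMetadata`, and the method names `applyWithVerification` and
   `applyUpdatableFields` are invented for illustration. It assumes that only
   the member epoch and the member ID can change after the consumer is created.

```java
import java.util.Objects;
import java.util.Optional;

public class GroupMetadataUpdateSketch {

    // Hypothetical stand-in for ConsumerGroupMetadata; not the real Kafka class.
    static final class Metadata {
        final String groupId;
        final int memberEpoch;
        final String memberId;
        final Optional<String> groupInstanceId;

        Metadata(String groupId, int memberEpoch, String memberId, Optional<String> groupInstanceId) {
            this.groupId = groupId;
            this.memberEpoch = memberEpoch;
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    // Option 1: the event still carries every field, and the fields assumed to be
    // immutable are checked against the current metadata to surface bugs early.
    static Metadata applyWithVerification(Metadata current,
                                          String eventGroupId,
                                          Optional<String> eventGroupInstanceId,
                                          int eventMemberEpoch,
                                          String eventMemberId) {
        if (!Objects.equals(current.groupId, eventGroupId)
                || !Objects.equals(current.groupInstanceId, eventGroupInstanceId)) {
            throw new IllegalStateException("Group ID or group instance ID changed unexpectedly");
        }
        return applyUpdatableFields(current, eventMemberEpoch, eventMemberId);
    }

    // Option 2: the event carries only the fields that can change; everything
    // else is copied from the current metadata on the application thread.
    static Metadata applyUpdatableFields(Metadata current, int eventMemberEpoch, String eventMemberId) {
        return new Metadata(
            current.groupId,
            eventMemberEpoch,
            eventMemberId != null ? eventMemberId : current.memberId,
            current.groupInstanceId);
    }

    public static void main(String[] args) {
        Metadata current = new Metadata("my-group", 1, "member-1", Optional.empty());
        Metadata updated = applyUpdatableFields(current, 2, null);
        System.out.println(updated.memberEpoch + " " + updated.memberId);
    }
}
```

   With the second option the event class gets smaller and the equality check
   becomes unnecessary, at the cost of losing a sanity check on the network
   thread's view of the group.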



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+                    event.groupId(),
+                    event.memberEpoch(),
+                    event.memberId() != null ? event.memberId() : currentGroupMetadata.memberId(),
+                    event.groupInstanceId()

Review Comment:
   See above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
