kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1579899408


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java:
##########
@@ -16,120 +16,31 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.IdempotentCloser;
-import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.io.Closeable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * An {@link EventProcessor} is the means by which events <em>produced</em> by 
thread <em>A</em> are
- * <em>processed</em> by thread <em>B</em>. By definition, threads <em>A</em> 
and <em>B</em> run in parallel to
- * each other, so a mechanism is needed with which to receive and process the 
events from the other thread. That
- * communication channel is formed around {@link BlockingQueue a shared queue} 
into which thread <em>A</em>
- * enqueues events and thread <em>B</em> reads and processes those events.
+ * An {@code EventProcessor} is the means by which events are 
<em>processed</em>, the meaning of which is left
+ * intentionally loose. This is in large part to keep the {@code 
EventProcessor} focused on what it means to process
+ * the events, and <em>not</em> linking itself too closely with the rest of 
the surrounding application.
+ *
+ * <p/>
+ *
+ * The {@code EventProcessor} is envisaged as a stateless service that acts as 
a conduit, receiving an event and
+ * dispatching to another block of code to process. The semantic meaning of 
each event is different, so the
+ * {@code EventProcessor} will need to interact with other parts of the system 
that maintain state. The
+ * implementation should not be concerned with the mechanism by which an event 
arrived for processing. While the
+ * events are shuffled around the consumer subsystem by means of {@link 
BlockingQueue shared queues}, it should
+ * be considered an anti-pattern to need to know how it arrived or what 
happens after its is processed.
  */
-public abstract class EventProcessor<T> implements Closeable {
-
-    private final Logger log;
-    private final BlockingQueue<T> eventQueue;
-    private final IdempotentCloser closer;
-
-    protected EventProcessor(final LogContext logContext, final 
BlockingQueue<T> eventQueue) {
-        this.log = logContext.logger(EventProcessor.class);
-        this.eventQueue = eventQueue;
-        this.closer = new IdempotentCloser();
-    }
-
-    public abstract boolean process();
-
-    protected abstract void process(T event);
-
-    @Override
-    public void close() {
-        closer.close(this::closeInternal, () -> log.warn("The event processor 
was already closed"));
-    }
-
-    protected interface ProcessHandler<T> {
-
-        void onProcess(T event, Optional<KafkaException> error);
-    }
+public interface EventProcessor<T> extends AutoCloseable {
 
     /**
-     * Drains all available events from the queue, and then processes them in 
order. If any errors are thrown while
-     * processing the individual events, these are submitted to the given 
{@link ProcessHandler}.
+     * Process an event that is received.
      */
-    protected boolean process(ProcessHandler<T> processHandler) {
-        closer.assertOpen("The processor was previously closed, so no further 
processing can occur");
-
-        List<T> events = drain();
-
-        if (events.isEmpty()) {
-            log.trace("No events to process");
-            return false;
-        }
+    void process(T event);
 
-        try {
-            log.trace("Starting processing of {} event{}", events.size(), 
events.size() == 1 ? "" : "s");
-
-            for (T event : events) {
-                try {
-                    Objects.requireNonNull(event, "Attempted to process a null 
event");
-                    log.trace("Processing event: {}", event);
-                    process(event);
-                    processHandler.onProcess(event, Optional.empty());
-                } catch (Throwable t) {
-                    KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(t);
-                    processHandler.onProcess(event, Optional.of(error));
-                }
-            }
-        } finally {
-            log.trace("Completed processing");
-        }
-
-        return true;
-    }
-
-    /**
-     * It is possible for the consumer to close before complete processing all 
the events in the queue. In
-     * this case, we need to throw an exception to notify the user the 
consumer is closed.
-     */
-    private void closeInternal() {
-        log.trace("Closing event processor");
-        List<T> incompleteEvents = drain();
-
-        if (incompleteEvents.isEmpty())
-            return;
-
-        KafkaException exception = new KafkaException("The consumer is 
closed");
-
-        // Check each of the events and if it has a Future that is incomplete, 
complete it exceptionally.
-        incompleteEvents
-                .stream()
-                .filter(e -> e instanceof CompletableEvent)
-                .map(e -> ((CompletableEvent<?>) e).future())
-                .filter(f -> !f.isDone())
-                .forEach(f -> {
-                    log.debug("Completing {} with exception {}", f, 
exception.getMessage());
-                    f.completeExceptionally(exception);
-                });
-
-        log.debug("Discarding {} events because the consumer is closing", 
incompleteEvents.size());
-    }
-
-    /**
-     * Moves all the events from the queue to the returned list.
-     */
-    private List<T> drain() {
-        LinkedList<T> events = new LinkedList<>();
-        eventQueue.drainTo(events);
-        return events;
+    @Override
+    default void close() {
+        // Do nothing by default...

Review Comment:
   It turns out `close()` wasn't needed any more, so I just removed it 🤷‍♂️



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to