cmccabe commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r569813199



##########
File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public interface EventQueue extends AutoCloseable {
+    interface Event {
+        void run() throws Exception;
+        default void handleException(Throwable e) {}
+    }
+
+    abstract class FailureLoggingEvent implements Event {
+        private final Logger log;
+
+        public FailureLoggingEvent(Logger log) {
+            this.log = log;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            if (e instanceof EventQueueClosedException) {
+                log.info("Not processing {} because the event queue is 
closed.",
+                    this.toString());
+            } else {
+                log.error("Unexpected error handling {}", this.toString(), e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return this.getClass().getSimpleName();
+        }
+    }
+
+    class DeadlineFunction implements Function<Long, Long> {
+        private final long deadlineNs;
+
+        public DeadlineFunction(long deadlineNs) {
+            this.deadlineNs = deadlineNs;
+        }
+
+        @Override
+        public Long apply(Long t) {
+            return deadlineNs;
+        }
+    }
+
+    class EarliestDeadlineFunction implements Function<Long, Long> {
+        private final long newDeadlineNs;
+
+        public EarliestDeadlineFunction(long newDeadlineNs) {
+            this.newDeadlineNs = newDeadlineNs;
+        }
+
+        @Override
+        public Long apply(Long prevDeadlineNs) {
+            if (prevDeadlineNs == null) {
+                return newDeadlineNs;
+            } else if (prevDeadlineNs < newDeadlineNs) {
+                return prevDeadlineNs;
+            } else {
+                return newDeadlineNs;
+            }
+        }
+    }
+
+    class VoidEvent implements Event {
+        public final static VoidEvent INSTANCE = new VoidEvent();
+
+        @Override
+        public void run() throws Exception {
+        }
+    }
+
+    /**
+     * Add an element to the front of the queue.
+     *
+     * @param event             The mandatory event to prepend.
+     */
+    default void prepend(Event event) {
+        enqueue(EventInsertionType.PREPEND, null, null, event);
+    }
+
+    /**
+     * Add an element to the end of the queue.
+     *
+     * @param event             The event to append.
+     */
+    default void append(Event event) {
+        enqueue(EventInsertionType.APPEND, null, null, event);
+    }
+
+    /**
+     * Enqueue an event to be run in FIFO order.
+     *
+     * @param deadlineNs        The time in monotonic nanoseconds after which 
the future
+     *                          is completed with a
+     *                          
@{org.apache.kafka.common.errors.TimeoutException},
+     *                          and the event is cancelled.
+     * @param event             The event to append.
+     */
+    default void appendWithDeadline(long deadlineNs, Event event) {
+        enqueue(EventInsertionType.APPEND, null, __ -> deadlineNs, event);
+    }
+
+    /**
+     * Schedule an event to be run at a specific time.
+     *
+     * @param tag                   If this is non-null, the unique tag to use 
for this
+     *                              event.  If an event with this tag already 
exists, it
+     *                              will be cancelled.
+     * @param deadlineNsCalculator  A function which takes as an argument the 
existing
+     *                              deadline for the event with this tag (or 
null if the
+     *                              event has no tag, or if there is none 
such), and
+     *                              produces the deadline to use for this 
event.
+     * @param event                 The event to schedule.
+     */
+    default void scheduleDeferred(String tag,
+                                  Function<Long, Long> deadlineNsCalculator,
+                                  Event event) {
+        enqueue(EventInsertionType.DEFERRED, tag, deadlineNsCalculator, event);
+    }
+
+    /**
+     * Cancel a deferred event.
+     *
+     * @param tag                   The unique tag for the event to be 
cancelled.  Must be
+     *                              non-null.  If the event with the tag has 
not been
+     *                              scheduled, this call will be ignored.
+     */
+    void cancelDeferred(String tag);
+
+    enum EventInsertionType {
+        PREPEND,
+        APPEND,
+        DEFERRED;
+    }
+
+    /**
+     * Enqueue an event to be run in FIFO order.
+     *
+     * @param insertionType         How to insert the event.
+     *                              PREPEND means insert the event as the 
first thing
+     *                              to run.  APPEND means insert the event as 
the last
+     *                              thing to run.  DEFERRED means insert the 
event to
+     *                              run after a delay.
+     * @param tag                   If this is non-null, the unique tag to use 
for
+     *                              this event.  If an event with this tag 
already
+     *                              exists, it will be cancelled.
+     * @param deadlineNsCalculator  If this is non-null, it is a function 
which takes
+     *                              as an argument the existing deadline for 
the
+     *                              event with this tag (or null if the event 
has no
+     *                              tag, or if there is none such), and 
produces the
+     *                              deadline to use for this event (or null to 
use
+     *                              none.)
+     * @param event                 The event to enqueue.
+     */
+    void enqueue(EventInsertionType insertionType,
+                 String tag,
+                 Function<Long, Long> deadlineNsCalculator,
+                 Event event);
+
+    /**
+     * Asynchronously shut down the event queue with no unnecessary delay.
+     * @see #beginShutdown(String, Event, TimeUnit, long)
+     *
+     * @param source                The source of the shutdown.
+     */
+    default void beginShutdown(String source) {
+        beginShutdown(source, new VoidEvent());
+    }
+
+    /**
+     * Asynchronously shut down the event queue with no unnecessary delay.
+     *
+     * @param source        The source of the shutdown.
+     * @param cleanupEvent  The mandatory event to invoke after all other 
events have
+     *                      been processed.
+     * @see #beginShutdown(String, Event, TimeUnit, long)
+     */
+    default void beginShutdown(String source, Event cleanupEvent) {
+        beginShutdown(source, cleanupEvent, TimeUnit.SECONDS, 0);
+    }
+
+    /**
+     * Asynchronously shut down the event queue.
+     *
+     * No new events will be accepted, and the timeout will be initiated
+     * for all existing events.
+     *
+     * @param source        The source of the shutdown.
+     * @param cleanupEvent  The mandatory event to invoke after all other 
events have
+     *                      been processed.
+     * @param timeUnit      The time unit to use for the timeout.
+     * @param timeSpan      The amount of time to use for the timeout.
+     *                      Once the timeout elapses, any remaining queued
+     *                      events will get a
+     *                      @{org.apache.kafka.common.errors.TimeoutException}.
+     */
+    void beginShutdown(String source, Event cleanupEvent, TimeUnit timeUnit, 
long timeSpan);

Review comment:
       fair enough, I will put timeSpan first




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to