[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-05 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181



##
File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import org.slf4j.Logger;
+
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public interface EventQueue extends AutoCloseable {
+interface Event {
+/**
+ * Run the event.
+ */
+void run() throws Exception;
+
+/**
+ * Handle an exception that was either generated by running the event, 
or by the
+ * event queue's inability to run the event.
+ *
+ * @param e The exception.  This will be a TimeoutException if the 
event hit
+ *  its deadline before it could be scheduled.
+ *  It will be a RejectedExecutionException if the event 
could not be
+ *  scheduled because the event queue has already been 
closed.
+ *  Otherweise, it will be whatever exception was thrown 
by run().
+ */
+default void handleException(Throwable e) {}
+}
+
+abstract class FailureLoggingEvent implements Event {
+private final Logger log;
+
+public FailureLoggingEvent(Logger log) {
+this.log = log;
+}
+
+@Override
+public void handleException(Throwable e) {
+if (e instanceof RejectedExecutionException) {
+log.info("Not processing {} because the event queue is 
closed.",
+this.toString());
+} else {
+log.error("Unexpected error handling {}", this.toString(), e);
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName();
+}
+}
+
+class NoDeadlineFunction implements Function {
+public static final NoDeadlineFunction INSTANCE = new 
NoDeadlineFunction();
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.empty();
+}
+}
+
+class DeadlineFunction implements Function {
+private final long deadlineNs;
+
+public DeadlineFunction(long deadlineNs) {
+this.deadlineNs = deadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.of(deadlineNs);
+}
+}
+
+class EarliestDeadlineFunction implements Function {
+private final long newDeadlineNs;
+
+public EarliestDeadlineFunction(long newDeadlineNs) {
+this.newDeadlineNs = newDeadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong prevDeadlineNs) {
+if (!prevDeadlineNs.isPresent()) {
+return OptionalLong.of(newDeadlineNs);
+} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
+return prevDeadlineNs;
+} else {
+return OptionalLong.of(newDeadlineNs);
+}
+}
+}
+
+class VoidEvent implements Event {
+public final static VoidEvent INSTANCE = new VoidEvent();
+
+@Override
+public void run() throws Exception {
+}
+}
+
+/**
+ * Add an element to the front of the queue.
+ *
+ * @param event The mandatory event to prepend.
+ */
+default void prepend(Event event) {
+enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an element to the end of the queue.
+ *
+ * @param event The event to append.
+ */
+default void append(Event event) {
+enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an event to the end of the queue.
+ *
+ * @param deadlineNsThe deadline for starting the event, in 
monotonic
+ *  

[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181



##
File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import org.slf4j.Logger;
+
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public interface EventQueue extends AutoCloseable {
+interface Event {
+/**
+ * Run the event.
+ */
+void run() throws Exception;
+
+/**
+ * Handle an exception that was either generated by running the event, 
or by the
+ * event queue's inability to run the event.
+ *
+ * @param e The exception.  This will be a TimeoutException if the 
event hit
+ *  its deadline before it could be scheduled.
+ *  It will be a RejectedExecutionException if the event 
could not be
+ *  scheduled because the event queue has already been 
closed.
+ *  Otherweise, it will be whatever exception was thrown 
by run().
+ */
+default void handleException(Throwable e) {}
+}
+
+abstract class FailureLoggingEvent implements Event {
+private final Logger log;
+
+public FailureLoggingEvent(Logger log) {
+this.log = log;
+}
+
+@Override
+public void handleException(Throwable e) {
+if (e instanceof RejectedExecutionException) {
+log.info("Not processing {} because the event queue is 
closed.",
+this.toString());
+} else {
+log.error("Unexpected error handling {}", this.toString(), e);
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName();
+}
+}
+
+class NoDeadlineFunction implements Function {
+public static final NoDeadlineFunction INSTANCE = new 
NoDeadlineFunction();
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.empty();
+}
+}
+
+class DeadlineFunction implements Function {
+private final long deadlineNs;
+
+public DeadlineFunction(long deadlineNs) {
+this.deadlineNs = deadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.of(deadlineNs);
+}
+}
+
+class EarliestDeadlineFunction implements Function {
+private final long newDeadlineNs;
+
+public EarliestDeadlineFunction(long newDeadlineNs) {
+this.newDeadlineNs = newDeadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong prevDeadlineNs) {
+if (!prevDeadlineNs.isPresent()) {
+return OptionalLong.of(newDeadlineNs);
+} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
+return prevDeadlineNs;
+} else {
+return OptionalLong.of(newDeadlineNs);
+}
+}
+}
+
+class VoidEvent implements Event {
+public final static VoidEvent INSTANCE = new VoidEvent();
+
+@Override
+public void run() throws Exception {
+}
+}
+
+/**
+ * Add an element to the front of the queue.
+ *
+ * @param event The mandatory event to prepend.
+ */
+default void prepend(Event event) {
+enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an element to the end of the queue.
+ *
+ * @param event The event to append.
+ */
+default void append(Event event) {
+enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an event to the end of the queue.
+ *
+ * @param deadlineNsThe deadline for starting the event, in 
monotonic
+ *  

[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-03 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r569890466



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {
+/**
+ * A context object that wraps events.
+ */
+private static class EventContext {
+/**
+ * The caller-supplied event.
+ */
+private final Event event;
+
+/**
+ * How this event was inserted.
+ */
+private final EventInsertionType insertionType;
+
+/**
+ * The previous pointer of our circular doubly-linked list.
+ */
+private EventContext prev = this;
+
+/**
+ * The next pointer in our circular doubly-linked list.
+ */
+private EventContext next = this;
+
+/**
+ * If this event is in the delay map, this is the key it is there 
under.
+ * If it is not in the map, this is null.
+ */
+private Long deadlineNs = null;
+
+/**
+ * The tag associated with this event.
+ */
+private String tag;
+
+EventContext(Event event, EventInsertionType insertionType, String 
tag) {
+this.event = event;
+this.insertionType = insertionType;
+this.tag = tag;
+}
+
+/**
+ * Insert a new node in the circularly linked list after this node.
+ */
+void insertAfter(EventContext other) {
+this.next.prev = other;
+other.next = this.next;
+other.prev = this;
+this.next = other;
+}
+
+/**
+ * Insert a new node in the circularly linked list before this node.
+ */
+void insertBefore(EventContext other) {
+this.prev.next = other;
+other.prev = this.prev;
+other.next = this;
+this.prev = other;
+}
+
+/**
+ * Remove this node from the circularly linked list.
+ */
+void remove() {
+this.prev.next = this.next;
+this.next.prev = this.prev;
+this.prev = this;
+this.next = this;
+}
+
+/**
+ * Returns true if this node is the only element in its list.
+ */
+boolean isSingleton() {
+return prev == this && next == this;
+}
+
+/**
+ * Run the event associated with this EventContext.
+ */
+void run() throws InterruptedException {
+try {
+event.run();
+} catch (InterruptedException e) {
+throw e;
+} catch (Exception e) {
+event.handleException(e);
+}
+}
+
+/**
+ * Complete the event associated with this EventContext with a timeout 
exception.
+ */
+void completeWithTimeout() {
+completeWithException(new TimeoutException());
+}
+
+/**
+ * Complete the event associated with this EventContext with the 
specified
+ * exception.
+ */
+void completeWithException(Throwable t) {
+event.handleException(t);
+}
+}
+
+private class EventHandler implements Runnable {
+/**
+ * Event contexts indexed by tag.  Events without a tag are not 
included here.
+ */
+private final Map tagToEventContext = new 
HashMap<>();
+
+/**
+ * The head of the event queue.
+

[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-03 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r569820385



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {
+/**
+ * A context object that wraps events.
+ */
+private static class EventContext {
+/**
+ * The caller-supplied event.
+ */
+private final Event event;
+
+/**
+ * How this event was inserted.
+ */
+private final EventInsertionType insertionType;
+
+/**
+ * The previous pointer of our circular doubly-linked list.
+ */
+private EventContext prev = this;
+
+/**
+ * The next pointer in our circular doubly-linked list.
+ */
+private EventContext next = this;
+
+/**
+ * If this event is in the delay map, this is the key it is there 
under.
+ * If it is not in the map, this is null.
+ */
+private Long deadlineNs = null;
+
+/**
+ * The tag associated with this event.
+ */
+private String tag;
+
+EventContext(Event event, EventInsertionType insertionType, String 
tag) {
+this.event = event;
+this.insertionType = insertionType;
+this.tag = tag;
+}
+
+/**
+ * Insert a new node in the circularly linked list after this node.
+ */
+void insertAfter(EventContext other) {
+this.next.prev = other;
+other.next = this.next;
+other.prev = this;
+this.next = other;
+}
+
+/**
+ * Insert a new node in the circularly linked list before this node.
+ */
+void insertBefore(EventContext other) {
+this.prev.next = other;
+other.prev = this.prev;
+other.next = this;
+this.prev = other;
+}
+
+/**
+ * Remove this node from the circularly linked list.
+ */
+void remove() {
+this.prev.next = this.next;
+this.next.prev = this.prev;
+this.prev = this;
+this.next = this;
+}
+
+/**
+ * Returns true if this node is the only element in its list.
+ */
+boolean isSingleton() {
+return prev == this && next == this;
+}
+
+/**
+ * Run the event associated with this EventContext.
+ */
+void run() throws InterruptedException {
+try {
+event.run();
+} catch (InterruptedException e) {
+throw e;
+} catch (Exception e) {
+event.handleException(e);
+}
+}
+
+/**
+ * Complete the event associated with this EventContext with a timeout 
exception.
+ */
+void completeWithTimeout() {
+completeWithException(new TimeoutException());
+}
+
+/**
+ * Complete the event associated with this EventContext with the 
specified
+ * exception.
+ */
+void completeWithException(Throwable t) {
+event.handleException(t);
+}
+}
+
+private class EventHandler implements Runnable {
+/**
+ * Event contexts indexed by tag.  Events without a tag are not 
included here.
+ */
+private final Map tagToEventContext = new 
HashMap<>();
+
+/**
+ * The head of the event queue.
+

[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-03 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r56973



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {
+/**
+ * A context object that wraps events.
+ */
+private static class EventContext {
+/**
+ * The caller-supplied event.
+ */
+private final Event event;
+
+/**
+ * How this event was inserted.
+ */
+private final EventInsertionType insertionType;
+
+/**
+ * The previous pointer of our circular doubly-linked list.
+ */
+private EventContext prev = this;
+
+/**
+ * The next pointer in our circular doubly-linked list.
+ */
+private EventContext next = this;
+
+/**
+ * If this event is in the delay map, this is the key it is there 
under.
+ * If it is not in the map, this is null.
+ */
+private Long deadlineNs = null;
+
+/**
+ * The tag associated with this event.
+ */
+private String tag;
+
+EventContext(Event event, EventInsertionType insertionType, String 
tag) {
+this.event = event;
+this.insertionType = insertionType;
+this.tag = tag;
+}
+
+/**
+ * Insert a new node in the circularly linked list after this node.
+ */
+void insertAfter(EventContext other) {
+this.next.prev = other;
+other.next = this.next;
+other.prev = this;
+this.next = other;
+}
+
+/**
+ * Insert a new node in the circularly linked list before this node.
+ */
+void insertBefore(EventContext other) {
+this.prev.next = other;
+other.prev = this.prev;
+other.next = this;
+this.prev = other;
+}
+
+/**
+ * Remove this node from the circularly linked list.
+ */
+void remove() {
+this.prev.next = this.next;
+this.next.prev = this.prev;
+this.prev = this;
+this.next = this;
+}
+
+/**
+ * Returns true if this node is the only element in its list.
+ */
+boolean isSingleton() {
+return prev == this && next == this;
+}
+
+/**
+ * Run the event associated with this EventContext.
+ */
+void run() throws InterruptedException {
+try {
+event.run();
+} catch (InterruptedException e) {
+throw e;
+} catch (Exception e) {
+event.handleException(e);
+}
+}
+
+/**
+ * Complete the event associated with this EventContext with a timeout 
exception.
+ */
+void completeWithTimeout() {
+completeWithException(new TimeoutException());
+}
+
+/**
+ * Complete the event associated with this EventContext with the 
specified
+ * exception.
+ */
+void completeWithException(Throwable t) {
+event.handleException(t);
+}
+}
+
+private class EventHandler implements Runnable {
+/**
+ * Event contexts indexed by tag.  Events without a tag are not 
included here.
+ */
+private final Map tagToEventContext = new 
HashMap<>();
+
+/**
+ * The head of the event queue.
+