[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181 ## File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import org.slf4j.Logger; + +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public interface EventQueue extends AutoCloseable { +interface Event { +/** + * Run the event. + */ +void run() throws Exception; + +/** + * Handle an exception that was either generated by running the event, or by the + * event queue's inability to run the event. + * + * @param e The exception. This will be a TimeoutException if the event hit + * its deadline before it could be scheduled. + * It will be a RejectedExecutionException if the event could not be + * scheduled because the event queue has already been closed. + * Otherwise, it will be whatever exception was thrown by run(). 
+ */ +default void handleException(Throwable e) {} +} + +abstract class FailureLoggingEvent implements Event { +private final Logger log; + +public FailureLoggingEvent(Logger log) { +this.log = log; +} + +@Override +public void handleException(Throwable e) { +if (e instanceof RejectedExecutionException) { +log.info("Not processing {} because the event queue is closed.", +this.toString()); +} else { +log.error("Unexpected error handling {}", this.toString(), e); +} +} + +@Override +public String toString() { +return this.getClass().getSimpleName(); +} +} + +class NoDeadlineFunction implements Function { +public static final NoDeadlineFunction INSTANCE = new NoDeadlineFunction(); + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.empty(); +} +} + +class DeadlineFunction implements Function { +private final long deadlineNs; + +public DeadlineFunction(long deadlineNs) { +this.deadlineNs = deadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.of(deadlineNs); +} +} + +class EarliestDeadlineFunction implements Function { +private final long newDeadlineNs; + +public EarliestDeadlineFunction(long newDeadlineNs) { +this.newDeadlineNs = newDeadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong prevDeadlineNs) { +if (!prevDeadlineNs.isPresent()) { +return OptionalLong.of(newDeadlineNs); +} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) { +return prevDeadlineNs; +} else { +return OptionalLong.of(newDeadlineNs); +} +} +} + +class VoidEvent implements Event { +public final static VoidEvent INSTANCE = new VoidEvent(); + +@Override +public void run() throws Exception { +} +} + +/** + * Add an element to the front of the queue. + * + * @param event The mandatory event to prepend. + */ +default void prepend(Event event) { +enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an element to the end of the queue. 
+ * + * @param event The event to append. + */ +default void append(Event event) { +enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an event to the end of the queue. + * + * @param deadlineNs The deadline for starting the event, in monotonic + *
[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181 ## File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java ## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import org.slf4j.Logger; + +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public interface EventQueue extends AutoCloseable { +interface Event { +/** + * Run the event. + */ +void run() throws Exception; + +/** + * Handle an exception that was either generated by running the event, or by the + * event queue's inability to run the event. + * + * @param e The exception. This will be a TimeoutException if the event hit + * its deadline before it could be scheduled. + * It will be a RejectedExecutionException if the event could not be + * scheduled because the event queue has already been closed. + * Otherwise, it will be whatever exception was thrown by run(). 
+ */ +default void handleException(Throwable e) {} +} + +abstract class FailureLoggingEvent implements Event { +private final Logger log; + +public FailureLoggingEvent(Logger log) { +this.log = log; +} + +@Override +public void handleException(Throwable e) { +if (e instanceof RejectedExecutionException) { +log.info("Not processing {} because the event queue is closed.", +this.toString()); +} else { +log.error("Unexpected error handling {}", this.toString(), e); +} +} + +@Override +public String toString() { +return this.getClass().getSimpleName(); +} +} + +class NoDeadlineFunction implements Function { +public static final NoDeadlineFunction INSTANCE = new NoDeadlineFunction(); + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.empty(); +} +} + +class DeadlineFunction implements Function { +private final long deadlineNs; + +public DeadlineFunction(long deadlineNs) { +this.deadlineNs = deadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong ignored) { +return OptionalLong.of(deadlineNs); +} +} + +class EarliestDeadlineFunction implements Function { +private final long newDeadlineNs; + +public EarliestDeadlineFunction(long newDeadlineNs) { +this.newDeadlineNs = newDeadlineNs; +} + +@Override +public OptionalLong apply(OptionalLong prevDeadlineNs) { +if (!prevDeadlineNs.isPresent()) { +return OptionalLong.of(newDeadlineNs); +} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) { +return prevDeadlineNs; +} else { +return OptionalLong.of(newDeadlineNs); +} +} +} + +class VoidEvent implements Event { +public final static VoidEvent INSTANCE = new VoidEvent(); + +@Override +public void run() throws Exception { +} +} + +/** + * Add an element to the front of the queue. + * + * @param event The mandatory event to prepend. + */ +default void prepend(Event event) { +enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an element to the end of the queue. 
+ * + * @param event The event to append. + */ +default void append(Event event) { +enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, event); +} + +/** + * Add an event to the end of the queue. + * + * @param deadlineNs The deadline for starting the event, in monotonic + *
[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r569890466 ## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + + +public final class KafkaEventQueue implements EventQueue { +/** + * A context object that wraps events. + */ +private static class EventContext { +/** + * The caller-supplied event. + */ +private final Event event; + +/** + * How this event was inserted. + */ +private final EventInsertionType insertionType; + +/** + * The previous pointer of our circular doubly-linked list. 
+ */ +private EventContext prev = this; + +/** + * The next pointer in our circular doubly-linked list. + */ +private EventContext next = this; + +/** + * If this event is in the delay map, this is the key it is there under. + * If it is not in the map, this is null. + */ +private Long deadlineNs = null; + +/** + * The tag associated with this event. + */ +private String tag; + +EventContext(Event event, EventInsertionType insertionType, String tag) { +this.event = event; +this.insertionType = insertionType; +this.tag = tag; +} + +/** + * Insert a new node in the circularly linked list after this node. + */ +void insertAfter(EventContext other) { +this.next.prev = other; +other.next = this.next; +other.prev = this; +this.next = other; +} + +/** + * Insert a new node in the circularly linked list before this node. + */ +void insertBefore(EventContext other) { +this.prev.next = other; +other.prev = this.prev; +other.next = this; +this.prev = other; +} + +/** + * Remove this node from the circularly linked list. + */ +void remove() { +this.prev.next = this.next; +this.next.prev = this.prev; +this.prev = this; +this.next = this; +} + +/** + * Returns true if this node is the only element in its list. + */ +boolean isSingleton() { +return prev == this && next == this; +} + +/** + * Run the event associated with this EventContext. + */ +void run() throws InterruptedException { +try { +event.run(); +} catch (InterruptedException e) { +throw e; +} catch (Exception e) { +event.handleException(e); +} +} + +/** + * Complete the event associated with this EventContext with a timeout exception. + */ +void completeWithTimeout() { +completeWithException(new TimeoutException()); +} + +/** + * Complete the event associated with this EventContext with the specified + * exception. + */ +void completeWithException(Throwable t) { +event.handleException(t); +} +} + +private class EventHandler implements Runnable { +/** + * Event contexts indexed by tag. 
Events without a tag are not included here. + */ +private final Map tagToEventContext = new HashMap<>(); + +/** + * The head of the event queue. +
[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r569820385 ## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + + +public final class KafkaEventQueue implements EventQueue { +/** + * A context object that wraps events. + */ +private static class EventContext { +/** + * The caller-supplied event. + */ +private final Event event; + +/** + * How this event was inserted. + */ +private final EventInsertionType insertionType; + +/** + * The previous pointer of our circular doubly-linked list. 
+ */ +private EventContext prev = this; + +/** + * The next pointer in our circular doubly-linked list. + */ +private EventContext next = this; + +/** + * If this event is in the delay map, this is the key it is there under. + * If it is not in the map, this is null. + */ +private Long deadlineNs = null; + +/** + * The tag associated with this event. + */ +private String tag; + +EventContext(Event event, EventInsertionType insertionType, String tag) { +this.event = event; +this.insertionType = insertionType; +this.tag = tag; +} + +/** + * Insert a new node in the circularly linked list after this node. + */ +void insertAfter(EventContext other) { +this.next.prev = other; +other.next = this.next; +other.prev = this; +this.next = other; +} + +/** + * Insert a new node in the circularly linked list before this node. + */ +void insertBefore(EventContext other) { +this.prev.next = other; +other.prev = this.prev; +other.next = this; +this.prev = other; +} + +/** + * Remove this node from the circularly linked list. + */ +void remove() { +this.prev.next = this.next; +this.next.prev = this.prev; +this.prev = this; +this.next = this; +} + +/** + * Returns true if this node is the only element in its list. + */ +boolean isSingleton() { +return prev == this && next == this; +} + +/** + * Run the event associated with this EventContext. + */ +void run() throws InterruptedException { +try { +event.run(); +} catch (InterruptedException e) { +throw e; +} catch (Exception e) { +event.handleException(e); +} +} + +/** + * Complete the event associated with this EventContext with a timeout exception. + */ +void completeWithTimeout() { +completeWithException(new TimeoutException()); +} + +/** + * Complete the event associated with this EventContext with the specified + * exception. + */ +void completeWithException(Throwable t) { +event.handleException(t); +} +} + +private class EventHandler implements Runnable { +/** + * Event contexts indexed by tag. 
Events without a tag are not included here. + */ +private final Map tagToEventContext = new HashMap<>(); + +/** + * The head of the event queue. +
[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue
junrao commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r56973 ## File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java ## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + + +public final class KafkaEventQueue implements EventQueue { +/** + * A context object that wraps events. + */ +private static class EventContext { +/** + * The caller-supplied event. + */ +private final Event event; + +/** + * How this event was inserted. + */ +private final EventInsertionType insertionType; + +/** + * The previous pointer of our circular doubly-linked list. 
+ */ +private EventContext prev = this; + +/** + * The next pointer in our circular doubly-linked list. + */ +private EventContext next = this; + +/** + * If this event is in the delay map, this is the key it is there under. + * If it is not in the map, this is null. + */ +private Long deadlineNs = null; + +/** + * The tag associated with this event. + */ +private String tag; + +EventContext(Event event, EventInsertionType insertionType, String tag) { +this.event = event; +this.insertionType = insertionType; +this.tag = tag; +} + +/** + * Insert a new node in the circularly linked list after this node. + */ +void insertAfter(EventContext other) { +this.next.prev = other; +other.next = this.next; +other.prev = this; +this.next = other; +} + +/** + * Insert a new node in the circularly linked list before this node. + */ +void insertBefore(EventContext other) { +this.prev.next = other; +other.prev = this.prev; +other.next = this; +this.prev = other; +} + +/** + * Remove this node from the circularly linked list. + */ +void remove() { +this.prev.next = this.next; +this.next.prev = this.prev; +this.prev = this; +this.next = this; +} + +/** + * Returns true if this node is the only element in its list. + */ +boolean isSingleton() { +return prev == this && next == this; +} + +/** + * Run the event associated with this EventContext. + */ +void run() throws InterruptedException { +try { +event.run(); +} catch (InterruptedException e) { +throw e; +} catch (Exception e) { +event.handleException(e); +} +} + +/** + * Complete the event associated with this EventContext with a timeout exception. + */ +void completeWithTimeout() { +completeWithException(new TimeoutException()); +} + +/** + * Complete the event associated with this EventContext with the specified + * exception. + */ +void completeWithException(Throwable t) { +event.handleException(t); +} +} + +private class EventHandler implements Runnable { +/** + * Event contexts indexed by tag. 
Events without a tag are not included here. + */ +private final Map tagToEventContext = new HashMap<>(); + +/** + * The head of the event queue. +