This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-21585 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 6c07c49e60fcc6f912be21d28340c8a100c6641c Author: amashenkov <[email protected]> AuthorDate: Tue Mar 5 19:03:54 2024 +0300 Add DeferredEventsQueue for destruction events. --- .../ignite/internal/table/DeferredEventsQueue.java | 123 +++++++++++++++ .../table/DeferredEventsQueueSelfTest.java | 174 +++++++++++++++++++++ 2 files changed, 297 insertions(+) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java b/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java new file mode 100644 index 0000000000..af4157ea9e --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.function.ToLongFunction; + +/** + * A queue for deferred events, which provide method to drain events up to given watermark. An implementation is a thread-safe wrapper over + * {@link java.util.PriorityQueue}. + * + * @param <T> Event type. + */ +public class DeferredEventsQueue<T> { + private final PriorityQueue<T> queue; + private final ToLongFunction<T> mapper; + + /** + * Creates a queue. + * + * @param mapper Event timestamp extractor. + */ + public DeferredEventsQueue(ToLongFunction<T> mapper) { + this.mapper = mapper; + this.queue = new PriorityQueue<>(Comparator.comparingLong(this.mapper)); + } + + /** + * Offers a new event to the queue. + * + * @param event New deferred event. + */ + public boolean enqueue(T event) { + synchronized (queue) { + return queue.offer(event); + } + } + + /** + * Drain queue up to given watermark and return dequeued events. + * + * @param watermark Timestamp to drain up to. + * @return Dequeued events. + */ + public List<T> drainUpTo(long watermark) { + synchronized (queue) { + if (!hasExpired0(watermark)) { + return List.of(); + } + + List<T> events = new ArrayList<>(); + do { + T event = queue.poll(); + + events.add(event); + } while (hasExpired0(watermark)); + + return events; + } + } + + /** + * Returns queue size. + */ + public int size() { + synchronized (queue) { + return queue.size(); + } + } + + /** + * Returns {@code true} if queue is empty, {@code false} otherwise. + */ + public boolean isEmpty() { + synchronized (queue) { + return queue.isEmpty(); + } + } + + /** + * Returns {@code true} if found events below watermark, {@code false} otherwise. + */ + public boolean hasExpiredEvents(long watermark) { + synchronized (queue) { + return hasExpired0(watermark); + } + } + + /** + * Removes all events from the queue. + */ + public void clear() { + synchronized (queue) { + queue.clear(); + } + } + + private boolean hasExpired0(long watermark) { + assert Thread.holdsLock(queue); + + T next = queue.peek(); + + return next != null && mapper.applyAsLong(next) <= watermark; + } +} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java new file mode 100644 index 0000000000..ed1af41144 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Objects; +import org.apache.ignite.internal.lang.IgniteStringFormatter; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link DeferredEventsQueue}. + */ +public class DeferredEventsQueueSelfTest { + @Test + void testEmptyQueue() { + DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp); + + assertTrue(queue.isEmpty()); + assertThat(queue.size(), equalTo(0)); + + assertFalse(queue.hasExpiredEvents(Long.MAX_VALUE)); + assertThat(queue.drainUpTo(Long.MAX_VALUE), empty()); + } + + @Test + void testQueueClear() { + DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp); + + assertTrue(queue.isEmpty()); + assertThat(queue.size(), equalTo(0)); + + queue.enqueue(new Item(1, 100L)); + queue.enqueue(new Item(2, 100L)); + queue.enqueue(new Item(3, 200L)); + + assertFalse(queue.isEmpty()); + assertThat(queue.size(), equalTo(3)); + + queue.clear(); + + assertTrue(queue.isEmpty()); + assertThat(queue.size(), equalTo(0)); + } + + @Test + void testDrainEdgeCases() { + DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp); + + queue.enqueue(new Item(1, 300L)); + queue.enqueue(new Item(2, 100L)); + queue.enqueue(new Item(3, 200L)); + + assertThat(queue.size(), equalTo(3)); + + assertFalse(queue.hasExpiredEvents(Long.MIN_VALUE)); + assertFalse(queue.hasExpiredEvents(-1)); + assertFalse(queue.hasExpiredEvents(0L)); + assertFalse(queue.hasExpiredEvents(2L)); + + assertTrue(queue.hasExpiredEvents(100L)); + assertTrue(queue.hasExpiredEvents(Long.MAX_VALUE)); + + // Ensure too low value leas nothing to drain. + assertThat(queue.drainUpTo(Long.MIN_VALUE), empty()); + assertThat(queue.drainUpTo(2L), empty()); + + assertThat(queue.size(), equalTo(3)); + + // Drain queue. + assertThat(queue.drainUpTo(Long.MAX_VALUE), hasItems( + new Item(1, 300L), + new Item(2, 100L), + new Item(3, 200L) + )); + + assertThat(queue.size(), equalTo(0)); + } + + @Test + void testDrain() { + DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp); + + queue.enqueue(new Item(1, 300L)); + queue.enqueue(new Item(2, 100L)); + queue.enqueue(new Item(3, 200L)); + queue.enqueue(new Item(4, 300L)); + queue.enqueue(new Item(5, 200L)); + + assertThat(queue.size(), equalTo(5)); + + // Drain some values + assertThat(queue.drainUpTo(200L), hasItems( + new Item(2, 100L), + new Item(3, 200L), + new Item(5, 200L) + )); + + assertThat(queue.size(), equalTo(2)); + + // Draining once again has no effect. + assertThat(queue.drainUpTo(200L), empty()); + assertThat(queue.size(), equalTo(2)); + + // Can add after drain. + queue.enqueue(new Item(6, 100L)); + assertThat(queue.size(), equalTo(3)); + + // Drain queue. + assertThat(queue.drainUpTo(300L), hasItems( + new Item(6, 100L), + new Item(1, 300L), + new Item(4, 300L) + )); + + assertThat(queue.size(), equalTo(0)); + } + + static class Item { + final long timestamp; + final int value; + + Item(int value, long timestamp) { + this.timestamp = timestamp; + this.value = value; + } + + public long timestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Item item = (Item) o; + return timestamp == item.timestamp && value == item.value; + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, value); + } + + @Override + public String toString() { + return IgniteStringFormatter.format("Item [timestamp={}, value={}]", timestamp, value); + } + } +}
