This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-21585
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 6c07c49e60fcc6f912be21d28340c8a100c6641c
Author: amashenkov <[email protected]>
AuthorDate: Tue Mar 5 19:03:54 2024 +0300

    Add DeferredEventsQueue for destruction events.
---
 .../ignite/internal/table/DeferredEventsQueue.java | 123 +++++++++++++++
 .../table/DeferredEventsQueueSelfTest.java         | 174 +++++++++++++++++++++
 2 files changed, 297 insertions(+)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java b/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java
new file mode 100644
index 0000000000..af4157ea9e
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/DeferredEventsQueue.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.ToLongFunction;
+
+/**
+ * A queue for deferred events that can be drained up to a given watermark. The implementation is a thread-safe wrapper over
+ * {@link java.util.PriorityQueue}.
+ *
+ * @param <T> Event type.
+ */
+public class DeferredEventsQueue<T> {
+    private final PriorityQueue<T> queue;
+    private final ToLongFunction<T> mapper;
+
+    /**
+     * Creates a queue.
+     *
+     * @param mapper Event timestamp extractor.
+     */
+    public DeferredEventsQueue(ToLongFunction<T> mapper) {
+        this.mapper = mapper;
+        this.queue = new PriorityQueue<>(Comparator.comparingLong(this.mapper));
+    }
+
+    /**
+     * Offers a new event to the queue.
+     *
+     * @param event New deferred event.
+     */
+    public boolean enqueue(T event) {
+        synchronized (queue) {
+            return queue.offer(event);
+        }
+    }
+
+    /**
+     * Drains the queue up to the given watermark (inclusive) and returns the dequeued events.
+     *
+     * @param watermark Timestamp to drain up to.
+     * @return Dequeued events.
+     */
+    public List<T> drainUpTo(long watermark) {
+        synchronized (queue) {
+            if (!hasExpired0(watermark)) {
+                return List.of();
+            }
+
+            List<T> events = new ArrayList<>();
+            do {
+                T event = queue.poll();
+
+                events.add(event);
+            } while (hasExpired0(watermark));
+
+            return events;
+        }
+    }
+
+    /**
+     * Returns queue size.
+     */
+    public int size() {
+        synchronized (queue) {
+            return queue.size();
+        }
+    }
+
+    /**
+     * Returns {@code true} if queue is empty, {@code false} otherwise.
+     */
+    public boolean isEmpty() {
+        synchronized (queue) {
+            return queue.isEmpty();
+        }
+    }
+
+    /**
+     * Returns {@code true} if the queue contains events with a timestamp at or below the watermark, {@code false} otherwise.
+     */
+    public boolean hasExpiredEvents(long watermark) {
+        synchronized (queue) {
+            return hasExpired0(watermark);
+        }
+    }
+
+    /**
+     * Removes all events from the queue.
+     */
+    public void clear() {
+        synchronized (queue) {
+            queue.clear();
+        }
+    }
+
+    private boolean hasExpired0(long watermark) {
+        assert Thread.holdsLock(queue);
+
+        T next = queue.peek();
+
+        return next != null && mapper.applyAsLong(next) <= watermark;
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java
new file mode 100644
index 0000000000..ed1af41144
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/DeferredEventsQueueSelfTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Objects;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link DeferredEventsQueue}.
+ */
+public class DeferredEventsQueueSelfTest {
+    @Test
+    void testEmptyQueue() {
+        DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp);
+
+        assertTrue(queue.isEmpty());
+        assertThat(queue.size(), equalTo(0));
+
+        assertFalse(queue.hasExpiredEvents(Long.MAX_VALUE));
+        assertThat(queue.drainUpTo(Long.MAX_VALUE), empty());
+    }
+
+    @Test
+    void testQueueClear() {
+        DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp);
+
+        assertTrue(queue.isEmpty());
+        assertThat(queue.size(), equalTo(0));
+
+        queue.enqueue(new Item(1, 100L));
+        queue.enqueue(new Item(2, 100L));
+        queue.enqueue(new Item(3, 200L));
+
+        assertFalse(queue.isEmpty());
+        assertThat(queue.size(), equalTo(3));
+
+        queue.clear();
+
+        assertTrue(queue.isEmpty());
+        assertThat(queue.size(), equalTo(0));
+    }
+
+    @Test
+    void testDrainEdgeCases() {
+        DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp);
+
+        queue.enqueue(new Item(1, 300L));
+        queue.enqueue(new Item(2, 100L));
+        queue.enqueue(new Item(3, 200L));
+
+        assertThat(queue.size(), equalTo(3));
+
+        assertFalse(queue.hasExpiredEvents(Long.MIN_VALUE));
+        assertFalse(queue.hasExpiredEvents(-1));
+        assertFalse(queue.hasExpiredEvents(0L));
+        assertFalse(queue.hasExpiredEvents(2L));
+
+        assertTrue(queue.hasExpiredEvents(100L));
+        assertTrue(queue.hasExpiredEvents(Long.MAX_VALUE));
+
+        // Ensure a too-low watermark leaves nothing to drain.
+        assertThat(queue.drainUpTo(Long.MIN_VALUE), empty());
+        assertThat(queue.drainUpTo(2L), empty());
+
+        assertThat(queue.size(), equalTo(3));
+
+        // Drain queue.
+        assertThat(queue.drainUpTo(Long.MAX_VALUE), hasItems(
+                new Item(1, 300L),
+                new Item(2, 100L),
+                new Item(3, 200L)
+        ));
+
+        assertThat(queue.size(), equalTo(0));
+    }
+
+    @Test
+    void testDrain() {
+        DeferredEventsQueue<Item> queue = new DeferredEventsQueue<>(Item::timestamp);
+
+        queue.enqueue(new Item(1, 300L));
+        queue.enqueue(new Item(2, 100L));
+        queue.enqueue(new Item(3, 200L));
+        queue.enqueue(new Item(4, 300L));
+        queue.enqueue(new Item(5, 200L));
+
+        assertThat(queue.size(), equalTo(5));
+
+        // Drain some values
+        assertThat(queue.drainUpTo(200L), hasItems(
+                new Item(2, 100L),
+                new Item(3, 200L),
+                new Item(5, 200L)
+        ));
+
+        assertThat(queue.size(), equalTo(2));
+
+        // Draining once again has no effect.
+        assertThat(queue.drainUpTo(200L), empty());
+        assertThat(queue.size(), equalTo(2));
+
+        // Can add after drain.
+        queue.enqueue(new Item(6, 100L));
+        assertThat(queue.size(), equalTo(3));
+
+        // Drain queue.
+        assertThat(queue.drainUpTo(300L), hasItems(
+                new Item(6, 100L),
+                new Item(1, 300L),
+                new Item(4, 300L)
+        ));
+
+        assertThat(queue.size(), equalTo(0));
+    }
+
+    static class Item {
+        final long timestamp;
+        final int value;
+
+        Item(int value, long timestamp) {
+            this.timestamp = timestamp;
+            this.value = value;
+        }
+
+        public long timestamp() {
+            return timestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Item item = (Item) o;
+            return timestamp == item.timestamp && value == item.value;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestamp, value);
+        }
+
+        @Override
+        public String toString() {
+            return IgniteStringFormatter.format("Item [timestamp={}, value={}]", timestamp, value);
+        }
+    }
+}

Reply via email to