This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new ebe2176  [runtime] ActionExecutionOperator handle watermark properly 
(#186)
ebe2176 is described below

commit ebe21764e50a0775929a951527a1432b74869950
Author: Eugene <[email protected]>
AuthorDate: Fri Sep 26 13:14:32 2025 +0800

    [runtime] ActionExecutionOperator handle watermark properly (#186)
---
 .../java/org/apache/flink/agents/api/Event.java    |  14 ++
 .../agents/runtime/context/RunnerContextImpl.java  |   5 +-
 .../runtime/operator/ActionExecutionOperator.java  |  56 +++++-
 .../agents/runtime/operator/JavaActionTask.java    |   3 +-
 .../agents/runtime/operator/queue/KeySegment.java  |  64 ++++++
 .../runtime/operator/queue/SegmentedQueue.java     | 108 +++++++++++
 .../runtime/python/operator/PythonActionTask.java  |   3 +-
 .../python/operator/PythonGeneratorActionTask.java |   5 +-
 .../runtime/python/utils/PythonActionExecutor.java |   2 +-
 .../operator/ActionExecutionOperatorTest.java      |  45 +++++
 .../runtime/operator/queue/KeySegmentTest.java     | 137 +++++++++++++
 .../runtime/operator/queue/SegmentedQueueTest.java | 216 +++++++++++++++++++++
 12 files changed, 651 insertions(+), 7 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/Event.java 
b/api/src/main/java/org/apache/flink/agents/api/Event.java
index eabd2f7..e0e77e5 100644
--- a/api/src/main/java/org/apache/flink/agents/api/Event.java
+++ b/api/src/main/java/org/apache/flink/agents/api/Event.java
@@ -28,6 +28,8 @@ import java.util.UUID;
 public abstract class Event {
     private final UUID id;
     private final Map<String, Object> attributes;
+    /** The timestamp of the source record. */
+    private Long sourceTimestamp;
 
     public Event() {
         this(UUID.randomUUID(), new HashMap<>());
@@ -54,4 +56,16 @@ public abstract class Event {
     public void setAttr(String name, Object value) {
         attributes.put(name, value);
     }
+
+    public boolean hasSourceTimestamp() {
+        return sourceTimestamp != null;
+    }
+
+    public Long getSourceTimestamp() {
+        return sourceTimestamp;
+    }
+
+    public void setSourceTimestamp(long timestamp) {
+        this.sourceTimestamp = timestamp;
+    }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index e8ca999..d998af6 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -90,9 +90,12 @@ public class RunnerContextImpl implements RunnerContext {
         pendingEvents.add(event);
     }
 
-    public List<Event> drainEvents() {
+    public List<Event> drainEvents(Long timestamp) {
         mailboxThreadChecker.run();
         List<Event> list = new ArrayList<>(this.pendingEvents);
+        if (timestamp != null) {
+            list.forEach(event -> event.setSourceTimestamp(timestamp));
+        }
         this.pendingEvents.clear();
         return list;
     }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index b91fa52..2543dee 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.agents.runtime.memory.CachedMemoryStore;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
 import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
 import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
 import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.operator.PythonActionTask;
@@ -65,6 +66,7 @@ import 
org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
@@ -105,6 +107,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private static final String RECOVERY_MARKER_STATE_NAME = "recoveryMarker";
     private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME = 
"messageSequenceNumber";
+    private static final String PENDING_INPUT_EVENT_STATE_NAME = 
"pendingInputEvents";
 
     private final AgentPlan agentPlan;
 
@@ -121,6 +124,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private transient BuiltInMetrics builtInMetrics;
 
+    private transient SegmentedQueue keySegmentQueue;
+
     private final transient MailboxExecutor mailboxExecutor;
 
     // We need to check whether the current thread is the mailbox thread using 
the mailbox
@@ -190,6 +195,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
         builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
 
+        keySegmentQueue = new SegmentedQueue();
+
         // init the action state store with proper implementation
         if (actionStateStore == null
                 && KAFKA.getType()
@@ -224,7 +231,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                 getRuntimeContext()
                         .getListState(
                                 new ListStateDescriptor<>(
-                                        "pendingInputEvents", 
TypeInformation.of(Event.class)));
+                                        PENDING_INPUT_EVENT_STATE_NAME,
+                                        TypeInformation.of(Event.class)));
         // We use UnionList here to ensure that the task can access all keys 
after parallelism
         // modifications.
         // Subsequent steps {@link #tryResumeProcessActionTasks} will then 
filter out keys that do
@@ -257,6 +265,12 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         eventLogger.open(new EventLoggerOpenParams(runtimeContext));
     }
 
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        keySegmentQueue.addWatermark(mark);
+        processEligibleWatermarks();
+    }
+
     @Override
     public void processElement(StreamRecord<IN> record) throws Exception {
         IN input = record.getValue();
@@ -264,6 +278,11 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
         // wrap to InputEvent first
         Event inputEvent = wrapToInputEvent(input);
+        if (record.hasTimestamp()) {
+            inputEvent.setSourceTimestamp(record.getTimestamp());
+        }
+
+        keySegmentQueue.addKeyToLastSegment(getCurrentKey());
 
         if (currentKeyHasMoreActionTask()) {
             // If there are already actions being processed for the current 
key, the newly incoming
@@ -287,7 +306,12 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         if (EventUtil.isOutputEvent(event)) {
             // If the event is an OutputEvent, we send it downstream.
             OUT outputData = getOutputFromOutputEvent(event);
-            output.collect(reusedStreamRecord.replace(outputData));
+            if (event.hasSourceTimestamp()) {
+                output.collect(reusedStreamRecord.replace(outputData, 
event.getSourceTimestamp()));
+            } else {
+                reusedStreamRecord.eraseTimestamp();
+                output.collect(reusedStreamRecord.replace(outputData));
+            }
         } else {
             if (isInputEvent) {
                 // If the event is an InputEvent, we mark that the key is 
currently being processed.
@@ -355,6 +379,10 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                             + key
                             + " should be 1, but got "
                             + removedCount);
+            checkState(
+                    keySegmentQueue.removeKey(key),
+                    "Current key" + key + " is missing from the 
segmentedQueue.");
+            processEligibleWatermarks();
             return;
         }
 
@@ -435,6 +463,10 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                             + key
                             + " should be 1, but got "
                             + removedCount);
+            checkState(
+                    keySegmentQueue.removeKey(key),
+                    "Current key" + key + " is missing from the 
segmentedQueue.");
+            processEligibleWatermarks();
             Event pendingInputEvent = 
pollFromListState(pendingInputEventsKState);
             if (pendingInputEvent != null) {
                 processEvent(key, pendingInputEvent);
@@ -655,10 +687,22 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         Iterable<Object> keys = currentProcessingKeysOpState.get();
         if (keys != null) {
             for (Object key : keys) {
+                keySegmentQueue.addKeyToLastSegment(key);
                 mailboxExecutor.submit(
                         () -> tryProcessActionTaskForKey(key), "process action 
task");
             }
         }
+
+        getKeyedStateBackend()
+                .applyToAllKeys(
+                        VoidNamespace.INSTANCE,
+                        VoidNamespaceSerializer.INSTANCE,
+                        new ListStateDescriptor<>(
+                                PENDING_INPUT_EVENT_STATE_NAME, 
TypeInformation.of(Event.class)),
+                        (key, state) ->
+                                state.get()
+                                        .forEach(
+                                                event -> 
keySegmentQueue.addKeyToLastSegment(key)));
     }
 
     private void initOrIncSequenceNumber() throws Exception {
@@ -726,6 +770,14 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         }
     }
 
+    private void processEligibleWatermarks() throws Exception {
+        Watermark mark = keySegmentQueue.popOldestWatermark();
+        while (mark != null) {
+            super.processWatermark(mark);
+            mark = keySegmentQueue.popOldestWatermark();
+        }
+    }
+
     /** Failed to execute Action task. */
     public static class ActionTaskExecutionException extends Exception {
         public ActionTaskExecutionException(String message, Throwable cause) {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 894df22..1565bd0 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -45,6 +45,7 @@ public class JavaActionTask extends ActionTask {
                 key);
         runnerContext.checkNoPendingEvents();
         action.getExec().call(event, runnerContext);
-        return new ActionTaskResult(true, runnerContext.drainEvents(), null);
+        return new ActionTaskResult(
+                true, runnerContext.drainEvents(event.getSourceTimestamp()), 
null);
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
new file mode 100644
index 0000000..39e284c
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
/** A group of keys with the number of unfinished input records for each specific key. */
public class KeySegment {

    /** Per-key reference count, i.e. how many input records for the key are still unfinished. */
    private final Map<Object, Integer> refCounts = new HashMap<>();

    /** Increments the reference count for {@code key}, starting at 1 for a previously unseen key. */
    public void incrementKeyReference(Object key) {
        refCounts.merge(key, 1, Integer::sum);
    }

    /**
     * Decrements the reference count for {@code key}; the key is dropped entirely once its count
     * reaches zero. Decrementing an unknown key is a no-op.
     */
    public void decrementKeyReference(Object key) {
        Integer current = refCounts.get(key);
        if (current == null) {
            return; // unknown key: nothing to decrement
        }
        if (current > 1) {
            refCounts.put(key, current - 1);
        } else {
            refCounts.remove(key);
        }
    }

    /** Returns true if {@code key} is active, i.e. its reference count is greater than zero. */
    public boolean hasActiveKey(Object key) {
        return refCounts.containsKey(key);
    }

    /** Returns true if the segment has no active keys. */
    public boolean isEmpty() {
        return refCounts.isEmpty();
    }

    // Package-private getter for test access.
    Map<Object, Integer> getKeyReferenceCounts() {
        return refCounts;
    }
}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
new file mode 100644
index 0000000..dad5c14
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+    /** Queue of queue entries segmented by watermarks. */
+    private final Deque<KeySegment> segments;
+
+    /** Buffer for pending watermarks. */
+    private final Deque<Watermark> watermarks;
+
+    public SegmentedQueue() {
+        this.segments = new ArrayDeque<>();
+        this.watermarks = new ArrayDeque<>();
+    }
+
+    /** Adds a key to the last key segment. If the queue is empty, a new 
segment is created. */
+    public void addKeyToLastSegment(Object key) {
+        KeySegment lastSegment;
+        if (segments.isEmpty()) {
+            lastSegment = appendNewSegment();
+        } else {
+            lastSegment = segments.getLast();
+        }
+        lastSegment.incrementKeyReference(key);
+    }
+
+    /**
+     * Removes the key from the first segment that contains the key. Returns 
true if the key was
+     * found and removed.
+     */
+    public boolean removeKey(Object key) {
+        boolean removed = false;
+        for (KeySegment segment : segments) {
+            if (segment.hasActiveKey(key)) {
+                segment.decrementKeyReference(key);
+                removed = true;
+                break;
+            }
+        }
+        return removed;
+    }
+
+    /** Adds a watermark and creates a new segment to associate with it. */
+    public void addWatermark(Watermark watermark) {
+        watermarks.addLast(watermark);
+        appendNewSegment();
+    }
+
+    /** Creates a new key segment and appends it to the end of the queue. */
+    private KeySegment appendNewSegment() {
+        KeySegment newSegment = new KeySegment();
+        segments.addLast(newSegment);
+        return newSegment;
+    }
+
+    /**
+     * Pops the oldest watermark from the watermark deque and removes the 
corresponding key segment
+     * from the segments queue.
+     */
+    public Watermark popOldestWatermark() {
+        if (canProcessWatermark()) {
+            segments.pop();
+            return watermarks.pop();
+        }
+        return null;
+    }
+
+    /** Checks if a watermark is ready to be processed (i.e., oldest segment 
is empty). */
+    private boolean canProcessWatermark() {
+        return isFirstSegmentEmpty() && !watermarks.isEmpty();
+    }
+
+    /** Checks if the first key segment is empty. */
+    private boolean isFirstSegmentEmpty() {
+        return !this.segments.isEmpty() && segments.getFirst().isEmpty();
+    }
+
+    // Package-private getter for test access
+    Deque<KeySegment> getSegments() {
+        return segments;
+    }
+
+    // Package-private getter for test access
+    Deque<Watermark> getWatermarks() {
+        return watermarks;
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 128abbf..ab3bec8 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -69,6 +69,7 @@ public class PythonActionTask extends ActionTask {
             tempGeneratedActionTask.setRunnerContext(runnerContext);
             return tempGeneratedActionTask.invoke();
         }
-        return new ActionTaskResult(true, runnerContext.drainEvents(), null);
+        return new ActionTaskResult(
+                true, runnerContext.drainEvents(event.getSourceTimestamp()), 
null);
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index f3f7792..9b33d85 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -45,6 +45,9 @@ public class PythonGeneratorActionTask extends 
PythonActionTask {
                 key);
         boolean finished = 
pythonActionExecutor.callPythonGenerator(pythonGeneratorRef);
         ActionTask generatedActionTask = finished ? null : this;
-        return new ActionTaskResult(finished, runnerContext.drainEvents(), 
generatedActionTask);
+        return new ActionTaskResult(
+                finished,
+                runnerContext.drainEvents(event.getSourceTimestamp()),
+                generatedActionTask);
     }
 }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 2ff731f..5b04322 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -134,7 +134,7 @@ public class PythonActionExecutor {
                 return pythonGeneratorRef;
             }
         } catch (Exception e) {
-            runnerContext.drainEvents();
+            runnerContext.drainEvents(null);
             throw new PythonActionExecutionException("Failed to execute Python 
action", e);
         }
     }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index cbe4096..4027589 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -443,6 +444,50 @@ public class ActionExecutionOperatorTest {
         }
     }
 
+    @Test
+    void testWatermark() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        new 
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+                        (KeySelector<Long, Long>) value -> value,
+                        TypeInformation.of(Long.class))) {
+
+            final long initialTime = 0L;
+
+            testHarness.open();
+
+            // Process input data 1 with key 0
+            testHarness.processWatermark(new Watermark(initialTime + 1));
+            testHarness.processElement(new StreamRecord<>(0L, initialTime + 
2));
+            testHarness.processElement(new StreamRecord<>(0L, initialTime + 
3));
+            testHarness.processElement(new StreamRecord<>(1L, initialTime + 
4));
+            testHarness.processWatermark(new Watermark(initialTime + 5));
+            testHarness.processElement(new StreamRecord<>(1L, initialTime + 
6));
+            testHarness.processElement(new StreamRecord<>(0L, initialTime + 
7));
+            testHarness.processElement(new StreamRecord<>(1L, initialTime + 
8));
+            testHarness.processWatermark(new Watermark(initialTime + 9));
+
+            testHarness.endInput();
+            testHarness.close();
+
+            Object[] jobOutputQueue = testHarness.getOutput().toArray();
+            assertThat(jobOutputQueue.length).isEqualTo(9);
+
+            long lastWatermark = Long.MIN_VALUE;
+
+            for (Object obj : jobOutputQueue) {
+                if (obj instanceof StreamRecord) {
+                    StreamRecord<?> streamRecord = (StreamRecord<?>) obj;
+                    
assertThat(streamRecord.getTimestamp()).isGreaterThan(lastWatermark);
+                } else if (obj instanceof Watermark) {
+                    Watermark watermark = (Watermark) obj;
+                    
assertThat(watermark.getTimestamp()).isGreaterThan(lastWatermark);
+                    lastWatermark = watermark.getTimestamp();
+                }
+            }
+        }
+    }
+
     public static class TestAgent {
 
         public static class MiddleEvent extends Event {
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
new file mode 100644
index 0000000..914866f
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class KeySegmentTest {
+
+    @Test
+    void testIncrementKeyReference() {
+        KeySegment keySegment = new KeySegment();
+
+        // Test that a key is added with count 1
+        keySegment.incrementKeyReference("key1");
+        assertTrue(keySegment.hasActiveKey("key1"));
+        assertFalse(keySegment.isEmpty());
+    }
+
+    @Test
+    void testIncrementKeyReferenceMultipleTimes() {
+        KeySegment keySegment = new KeySegment();
+
+        // Test incrementing the same key multiple times
+        keySegment.incrementKeyReference("key1");
+        keySegment.incrementKeyReference("key1");
+        keySegment.incrementKeyReference("key1");
+
+        assertTrue(keySegment.hasActiveKey("key1"));
+        assertEquals(3, keySegment.getKeyReferenceCounts().get("key1"));
+    }
+
+    @Test
+    void testDecrementKeyReference() {
+        KeySegment keySegment = new KeySegment();
+
+        // Add a key with count 1
+        keySegment.incrementKeyReference("key1");
+
+        // Decrement it - should be removed
+        keySegment.decrementKeyReference("key1");
+        assertFalse(keySegment.hasActiveKey("key1"));
+        assertTrue(keySegment.isEmpty());
+    }
+
+    @Test
+    void testDecrementKeyReferenceMultipleTimes() {
+        KeySegment keySegment = new KeySegment();
+
+        // Add a key with count 3
+        keySegment.incrementKeyReference("key1");
+        keySegment.incrementKeyReference("key1");
+        keySegment.incrementKeyReference("key1");
+
+        // Decrement it twice - should still exist with count 1
+        keySegment.decrementKeyReference("key1");
+        keySegment.decrementKeyReference("key1");
+        assertTrue(keySegment.hasActiveKey("key1"));
+        assertEquals(1, keySegment.getKeyReferenceCounts().get("key1"));
+
+        // Decrement once more - should be removed
+        keySegment.decrementKeyReference("key1");
+        assertFalse(keySegment.hasActiveKey("key1"));
+        assertTrue(keySegment.isEmpty());
+    }
+
+    @Test
+    void testHasActiveKey() {
+        KeySegment keySegment = new KeySegment();
+
+        // Test with non-existent key
+        assertFalse(keySegment.hasActiveKey("key1"));
+
+        // Test with existing key
+        keySegment.incrementKeyReference("key1");
+        assertTrue(keySegment.hasActiveKey("key1"));
+    }
+
+    @Test
+    void testIsEmpty() {
+        KeySegment keySegment = new KeySegment();
+
+        // Should be empty initially
+        assertTrue(keySegment.isEmpty());
+
+        // Add a key - should not be empty
+        keySegment.incrementKeyReference("key1");
+        assertFalse(keySegment.isEmpty());
+
+        // Remove the key - should be empty again
+        keySegment.decrementKeyReference("key1");
+        assertTrue(keySegment.isEmpty());
+    }
+
+    @Test
+    void testMultipleKeys() {
+        KeySegment keySegment = new KeySegment();
+
+        // Add multiple keys
+        keySegment.incrementKeyReference("key1");
+        keySegment.incrementKeyReference("key2");
+        keySegment.incrementKeyReference("key3");
+
+        assertTrue(keySegment.hasActiveKey("key1"));
+        assertTrue(keySegment.hasActiveKey("key2"));
+        assertTrue(keySegment.hasActiveKey("key3"));
+        assertFalse(keySegment.isEmpty());
+
+        // Remove one key
+        keySegment.decrementKeyReference("key2");
+        assertTrue(keySegment.hasActiveKey("key1"));
+        assertFalse(keySegment.hasActiveKey("key2"));
+        assertTrue(keySegment.hasActiveKey("key3"));
+        assertFalse(keySegment.isEmpty());
+
+        // Remove remaining keys
+        keySegment.decrementKeyReference("key1");
+        keySegment.decrementKeyReference("key3");
+        assertTrue(keySegment.isEmpty());
+    }
+}
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
new file mode 100644
index 0000000..8291b46
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.jupiter.api.Test;
+
+import java.util.Deque;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Unit tests for {@link SegmentedQueue}: keys are appended to the last segment, a watermark
+ * closes the current segment and opens a new one, and a watermark may only be popped once
+ * every key in the segment preceding it has been removed.
+ */
+class SegmentedQueueTest {
+
+    @Test
+    void testAddKeyToLastSegmentCreatesNewSegmentWhenEmpty() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add first key (creates segment)
+        queue.addKeyToLastSegment("key1");
+
+        // Add second key (should go to same segment)
+        queue.addKeyToLastSegment("key2");
+
+        // Verify both keys are in the last segment
+        assertTrue(hasKeyInLastSegment(queue, "key1"));
+        assertTrue(hasKeyInLastSegment(queue, "key2"));
+    }
+
+    @Test
+    void testAddKeyToLastSegmentAddsToExistingSegment() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add watermark (creates segment)
+        Watermark watermark = new Watermark(1000L);
+        queue.addWatermark(watermark);
+
+        // Add keys to the last segment
+        queue.addKeyToLastSegment("key1");
+        queue.addKeyToLastSegment("key2");
+
+        // Verify both keys are in the last segment
+        assertTrue(hasKeyInLastSegment(queue, "key1"));
+        assertTrue(hasKeyInLastSegment(queue, "key2"));
+    }
+
+    @Test
+    void testRemoveKeyFromSegment() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add keys
+        queue.addKeyToLastSegment("key1");
+        queue.addKeyToLastSegment("key2");
+
+        // Remove one key
+        boolean removed = queue.removeKey("key1");
+
+        // Verify the key was removed
+        assertTrue(removed);
+        assertFalse(hasKeyInLastSegment(queue, "key1"));
+        assertTrue(hasKeyInLastSegment(queue, "key2"));
+    }
+
+    @Test
+    void testRemoveNonExistentKey() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Try to remove a key that doesn't exist
+        boolean removed = queue.removeKey("nonExistentKey");
+
+        // Should return false
+        assertFalse(removed);
+    }
+
+    @Test
+    void testAddWatermarkCreatesNewSegment() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add a key first
+        queue.addKeyToLastSegment("key1");
+        int initialSegments = queue.getSegments().size();
+
+        // Add a watermark
+        Watermark watermark = new Watermark(1000L);
+        queue.addWatermark(watermark);
+
+        // Verify a new segment was created
+        assertEquals(initialSegments + 1, queue.getSegments().size());
+        assertEquals(1, queue.getWatermarks().size());
+    }
+
+    @Test
+    void testPopOldestWatermarkWhenSegmentIsEmpty() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add a watermark (which creates a new segment)
+        Watermark watermark1 = new Watermark(1000L);
+        queue.addWatermark(watermark1);
+
+        // Add another watermark
+        Watermark watermark2 = new Watermark(2000L);
+        queue.addWatermark(watermark2);
+
+        // Pop the oldest watermark
+        Watermark popped = queue.popOldestWatermark();
+
+        // Verify the correct watermark was popped
+        assertEquals(watermark1.getTimestamp(), popped.getTimestamp());
+        assertEquals(1, queue.getWatermarks().size());
+    }
+
+    @Test
+    void testPopOldestWatermarkWhenSegmentIsNotEmpty() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add a key
+        queue.addKeyToLastSegment("key1");
+
+        // Add a watermark
+        Watermark watermark = new Watermark(1000L);
+        queue.addWatermark(watermark);
+
+        // Try to pop the watermark - should return null since segment is not empty
+        Watermark popped = queue.popOldestWatermark();
+
+        // Should return null because segment is not empty
+        assertNull(popped);
+    }
+
+    @Test
+    void testPopOldestWatermarkWhenNoWatermarks() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Try to pop a watermark when there are none
+        Watermark popped = queue.popOldestWatermark();
+
+        // Should return null
+        assertNull(popped);
+    }
+
+    // End-to-end scenario interleaving keys, watermarks, and removals to check
+    // that each watermark becomes poppable only after its preceding segment drains.
+    @Test
+    void testComplexScenario() {
+        SegmentedQueue queue = new SegmentedQueue();
+
+        // Add two continuous watermarks (creates new segments)
+        queue.addWatermark(new Watermark(1000L));
+        queue.addWatermark(new Watermark(1001L));
+
+        // The first two segments should be empty, so we can pop the watermarks
+        Watermark popped = queue.popOldestWatermark();
+        assertNotNull(popped);
+        assertEquals(1000L, popped.getTimestamp());
+
+        popped = queue.popOldestWatermark();
+        assertNotNull(popped);
+        assertEquals(1001L, popped.getTimestamp());
+
+        // Add some keys
+        queue.addKeyToLastSegment("key1");
+        queue.addKeyToLastSegment("key2");
+
+        // Add another watermark (creates new segment)
+        queue.addWatermark(new Watermark(2000L));
+
+        // Add more keys to the new segment
+        queue.addKeyToLastSegment("key3");
+        queue.addKeyToLastSegment("key4");
+
+        // Remove some keys
+        assertTrue(queue.removeKey("key1"));
+        assertTrue(queue.removeKey("key3"));
+
+        // "key2" still blocks the segment ahead of the 2000L watermark
+        popped = queue.popOldestWatermark();
+        assertNull(popped);
+
+        // Add another watermark (creates new segment)
+        queue.addWatermark(new Watermark(3000L));
+
+        // Add more keys
+        queue.addKeyToLastSegment("key5");
+
+        // Remove remaining keys from first segment
+        assertTrue(queue.removeKey("key2"));
+
+        popped = queue.popOldestWatermark();
+        assertNotNull(popped);
+        assertEquals(2000L, popped.getTimestamp());
+
+        // Remove remaining keys from second segment
+        assertTrue(queue.removeKey("key4"));
+
+        popped = queue.popOldestWatermark();
+        assertNotNull(popped);
+        assertEquals(3000L, popped.getTimestamp());
+    }
+
+    // Helper methods to access private fields for testing
+    private boolean hasKeyInLastSegment(SegmentedQueue queue, Object key) {
+        Deque<KeySegment> segments = queue.getSegments();
+        return segments.getLast().hasActiveKey(key);
+    }
+}


Reply via email to