This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new ebe2176 [runtime] ActionExecutionOperator handle watermark properly
(#186)
ebe2176 is described below
commit ebe21764e50a0775929a951527a1432b74869950
Author: Eugene <[email protected]>
AuthorDate: Fri Sep 26 13:14:32 2025 +0800
[runtime] ActionExecutionOperator handle watermark properly (#186)
---
.../java/org/apache/flink/agents/api/Event.java | 14 ++
.../agents/runtime/context/RunnerContextImpl.java | 5 +-
.../runtime/operator/ActionExecutionOperator.java | 56 +++++-
.../agents/runtime/operator/JavaActionTask.java | 3 +-
.../agents/runtime/operator/queue/KeySegment.java | 64 ++++++
.../runtime/operator/queue/SegmentedQueue.java | 108 +++++++++++
.../runtime/python/operator/PythonActionTask.java | 3 +-
.../python/operator/PythonGeneratorActionTask.java | 5 +-
.../runtime/python/utils/PythonActionExecutor.java | 2 +-
.../operator/ActionExecutionOperatorTest.java | 45 +++++
.../runtime/operator/queue/KeySegmentTest.java | 137 +++++++++++++
.../runtime/operator/queue/SegmentedQueueTest.java | 216 +++++++++++++++++++++
12 files changed, 651 insertions(+), 7 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/Event.java
b/api/src/main/java/org/apache/flink/agents/api/Event.java
index eabd2f7..e0e77e5 100644
--- a/api/src/main/java/org/apache/flink/agents/api/Event.java
+++ b/api/src/main/java/org/apache/flink/agents/api/Event.java
@@ -28,6 +28,8 @@ import java.util.UUID;
public abstract class Event {
private final UUID id;
private final Map<String, Object> attributes;
+ /** The timestamp of the source record. */
+ private Long sourceTimestamp;
public Event() {
this(UUID.randomUUID(), new HashMap<>());
@@ -54,4 +56,16 @@ public abstract class Event {
public void setAttr(String name, Object value) {
attributes.put(name, value);
}
+
+ public boolean hasSourceTimestamp() {
+ return sourceTimestamp != null;
+ }
+
+ public Long getSourceTimestamp() {
+ return sourceTimestamp;
+ }
+
+ public void setSourceTimestamp(long timestamp) {
+ this.sourceTimestamp = timestamp;
+ }
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index e8ca999..d998af6 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -90,9 +90,12 @@ public class RunnerContextImpl implements RunnerContext {
pendingEvents.add(event);
}
- public List<Event> drainEvents() {
+ public List<Event> drainEvents(Long timestamp) {
mailboxThreadChecker.run();
List<Event> list = new ArrayList<>(this.pendingEvents);
+ if (timestamp != null) {
+ list.forEach(event -> event.setSourceTimestamp(timestamp));
+ }
this.pendingEvents.clear();
return list;
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index b91fa52..2543dee 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -40,6 +40,7 @@ import
org.apache.flink.agents.runtime.memory.CachedMemoryStore;
import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.python.operator.PythonActionTask;
@@ -65,6 +66,7 @@ import
org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
@@ -105,6 +107,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private static final String RECOVERY_MARKER_STATE_NAME = "recoveryMarker";
private static final String MESSAGE_SEQUENCE_NUMBER_STATE_NAME =
"messageSequenceNumber";
+ private static final String PENDING_INPUT_EVENT_STATE_NAME =
"pendingInputEvents";
private final AgentPlan agentPlan;
@@ -121,6 +124,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private transient BuiltInMetrics builtInMetrics;
+ private transient SegmentedQueue keySegmentQueue;
+
private final transient MailboxExecutor mailboxExecutor;
// We need to check whether the current thread is the mailbox thread using
the mailbox
@@ -190,6 +195,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
+ keySegmentQueue = new SegmentedQueue();
+
// init the action state store with proper implementation
if (actionStateStore == null
&& KAFKA.getType()
@@ -224,7 +231,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
getRuntimeContext()
.getListState(
new ListStateDescriptor<>(
- "pendingInputEvents",
TypeInformation.of(Event.class)));
+ PENDING_INPUT_EVENT_STATE_NAME,
+ TypeInformation.of(Event.class)));
// We use UnionList here to ensure that the task can access all keys
after parallelism
// modifications.
// Subsequent steps {@link #tryResumeProcessActionTasks} will then
filter out keys that do
@@ -257,6 +265,12 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
eventLogger.open(new EventLoggerOpenParams(runtimeContext));
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ keySegmentQueue.addWatermark(mark);
+ processEligibleWatermarks();
+ }
+
@Override
public void processElement(StreamRecord<IN> record) throws Exception {
IN input = record.getValue();
@@ -264,6 +278,11 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
// wrap to InputEvent first
Event inputEvent = wrapToInputEvent(input);
+ if (record.hasTimestamp()) {
+ inputEvent.setSourceTimestamp(record.getTimestamp());
+ }
+
+ keySegmentQueue.addKeyToLastSegment(getCurrentKey());
if (currentKeyHasMoreActionTask()) {
// If there are already actions being processed for the current
key, the newly incoming
@@ -287,7 +306,12 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
if (EventUtil.isOutputEvent(event)) {
// If the event is an OutputEvent, we send it downstream.
OUT outputData = getOutputFromOutputEvent(event);
- output.collect(reusedStreamRecord.replace(outputData));
+ if (event.hasSourceTimestamp()) {
+ output.collect(reusedStreamRecord.replace(outputData,
event.getSourceTimestamp()));
+ } else {
+ reusedStreamRecord.eraseTimestamp();
+ output.collect(reusedStreamRecord.replace(outputData));
+ }
} else {
if (isInputEvent) {
// If the event is an InputEvent, we mark that the key is
currently being processed.
@@ -355,6 +379,10 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
+ key
+ " should be 1, but got "
+ removedCount);
+ checkState(
+ keySegmentQueue.removeKey(key),
+ "Current key" + key + " is missing from the
segmentedQueue.");
+ processEligibleWatermarks();
return;
}
@@ -435,6 +463,10 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
+ key
+ " should be 1, but got "
+ removedCount);
+ checkState(
+ keySegmentQueue.removeKey(key),
+ "Current key" + key + " is missing from the
segmentedQueue.");
+ processEligibleWatermarks();
Event pendingInputEvent =
pollFromListState(pendingInputEventsKState);
if (pendingInputEvent != null) {
processEvent(key, pendingInputEvent);
@@ -655,10 +687,22 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
Iterable<Object> keys = currentProcessingKeysOpState.get();
if (keys != null) {
for (Object key : keys) {
+ keySegmentQueue.addKeyToLastSegment(key);
mailboxExecutor.submit(
() -> tryProcessActionTaskForKey(key), "process action
task");
}
}
+
+ getKeyedStateBackend()
+ .applyToAllKeys(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>(
+ PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)),
+ (key, state) ->
+ state.get()
+ .forEach(
+ event ->
keySegmentQueue.addKeyToLastSegment(key)));
}
private void initOrIncSequenceNumber() throws Exception {
@@ -726,6 +770,14 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
}
}
+ private void processEligibleWatermarks() throws Exception {
+ Watermark mark = keySegmentQueue.popOldestWatermark();
+ while (mark != null) {
+ super.processWatermark(mark);
+ mark = keySegmentQueue.popOldestWatermark();
+ }
+ }
+
/** Failed to execute Action task. */
public static class ActionTaskExecutionException extends Exception {
public ActionTaskExecutionException(String message, Throwable cause) {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
index 894df22..1565bd0 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java
@@ -45,6 +45,7 @@ public class JavaActionTask extends ActionTask {
key);
runnerContext.checkNoPendingEvents();
action.getExec().call(event, runnerContext);
- return new ActionTaskResult(true, runnerContext.drainEvents(), null);
+ return new ActionTaskResult(
+ true, runnerContext.drainEvents(event.getSourceTimestamp()),
null);
}
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
new file mode 100644
index 0000000..39e284c
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A group of keys with the number of unfinished input records for each
specific key. */
+public class KeySegment {
+ /** Maps keys to their reference counts (number of unfinished input
records). */
+ private final Map<Object, Integer> keyReferenceCounts;
+
+ public KeySegment() {
+ this.keyReferenceCounts = new HashMap<>();
+ }
+
+ /** Increments the reference count for a key. */
+ public void incrementKeyReference(Object key) {
+ keyReferenceCounts.merge(key, 1, Integer::sum);
+ }
+
+ /** Decrements the reference count for a key. Removes the key if the count
reaches zero. */
+ public void decrementKeyReference(Object key) {
+ keyReferenceCounts.computeIfPresent(
+ key,
+ (k, count) -> {
+ if (count <= 1) {
+ return null; // Remove the key if count is 1 or less
+ } else {
+ return count - 1;
+ }
+ });
+ }
+
+ /** Checks if a key is active (i.e., its reference count is greater than
zero). */
+ public boolean hasActiveKey(Object key) {
+ return keyReferenceCounts.containsKey(key);
+ }
+
+ /** Checks if the group is empty (no active keys). */
+ public boolean isEmpty() {
+ return keyReferenceCounts.isEmpty();
+ }
+
+ // Package-private getter for test access
+ Map<Object, Integer> getKeyReferenceCounts() {
+ return keyReferenceCounts;
+ }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
new file mode 100644
index 0000000..dad5c14
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+ /** Queue of queue entries segmented by watermarks. */
+ private final Deque<KeySegment> segments;
+
+ /** Buffer for pending watermarks. */
+ private final Deque<Watermark> watermarks;
+
+ public SegmentedQueue() {
+ this.segments = new ArrayDeque<>();
+ this.watermarks = new ArrayDeque<>();
+ }
+
+ /** Adds a key to the last key segment. If the queue is empty, a new
segment is created. */
+ public void addKeyToLastSegment(Object key) {
+ KeySegment lastSegment;
+ if (segments.isEmpty()) {
+ lastSegment = appendNewSegment();
+ } else {
+ lastSegment = segments.getLast();
+ }
+ lastSegment.incrementKeyReference(key);
+ }
+
+ /**
+ * Removes the key from the first segment that contains the key. Returns
true if the key was
+ * found and removed.
+ */
+ public boolean removeKey(Object key) {
+ boolean removed = false;
+ for (KeySegment segment : segments) {
+ if (segment.hasActiveKey(key)) {
+ segment.decrementKeyReference(key);
+ removed = true;
+ break;
+ }
+ }
+ return removed;
+ }
+
+ /** Adds a watermark and creates a new segment to associate with it. */
+ public void addWatermark(Watermark watermark) {
+ watermarks.addLast(watermark);
+ appendNewSegment();
+ }
+
+ /** Creates a new key segment and appends it to the end of the queue. */
+ private KeySegment appendNewSegment() {
+ KeySegment newSegment = new KeySegment();
+ segments.addLast(newSegment);
+ return newSegment;
+ }
+
+ /**
+ * Pops the oldest watermark from the watermark deque and removes the
corresponding key segment
+ * from the segments queue.
+ */
+ public Watermark popOldestWatermark() {
+ if (canProcessWatermark()) {
+ segments.pop();
+ return watermarks.pop();
+ }
+ return null;
+ }
+
+ /** Checks if a watermark is ready to be processed (i.e., oldest segment
is empty). */
+ private boolean canProcessWatermark() {
+ return isFirstSegmentEmpty() && !watermarks.isEmpty();
+ }
+
+ /** Checks if the first key segment is empty. */
+ private boolean isFirstSegmentEmpty() {
+ return !this.segments.isEmpty() && segments.getFirst().isEmpty();
+ }
+
+ // Package-private getter for test access
+ Deque<KeySegment> getSegments() {
+ return segments;
+ }
+
+ // Package-private getter for test access
+ Deque<Watermark> getWatermarks() {
+ return watermarks;
+ }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 128abbf..ab3bec8 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -69,6 +69,7 @@ public class PythonActionTask extends ActionTask {
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke();
}
- return new ActionTaskResult(true, runnerContext.drainEvents(), null);
+ return new ActionTaskResult(
+ true, runnerContext.drainEvents(event.getSourceTimestamp()),
null);
}
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index f3f7792..9b33d85 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -45,6 +45,9 @@ public class PythonGeneratorActionTask extends
PythonActionTask {
key);
boolean finished =
pythonActionExecutor.callPythonGenerator(pythonGeneratorRef);
ActionTask generatedActionTask = finished ? null : this;
- return new ActionTaskResult(finished, runnerContext.drainEvents(),
generatedActionTask);
+ return new ActionTaskResult(
+ finished,
+ runnerContext.drainEvents(event.getSourceTimestamp()),
+ generatedActionTask);
}
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 2ff731f..5b04322 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -134,7 +134,7 @@ public class PythonActionExecutor {
return pythonGeneratorRef;
}
} catch (Exception e) {
- runnerContext.drainEvents();
+ runnerContext.drainEvents(null);
throw new PythonActionExecutionException("Failed to execute Python
action", e);
}
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index cbe4096..4027589 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -29,6 +29,7 @@ import
org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -443,6 +444,50 @@ public class ActionExecutionOperatorTest {
}
}
+ @Test
+ void testWatermark() throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class))) {
+
+ final long initialTime = 0L;
+
+ testHarness.open();
+
+ // Process input data 1 with key 0
+ testHarness.processWatermark(new Watermark(initialTime + 1));
+ testHarness.processElement(new StreamRecord<>(0L, initialTime +
2));
+ testHarness.processElement(new StreamRecord<>(0L, initialTime +
3));
+ testHarness.processElement(new StreamRecord<>(1L, initialTime +
4));
+ testHarness.processWatermark(new Watermark(initialTime + 5));
+ testHarness.processElement(new StreamRecord<>(1L, initialTime +
6));
+ testHarness.processElement(new StreamRecord<>(0L, initialTime +
7));
+ testHarness.processElement(new StreamRecord<>(1L, initialTime +
8));
+ testHarness.processWatermark(new Watermark(initialTime + 9));
+
+ testHarness.endInput();
+ testHarness.close();
+
+ Object[] jobOutputQueue = testHarness.getOutput().toArray();
+ assertThat(jobOutputQueue.length).isEqualTo(9);
+
+ long lastWatermark = Long.MIN_VALUE;
+
+ for (Object obj : jobOutputQueue) {
+ if (obj instanceof StreamRecord) {
+ StreamRecord<?> streamRecord = (StreamRecord<?>) obj;
+
assertThat(streamRecord.getTimestamp()).isGreaterThan(lastWatermark);
+ } else if (obj instanceof Watermark) {
+ Watermark watermark = (Watermark) obj;
+
assertThat(watermark.getTimestamp()).isGreaterThan(lastWatermark);
+ lastWatermark = watermark.getTimestamp();
+ }
+ }
+ }
+ }
+
public static class TestAgent {
public static class MiddleEvent extends Event {
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
new file mode 100644
index 0000000..914866f
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/KeySegmentTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class KeySegmentTest {
+
+ @Test
+ void testIncrementKeyReference() {
+ KeySegment keySegment = new KeySegment();
+
+ // Test that a key is added with count 1
+ keySegment.incrementKeyReference("key1");
+ assertTrue(keySegment.hasActiveKey("key1"));
+ assertFalse(keySegment.isEmpty());
+ }
+
+ @Test
+ void testIncrementKeyReferenceMultipleTimes() {
+ KeySegment keySegment = new KeySegment();
+
+ // Test incrementing the same key multiple times
+ keySegment.incrementKeyReference("key1");
+ keySegment.incrementKeyReference("key1");
+ keySegment.incrementKeyReference("key1");
+
+ assertTrue(keySegment.hasActiveKey("key1"));
+ assertEquals(3, keySegment.getKeyReferenceCounts().get("key1"));
+ }
+
+ @Test
+ void testDecrementKeyReference() {
+ KeySegment keySegment = new KeySegment();
+
+ // Add a key with count 1
+ keySegment.incrementKeyReference("key1");
+
+ // Decrement it - should be removed
+ keySegment.decrementKeyReference("key1");
+ assertFalse(keySegment.hasActiveKey("key1"));
+ assertTrue(keySegment.isEmpty());
+ }
+
+ @Test
+ void testDecrementKeyReferenceMultipleTimes() {
+ KeySegment keySegment = new KeySegment();
+
+ // Add a key with count 3
+ keySegment.incrementKeyReference("key1");
+ keySegment.incrementKeyReference("key1");
+ keySegment.incrementKeyReference("key1");
+
+ // Decrement it twice - should still exist with count 1
+ keySegment.decrementKeyReference("key1");
+ keySegment.decrementKeyReference("key1");
+ assertTrue(keySegment.hasActiveKey("key1"));
+ assertEquals(1, keySegment.getKeyReferenceCounts().get("key1"));
+
+ // Decrement once more - should be removed
+ keySegment.decrementKeyReference("key1");
+ assertFalse(keySegment.hasActiveKey("key1"));
+ assertTrue(keySegment.isEmpty());
+ }
+
+ @Test
+ void testHasActiveKey() {
+ KeySegment keySegment = new KeySegment();
+
+ // Test with non-existent key
+ assertFalse(keySegment.hasActiveKey("key1"));
+
+ // Test with existing key
+ keySegment.incrementKeyReference("key1");
+ assertTrue(keySegment.hasActiveKey("key1"));
+ }
+
+ @Test
+ void testIsEmpty() {
+ KeySegment keySegment = new KeySegment();
+
+ // Should be empty initially
+ assertTrue(keySegment.isEmpty());
+
+ // Add a key - should not be empty
+ keySegment.incrementKeyReference("key1");
+ assertFalse(keySegment.isEmpty());
+
+ // Remove the key - should be empty again
+ keySegment.decrementKeyReference("key1");
+ assertTrue(keySegment.isEmpty());
+ }
+
+ @Test
+ void testMultipleKeys() {
+ KeySegment keySegment = new KeySegment();
+
+ // Add multiple keys
+ keySegment.incrementKeyReference("key1");
+ keySegment.incrementKeyReference("key2");
+ keySegment.incrementKeyReference("key3");
+
+ assertTrue(keySegment.hasActiveKey("key1"));
+ assertTrue(keySegment.hasActiveKey("key2"));
+ assertTrue(keySegment.hasActiveKey("key3"));
+ assertFalse(keySegment.isEmpty());
+
+ // Remove one key
+ keySegment.decrementKeyReference("key2");
+ assertTrue(keySegment.hasActiveKey("key1"));
+ assertFalse(keySegment.hasActiveKey("key2"));
+ assertTrue(keySegment.hasActiveKey("key3"));
+ assertFalse(keySegment.isEmpty());
+
+ // Remove remaining keys
+ keySegment.decrementKeyReference("key1");
+ keySegment.decrementKeyReference("key3");
+ assertTrue(keySegment.isEmpty());
+ }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
new file mode 100644
index 0000000..8291b46
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueueTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.jupiter.api.Test;
+
+import java.util.Deque;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SegmentedQueueTest {
+
+ @Test
+ void testAddKeyToLastSegmentCreatesNewSegmentWhenEmpty() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add first key (creates segment)
+ queue.addKeyToLastSegment("key1");
+
+ // Add second key (should go to same segment)
+ queue.addKeyToLastSegment("key2");
+
+ // Verify both keys are in the last segment
+ assertTrue(hasKeyInLastSegment(queue, "key1"));
+ assertTrue(hasKeyInLastSegment(queue, "key2"));
+ }
+
+ @Test
+ void testAddKeyToLastSegmentAddsToExistingSegment() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add watermark (creates segment)
+ Watermark watermark = new Watermark(1000L);
+ queue.addWatermark(watermark);
+
+ // Add keys to the last segment
+ queue.addKeyToLastSegment("key1");
+ queue.addKeyToLastSegment("key2");
+
+ // Verify both keys are in the last segment
+ assertTrue(hasKeyInLastSegment(queue, "key1"));
+ assertTrue(hasKeyInLastSegment(queue, "key2"));
+ }
+
+ @Test
+ void testRemoveKeyFromSegment() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add keys
+ queue.addKeyToLastSegment("key1");
+ queue.addKeyToLastSegment("key2");
+
+ // Remove one key
+ boolean removed = queue.removeKey("key1");
+
+ // Verify the key was removed
+ assertTrue(removed);
+ assertFalse(hasKeyInLastSegment(queue, "key1"));
+ assertTrue(hasKeyInLastSegment(queue, "key2"));
+ }
+
+ @Test
+ void testRemoveNonExistentKey() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Try to remove a key that doesn't exist
+ boolean removed = queue.removeKey("nonExistentKey");
+
+ // Should return false
+ assertFalse(removed);
+ }
+
+ @Test
+ void testAddWatermarkCreatesNewSegment() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add a key first
+ queue.addKeyToLastSegment("key1");
+ int initialSegments = queue.getSegments().size();
+
+ // Add a watermark
+ Watermark watermark = new Watermark(1000L);
+ queue.addWatermark(watermark);
+
+ // Verify a new segment was created
+ assertEquals(initialSegments + 1, queue.getSegments().size());
+ assertEquals(1, queue.getWatermarks().size());
+ }
+
+ @Test
+ void testPopOldestWatermarkWhenSegmentIsEmpty() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add a watermark (which creates a new segment)
+ Watermark watermark1 = new Watermark(1000L);
+ queue.addWatermark(watermark1);
+
+ // Add another watermark
+ Watermark watermark2 = new Watermark(2000L);
+ queue.addWatermark(watermark2);
+
+ // Pop the oldest watermark
+ Watermark popped = queue.popOldestWatermark();
+
+ // Verify the correct watermark was popped
+ assertEquals(watermark1.getTimestamp(), popped.getTimestamp());
+ assertEquals(1, queue.getWatermarks().size());
+ }
+
+ @Test
+ void testPopOldestWatermarkWhenSegmentIsNotEmpty() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add a key
+ queue.addKeyToLastSegment("key1");
+
+ // Add a watermark
+ Watermark watermark = new Watermark(1000L);
+ queue.addWatermark(watermark);
+
+ // Try to pop the watermark - should return null since segment is not
empty
+ Watermark popped = queue.popOldestWatermark();
+
+ // Should return null because segment is not empty
+ assertNull(popped);
+ }
+
+ @Test
+ void testPopOldestWatermarkWhenNoWatermarks() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Try to pop a watermark when there are none
+ Watermark popped = queue.popOldestWatermark();
+
+ // Should return null
+ assertNull(popped);
+ }
+
+ @Test
+ void testComplexScenario() {
+ SegmentedQueue queue = new SegmentedQueue();
+
+ // Add two continuous watermarks (creates new segments)
+ queue.addWatermark(new Watermark(1000L));
+ queue.addWatermark(new Watermark(1001L));
+
+ // The first two segment should be empty, so we can pop the watermark
+ Watermark popped = queue.popOldestWatermark();
+ assertNotNull(popped);
+ assertEquals(1000L, popped.getTimestamp());
+
+ popped = queue.popOldestWatermark();
+ assertNotNull(popped);
+ assertEquals(1001L, popped.getTimestamp());
+
+ // Add some keys
+ queue.addKeyToLastSegment("key1");
+ queue.addKeyToLastSegment("key2");
+
+ // Add another watermark (creates new segment)
+ queue.addWatermark(new Watermark(2000L));
+
+ // Add more keys to the new segment
+ queue.addKeyToLastSegment("key3");
+ queue.addKeyToLastSegment("key4");
+
+ // Remove some keys
+ assertTrue(queue.removeKey("key1"));
+ assertTrue(queue.removeKey("key3"));
+
+ popped = queue.popOldestWatermark();
+ assertNull(popped);
+
+ // Add another watermark (creates new segment)
+ queue.addWatermark(new Watermark(3000L));
+
+ // Add more keys
+ queue.addKeyToLastSegment("key5");
+
+ // Remove remaining keys from first segment
+ assertTrue(queue.removeKey("key2"));
+
+ popped = queue.popOldestWatermark();
+ assertNotNull(popped);
+ assertEquals(2000L, popped.getTimestamp());
+
+ // Remove remaining keys from second segment
+ assertTrue(queue.removeKey("key4"));
+
+ popped = queue.popOldestWatermark();
+ assertNotNull(popped);
+ assertEquals(3000L, popped.getTimestamp());
+ }
+
+ // Helper methods to access private fields for testing
+ private boolean hasKeyInLastSegment(SegmentedQueue queue, Object key) {
+ Deque<KeySegment> segments = queue.getSegments();
+ return segments.getLast().hasActiveKey(key);
+ }
+}