Sxnan commented on code in PR #186:
URL: https://github.com/apache/flink-agents/pull/186#discussion_r2374891868
##########
api/src/main/java/org/apache/flink/agents/api/Event.java:
##########
@@ -28,6 +28,10 @@
public abstract class Event {
private final UUID id;
private final Map<String, Object> attributes;
+ /** The timestamp of the record. */
+ private long timestamp;
Review Comment:
Can we use the `Long` type and let `null` indicate the absence of a
timestamp? Would that be simpler?
##########
api/src/main/java/org/apache/flink/agents/api/Event.java:
##########
@@ -54,4 +58,21 @@ public Object getAttr(String name) {
public void setAttr(String name, Object value) {
attributes.put(name, value);
}
+
+ public boolean hasTimestamp() {
+ return hasTimestamp;
+ }
+
+ public long getTimestamp() {
Review Comment:
We can just return `Long` here and remove the `hasTimestamp` method.
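
A minimal sketch of the nullable-timestamp suggestion. `TimestampedEventSketch` and `DemoEvent` are hypothetical stand-ins, not the real `Event` class, and the setter name is an assumption:

```java
import java.util.UUID;

/** Hypothetical stand-in for Event; only the timestamp handling is shown. */
abstract class TimestampedEventSketch {
    private final UUID id = UUID.randomUUID();

    /** Timestamp of the record; null means no timestamp is attached. */
    private Long timestamp;

    public UUID getId() {
        return id;
    }

    /** Returns the timestamp, or null if none was set. */
    public Long getTimestamp() {
        return timestamp;
    }

    /** Sets the timestamp; passing null clears it. */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Concrete subclass used only to instantiate the sketch. */
class DemoEvent extends TimestampedEventSketch {}
```

With this shape the separate `hasTimestamp` flag is no longer needed, but callers have to null-check before unboxing the returned value to a primitive `long`.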
##########
runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java:
##########
@@ -97,6 +97,14 @@ public List<Event> drainEvents() {
return list;
}
+ public List<Event> drainEvents(long timestamp) {
Review Comment:
I think we can take a `Long` timestamp here and skip setting the
timestamp if it is null.
This way, the caller does not need to figure out which `drainEvents` method to call.
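
A rough sketch of a single `drainEvents(Long)` entry point. `SketchEvent`, `RunnerContextSketch`, `pendingEvents`, and `sendEvent` are hypothetical names for illustration and do not reflect the actual `RunnerContextImpl` internals:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event type carrying a nullable timestamp. */
class SketchEvent {
    private Long timestamp;

    Long getTimestamp() {
        return timestamp;
    }

    void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Hypothetical stand-in for the runner context; only draining is shown. */
class RunnerContextSketch {
    private final List<SketchEvent> pendingEvents = new ArrayList<>();

    void sendEvent(SketchEvent event) {
        pendingEvents.add(event);
    }

    /** Drains all pending events; stamps them only when a timestamp is given. */
    List<SketchEvent> drainEvents(Long timestamp) {
        List<SketchEvent> drained = new ArrayList<>(pendingEvents);
        pendingEvents.clear();
        if (timestamp != null) {
            for (SketchEvent event : drained) {
                event.setTimestamp(timestamp);
            }
        }
        return drained;
    }
}
```

The caller can then pass through whatever timestamp it has, including null, without choosing between overloads.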
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+ /** Queue of queue entries segmented by watermarks. */
+ private final Deque<KeySegment> segments;
+
+ /** Buffer for pending watermarks. */
+ private final Deque<Watermark> watermarks;
+
+ public SegmentedQueue() {
+ this.segments = new ArrayDeque<>();
+ this.watermarks = new ArrayDeque<>();
+ }
+
+ /** Adds a key to the last key segment. If the queue is empty, a new segment is created. */
+ public void addKeyToLastSegment(Object key) {
+ KeySegment lastSegment;
+ if (segments.isEmpty()) {
+ lastSegment = appendNewSegment();
+ } else {
+ lastSegment = segments.getLast();
+ }
+ lastSegment.incrementKeyReference(key);
+ }
+
+ /**
+ * Removes a key from all segments, decrementing its reference count. Returns true if the key
+ * was found and removed.
+ */
+ public boolean removeKey(Object key) {
+ boolean removed = false;
+ for (KeySegment segment : segments) {
+ if (segment.hasActiveKey(key)) {
+ segment.decrementKeyReference(key);
+ removed = true;
+ break;
+ }
+ }
+ return removed;
+ }
+
+ /** Adds a watermark and creates a new segment to associate with it. */
+ public void addWatermark(Watermark watermark) {
+ watermarks.addLast(watermark);
+ appendNewSegment();
+ }
+
+ /** Creates a new key segment and appends it to the end of the queue. */
+ public KeySegment appendNewSegment() {
+ KeySegment newSegment = new KeySegment();
+ segments.addLast(newSegment);
+ return newSegment;
+ }
+
+ /**
+ * Pops the oldest watermark from the watermark deque and removes the corresponding key segment
+ * from the segments queue.
+ */
+ public Watermark popOldestWatermark() {
Review Comment:
How about we make this method pop only a completed watermark, and return
null if there is no completed watermark left? This way we can ensure that the
caller never mistakenly pops an uncompleted watermark.
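
One possible shape for this, as a self-contained sketch. It uses plain `long` values in place of Flink's `Watermark` and a bare map in place of `KeySegment`; the class name and `popCompletedWatermark` are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified queue; long values stand in for Watermark. */
class SegmentedQueueSketch {
    /** Each segment maps a key to its number of unfinished events. */
    private final Deque<Map<Object, Integer>> segments = new ArrayDeque<>();

    private final Deque<Long> watermarks = new ArrayDeque<>();

    void addKeyToLastSegment(Object key) {
        if (segments.isEmpty()) {
            segments.addLast(new HashMap<>());
        }
        segments.getLast().merge(key, 1, Integer::sum);
    }

    /** Decrements the key in the first segment that contains it. */
    boolean removeKey(Object key) {
        for (Map<Object, Integer> segment : segments) {
            if (segment.containsKey(key)) {
                segment.computeIfPresent(key, (k, c) -> c <= 1 ? null : c - 1);
                return true;
            }
        }
        return false;
    }

    void addWatermark(long watermark) {
        watermarks.addLast(watermark);
        segments.addLast(new HashMap<>());
    }

    /** Pops the oldest watermark only if its segment is drained; otherwise returns null. */
    Long popCompletedWatermark() {
        if (watermarks.isEmpty() || segments.isEmpty() || !segments.getFirst().isEmpty()) {
            return null;
        }
        segments.removeFirst();
        return watermarks.removeFirst();
    }
}
```

A caller could then loop with `while ((wm = queue.popCompletedWatermark()) != null)` and would not need a separate readiness check.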
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+ /** Queue of queue entries segmented by watermarks. */
+ private final Deque<KeySegment> segments;
+
+ /** Buffer for pending watermarks. */
+ private final Deque<Watermark> watermarks;
+
+ public SegmentedQueue() {
+ this.segments = new ArrayDeque<>();
+ this.watermarks = new ArrayDeque<>();
+ }
+
+ /** Adds a key to the last key segment. If the queue is empty, a new segment is created. */
+ public void addKeyToLastSegment(Object key) {
+ KeySegment lastSegment;
+ if (segments.isEmpty()) {
+ lastSegment = appendNewSegment();
+ } else {
+ lastSegment = segments.getLast();
+ }
+ lastSegment.incrementKeyReference(key);
+ }
+
+ /**
+ * Removes a key from all segments, decrementing its reference count. Returns true if the key
+ * was found and removed.
+ */
+ public boolean removeKey(Object key) {
+ boolean removed = false;
+ for (KeySegment segment : segments) {
+ if (segment.hasActiveKey(key)) {
+ segment.decrementKeyReference(key);
+ removed = true;
+ break;
+ }
+ }
+ return removed;
+ }
+
+ /** Adds a watermark and creates a new segment to associate with it. */
+ public void addWatermark(Watermark watermark) {
+ watermarks.addLast(watermark);
+ appendNewSegment();
+ }
+
+ /** Creates a new key segment and appends it to the end of the queue. */
+ public KeySegment appendNewSegment() {
Review Comment:
This can be private
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.operator.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A group of keys with reference counting to track active keys. */
+public class KeySegment {
+ /** Maps keys to their reference counts (number of active occurrences). */
Review Comment:
I think it is more accurate to say `(number of unfinished events of the
specific key)`
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+ /** Queue of queue entries segmented by watermarks. */
+ private final Deque<KeySegment> segments;
+
+ /** Buffer for pending watermarks. */
+ private final Deque<Watermark> watermarks;
+
+ public SegmentedQueue() {
+ this.segments = new ArrayDeque<>();
+ this.watermarks = new ArrayDeque<>();
+ }
+
+ /** Adds a key to the last key segment. If the queue is empty, a new segment is created. */
+ public void addKeyToLastSegment(Object key) {
+ KeySegment lastSegment;
+ if (segments.isEmpty()) {
+ lastSegment = appendNewSegment();
+ } else {
+ lastSegment = segments.getLast();
+ }
+ lastSegment.incrementKeyReference(key);
+ }
+
+ /**
+ * Removes a key from all segments, decrementing its reference count. Returns true if the key
+ * was found and removed.
+ */
+ public boolean removeKey(Object key) {
+ boolean removed = false;
+ for (KeySegment segment : segments) {
+ if (segment.hasActiveKey(key)) {
+ segment.decrementKeyReference(key);
+ removed = true;
+ break;
+ }
+ }
+ return removed;
+ }
+
+ /** Adds a watermark and creates a new segment to associate with it. */
+ public void addWatermark(Watermark watermark) {
+ watermarks.addLast(watermark);
+ appendNewSegment();
+ }
+
+ /** Creates a new key segment and appends it to the end of the queue. */
+ public KeySegment appendNewSegment() {
+ KeySegment newSegment = new KeySegment();
+ segments.addLast(newSegment);
+ return newSegment;
+ }
+
+ /**
+ * Pops the oldest watermark from the watermark deque and removes the corresponding key segment
+ * from the segments queue.
+ */
+ public Watermark popOldestWatermark() {
+ segments.pop();
+ return watermarks.pop();
+ }
+
+ /** Checks if a watermark is ready to be processed (i.e., oldest segment is empty). */
+ public boolean canProcessWatermark() {
Review Comment:
This and `isFirstSegmentEmpty` can be private
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/SegmentedQueue.java:
##########
@@ -0,0 +1,95 @@
+package org.apache.flink.agents.runtime.operator.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SegmentedQueue {
+ /** Queue of queue entries segmented by watermarks. */
+ private final Deque<KeySegment> segments;
+
+ /** Buffer for pending watermarks. */
+ private final Deque<Watermark> watermarks;
+
+ public SegmentedQueue() {
+ this.segments = new ArrayDeque<>();
+ this.watermarks = new ArrayDeque<>();
+ }
+
+ /** Adds a key to the last key segment. If the queue is empty, a new segment is created. */
+ public void addKeyToLastSegment(Object key) {
+ KeySegment lastSegment;
+ if (segments.isEmpty()) {
+ lastSegment = appendNewSegment();
+ } else {
+ lastSegment = segments.getLast();
+ }
+ lastSegment.incrementKeyReference(key);
+ }
+
+ /**
+ * Removes a key from all segments, decrementing its reference count. Returns true if the key
Review Comment:
The doc is incorrect: we actually decrement the key's reference count only in
the first segment that contains the key.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/queue/KeySegment.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.flink.agents.runtime.operator.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A group of keys with reference counting to track active keys. */
+public class KeySegment {
+ /** Maps keys to their reference counts (number of active occurrences). */
+ private final Map<Object, Integer> keyReferenceCounts;
+
+ public KeySegment() {
+ this.keyReferenceCounts = new HashMap<>();
+ }
+
+ /** Increments the reference count for a key. */
+ public void incrementKeyReference(Object key) {
+ keyReferenceCounts.merge(key, 1, Integer::sum);
+ }
+
+ /** Decrements the reference count for a key. Removes the key if the count reaches zero. */
+ public void decrementKeyReference(Object key) {
+ keyReferenceCounts.computeIfPresent(
+ key,
+ (k, count) -> {
+ if (count <= 1) {
+ return null; // Remove the key if count is 1 or less
+ } else {
+ return count - 1;
+ }
+ });
+ }
+
+ /** Checks if a key is active (i.e., its reference count is greater than zero). */
+ public boolean hasActiveKey(Object key) {
+ return keyReferenceCounts.containsKey(key) && keyReferenceCounts.get(key) > 0;
Review Comment:
As we remove the key once its count drops to 0, the key being in the map
already implies that its count is greater than 0, so the second check is redundant.
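
A small sketch of the simplified check. `KeySegmentSketch` is a hypothetical copy of the reference-counting logic quoted above, with only `hasActiveKey` changed:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical copy of the reference-counting segment with a simplified check. */
class KeySegmentSketch {
    /** Maps keys to the number of unfinished events of that key. */
    private final Map<Object, Integer> keyReferenceCounts = new HashMap<>();

    void incrementKeyReference(Object key) {
        keyReferenceCounts.merge(key, 1, Integer::sum);
    }

    /** Decrements the count and removes the entry once it would reach zero. */
    void decrementKeyReference(Object key) {
        keyReferenceCounts.computeIfPresent(key, (k, count) -> count <= 1 ? null : count - 1);
    }

    /** Entries are removed at zero, so presence alone means the key is active. */
    boolean hasActiveKey(Object key) {
        return keyReferenceCounts.containsKey(key);
    }
}
```

This relies on the invariant that no entry with a count of zero or less is ever stored, which holds as long as all writes go through the two methods above.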
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
Review Comment:
How do we handle recovery from a checkpoint? I think we should add the key
of the `actionTasksKState` to the segment during recovery.