This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fcc81ec [runtime] Filter resumed currentProcessingKeys by subtask ownership a… (#581)
0fcc81ec is described below

commit 0fcc81ecfe2e897a4c70b16fc4b20627c95a6c2e
Author: Joey Tong <[email protected]>
AuthorDate: Thu Mar 26 19:12:36 2026 +0800

    [runtime] Filter resumed currentProcessingKeys by subtask ownership a… (#581)
    
    * [runtime] Filter resumed currentProcessingKeys by subtask ownership after rescaling
---
 .../runtime/operator/ActionExecutionOperator.java  | 21 +++++++
 .../operator/ActionExecutionOperatorTest.java      | 68 ++++++++++++++++++++++
 2 files changed, 89 insertions(+)

diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 68480440..89e35131 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -68,6 +68,8 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -916,7 +918,13 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
     private void tryResumeProcessActionTasks() throws Exception {
         Iterable<Object> keys = currentProcessingKeysOpState.get();
         if (keys != null) {
+            int maxParallelism = 
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks();
+            KeyGroupRange currentSubtaskKeyGroupRange =
+                    getCurrentSubtaskKeyGroupRange(maxParallelism);
             for (Object key : keys) {
+                if (!isKeyOwnedByCurrentSubtask(key, maxParallelism, 
currentSubtaskKeyGroupRange)) {
+                    continue;
+                }
                 keySegmentQueue.addKeyToLastSegment(key);
                 mailboxExecutor.submit(
                         () -> tryProcessActionTaskForKey(key), "process action 
task");
@@ -1125,6 +1133,19 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
         }
     }
 
+    /**
+     * Computes the key-group range assigned to this subtask via {@link
+     * KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex}, using the given maximum
+     * parallelism together with the task's current parallelism and subtask index.
+     *
+     * @param maxParallelism the operator's maximum parallelism
+     * @return the key-group range owned by this subtask
+     */
+    private KeyGroupRange getCurrentSubtaskKeyGroupRange(int maxParallelism) {
+        return KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                maxParallelism,
+                getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
+    }
+
+    /**
+     * Returns whether {@code key} maps, via {@link KeyGroupRangeAssignment#assignToKeyGroup}, to
+     * a key group inside {@code currentSubtaskKeyGroupRange}. Resumption of restored processing
+     * keys uses this to skip keys that, after rescaling, belong to a different subtask.
+     *
+     * @param key the restored key to test
+     * @param maxParallelism the operator's maximum parallelism
+     * @param currentSubtaskKeyGroupRange the key-group range owned by this subtask
+     * @return {@code true} if this subtask owns the key's key group
+     */
+    private boolean isKeyOwnedByCurrentSubtask(
+            Object key, int maxParallelism, KeyGroupRange currentSubtaskKeyGroupRange) {
+        return currentSubtaskKeyGroupRange.contains(
+                KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+    }
+
     /** Failed to execute Action task. */
     public static class ActionTaskExecutionException extends Exception {
         public ActionTaskExecutionException(String message, Throwable cause) {
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index e1254441..a45a52d4 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -33,9 +33,12 @@ import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
 import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.jupiter.api.Test;
@@ -151,6 +154,71 @@ public class ActionExecutionOperatorTest {
         }
     }
 
+    @Test
+    void testRestoreOnlyResumesKeysOwnedByCurrentSubtask() throws Exception {
+        final int maxParallelism = 4;
+        final int oldParallelism = 1;
+        final int newParallelism = 2;
+        final long key = 1L;
+
+        // Process one element under the old parallelism and snapshot while its mail is still
+        // queued in the task mailbox.
+        OperatorSubtaskState snapshot;
+        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> testHarness =
+                newKeyedHarness(maxParallelism, oldParallelism, 0)) {
+            testHarness.open();
+            testHarness.processElement(new StreamRecord<>(key));
+            assertThat(testHarness.getTaskMailbox().size()).isEqualTo(1);
+            snapshot = testHarness.snapshot(1L, 1L);
+        }
+
+        // The owner is the new subtask whose key-group range contains the key's group. Any other
+        // subtask serves as the non-owner; the modulo keeps this valid for any newParallelism > 1
+        // (the previous "1 - owner" only worked for exactly two subtasks).
+        int ownerSubtask =
+                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+                        maxParallelism,
+                        newParallelism,
+                        KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+        int nonOwnerSubtask = (ownerSubtask + 1) % newParallelism;
+
+        OperatorSubtaskState ownerState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot, maxParallelism, oldParallelism, newParallelism, ownerSubtask);
+        OperatorSubtaskState nonOwnerState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot, maxParallelism, oldParallelism, newParallelism, nonOwnerSubtask);
+
+        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> ownerHarness =
+                        newKeyedHarness(maxParallelism, newParallelism, ownerSubtask);
+                KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> nonOwnerHarness =
+                        newKeyedHarness(maxParallelism, newParallelism, nonOwnerSubtask)) {
+            ownerHarness.initializeState(ownerState);
+            nonOwnerHarness.initializeState(nonOwnerState);
+
+            ownerHarness.open();
+            nonOwnerHarness.open();
+
+            // Only the subtask that owns the restored key should resubmit work for it.
+            assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
+            assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+        }
+    }
+
+    /**
+     * Creates a keyed test harness around {@code ActionExecutionOperatorFactory} for the test
+     * agent plan, using each element as its own key.
+     *
+     * @param maxParallelism maximum parallelism of the operator
+     * @param parallelism parallelism of the operator
+     * @param subtaskIndex index of the subtask this harness simulates
+     * @return a new, not-yet-opened harness
+     */
+    private static KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> newKeyedHarness(
+            int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+                (KeySelector<Long, Long>) value -> value,
+                TypeInformation.of(Long.class),
+                maxParallelism,
+                parallelism,
+                subtaskIndex);
+    }
+
     @Test
     void testMemoryAccessProhibitedOutsideMailboxThread() throws Exception {
         try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> 
testHarness =

Reply via email to