This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new fe81b514 [runtime] Filter resumed currentProcessingKeys by subtask
ownership a… (#581)
fe81b514 is described below
commit fe81b5145f94ec0b387fad90fb778fa3ef518663
Author: Joey Tong <[email protected]>
AuthorDate: Thu Mar 26 19:12:36 2026 +0800
[runtime] Filter resumed currentProcessingKeys by subtask ownership a…
(#581)
* [runtime] Filter resumed currentProcessingKeys by subtask ownership after
rescaling
---
.../runtime/operator/ActionExecutionOperator.java | 21 +++++++
.../operator/ActionExecutionOperatorTest.java | 68 ++++++++++++++++++++++
2 files changed, 89 insertions(+)
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 68480440..89e35131 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -68,6 +68,8 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.python.env.PythonDependencyInfo;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -916,7 +918,13 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private void tryResumeProcessActionTasks() throws Exception {
Iterable<Object> keys = currentProcessingKeysOpState.get();
if (keys != null) {
+ int maxParallelism =
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks();
+ KeyGroupRange currentSubtaskKeyGroupRange =
+ getCurrentSubtaskKeyGroupRange(maxParallelism);
for (Object key : keys) {
+ if (!isKeyOwnedByCurrentSubtask(key, maxParallelism,
currentSubtaskKeyGroupRange)) {
+ continue;
+ }
keySegmentQueue.addKeyToLastSegment(key);
mailboxExecutor.submit(
() -> tryProcessActionTaskForKey(key), "process action
task");
@@ -1125,6 +1133,19 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
}
}
+ private KeyGroupRange getCurrentSubtaskKeyGroupRange(int maxParallelism) {
+ int parallelism =
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ int subtaskIndex =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ return KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+ maxParallelism, parallelism, subtaskIndex);
+ }
+
+ private boolean isKeyOwnedByCurrentSubtask(
+ Object key, int maxParallelism, KeyGroupRange
currentSubtaskKeyGroupRange) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key,
maxParallelism);
+ return currentSubtaskKeyGroupRange.contains(keyGroup);
+ }
+
/** Failed to execute Action task. */
public static class ActionTaskExecutionException extends Exception {
public ActionTaskExecutionException(String message, Throwable cause) {
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index e1254441..a45a52d4 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -33,9 +33,12 @@ import
org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.ExceptionUtils;
import org.junit.jupiter.api.Test;
@@ -151,6 +154,71 @@ public class ActionExecutionOperatorTest {
}
}
+ @Test
+ void testRestoreOnlyResumesKeysOwnedByCurrentSubtask() throws Exception {
+ final int maxParallelism = 4;
+ final int oldParallelism = 1;
+ final int newParallelism = 2;
+ final long key = 1L;
+
+ OperatorSubtaskState snapshot;
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new
ActionExecutionOperatorFactory(TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class),
+ maxParallelism,
+ oldParallelism,
+ 0)) {
+ testHarness.open();
+ testHarness.processElement(new StreamRecord<>(key));
+ assertThat(testHarness.getTaskMailbox().size()).isEqualTo(1);
+ snapshot = testHarness.snapshot(1L, 1L);
+ }
+
+ int ownerSubtask =
+ KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
+ maxParallelism,
+ newParallelism,
+ KeyGroupRangeAssignment.assignToKeyGroup(key,
maxParallelism));
+ int nonOwnerSubtask = 1 - ownerSubtask;
+
+ OperatorSubtaskState ownerState =
+ AbstractStreamOperatorTestHarness.repartitionOperatorState(
+ snapshot, maxParallelism, oldParallelism,
newParallelism, ownerSubtask);
+ OperatorSubtaskState nonOwnerState =
+ AbstractStreamOperatorTestHarness.repartitionOperatorState(
+ snapshot, maxParallelism, oldParallelism,
newParallelism, nonOwnerSubtask);
+
+ try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
ownerHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new ActionExecutionOperatorFactory(
+ TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class),
+ maxParallelism,
+ newParallelism,
+ ownerSubtask);
+ KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
nonOwnerHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new ActionExecutionOperatorFactory(
+ TestAgent.getAgentPlan(false), true),
+ (KeySelector<Long, Long>) value -> value,
+ TypeInformation.of(Long.class),
+ maxParallelism,
+ newParallelism,
+ nonOwnerSubtask)) {
+ ownerHarness.initializeState(ownerState);
+ nonOwnerHarness.initializeState(nonOwnerState);
+
+ ownerHarness.open();
+ nonOwnerHarness.open();
+
+ assertThat(ownerHarness.getTaskMailbox().size()).isEqualTo(1);
+ assertThat(nonOwnerHarness.getTaskMailbox().size()).isZero();
+ }
+ }
+
@Test
void testMemoryAccessProhibitedOutsideMailboxThread() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object>
testHarness =