This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af94462486 [FLINK-29430][runtime] Add sanity check when setCurrentKeyGroupIndex
1af94462486 is described below

commit 1af9446248677b9540ed5d53bd2b42f3b724f7b5
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Fri Nov 25 15:47:23 2022 +0800

    [FLINK-29430][runtime] Add sanity check when setCurrentKeyGroupIndex
    
    This closes #21362.
---
 .../runtime/state/heap/InternalKeyContextImpl.java |  5 +++
 .../state/heap/InternalKeyContextImplTest.java     | 43 ++++++++++++++++++++++
 .../co/CoBroadcastWithKeyedOperatorTest.java       | 35 +++++++++++++++---
 3 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
index 95aedce49d2..67e794ac959 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -72,6 +73,10 @@ public class InternalKeyContextImpl<K> implements InternalKeyContext<K> {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }
         this.currentKeyGroupIndex = currentKeyGroupIndex;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
new file mode 100644
index 00000000000..894aaa20814
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+import org.junit.Test;
+
+/** Tests for {@link InternalKeyContextImpl}. */
+public class InternalKeyContextImplTest {
+
+    @Test
+    public void testSetKeyGroupIndexWithinRange() {
+        InternalKeyContextImpl<Integer> integerInternalKeyContext =
+                new InternalKeyContextImpl<>(KeyGroupRange.of(0, 128), 4096);
+        // There will be no exception thrown since the key group index is within the range.
+        integerInternalKeyContext.setCurrentKeyGroupIndex(64);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetKeyGroupIndexOutOfRange() {
+        InternalKeyContextImpl<Integer> integerInternalKeyContext =
+                new InternalKeyContextImpl<>(KeyGroupRange.of(0, 128), 4096);
+        // There will be an IllegalArgumentException thrown since the key group index is out of the
+        // range.
+        integerInternalKeyContext.setCurrentKeyGroupIndex(2048);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index f080ca531b8..d7d2ba4aab4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -54,6 +54,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Function;
 
+import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -556,9 +557,11 @@ public class CoBroadcastWithKeyedOperatorTest {
                                 3,
                                 2,
                                 operatorSubtaskState3)) {
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
-            testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+            // Since there is a keyed operator, we should follow the key partition rules.
+            testHarness1.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness1)));
+            testHarness2.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness2)));
+            testHarness3.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness3)));
 
             Queue<?> output1 = testHarness1.getOutput();
             Queue<?> output2 = testHarness2.getOutput();
@@ -659,8 +662,9 @@ public class CoBroadcastWithKeyedOperatorTest {
                                 1,
                                 operatorSubtaskState2)) {
 
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
+            // Since there is a keyed operator, we should follow the key partition rules.
+            testHarness1.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness1)));
+            testHarness2.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness2)));
 
             Queue<?> output1 = testHarness1.getOutput();
             Queue<?> output2 = testHarness2.getOutput();
@@ -679,6 +683,27 @@ public class CoBroadcastWithKeyedOperatorTest {
         }
     }
 
+    /**
+     * Find a valid key for a subtask of a keyed stream, following the key partition rules.
+     *
+     * @param harness the test harness for the subtask.
+     * @return a valid key for the subtask.
+     */
+    private String findValidTriggerKey(AbstractStreamOperatorTestHarness<?> harness) {
+        int subtask = harness.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
+        int maxParallelism =
+                harness.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
+        int parallelism = harness.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
+
+        // find the right input element for this subtask
+        int element = 0;
+        while (assignKeyToParallelOperator(Integer.toString(element), maxParallelism, parallelism)
+                != subtask) {
+            element++;
+        }
+        return Integer.toString(element);
+    }
+
     private static class TestFunctionWithOutput
             extends KeyedBroadcastProcessFunction<String, String, Integer, String> {
 

Reply via email to