This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1af94462486 [FLINK-29430][runtime] Add sanity check when setCurrentKeyGroupIndex 1af94462486 is described below commit 1af9446248677b9540ed5d53bd2b42f3b724f7b5 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Fri Nov 25 15:47:23 2022 +0800 [FLINK-29430][runtime] Add sanity check when setCurrentKeyGroupIndex This closes #21362. --- .../runtime/state/heap/InternalKeyContextImpl.java | 5 +++ .../state/heap/InternalKeyContextImplTest.java | 43 ++++++++++++++++++++++ .../co/CoBroadcastWithKeyedOperatorTest.java | 35 +++++++++++++++--- 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java index 95aedce49d2..67e794ac959 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; @@ -72,6 +73,10 @@ public class InternalKeyContextImpl<K> implements InternalKeyContext<K> { @Override public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) { + if (!keyGroupRange.contains(currentKeyGroupIndex)) { + throw KeyGroupRangeOffsets.newIllegalKeyGroupException( + currentKeyGroupIndex, keyGroupRange); + } this.currentKeyGroupIndex = currentKeyGroupIndex; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java new file mode 100644 index 00000000000..894aaa20814 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.KeyGroupRange; + +import org.junit.Test; + +/** Tests for {@link InternalKeyContextImpl}. */ +public class InternalKeyContextImplTest { + + @Test + public void testSetKeyGroupIndexWithinRange() { + InternalKeyContextImpl<Integer> integerInternalKeyContext = + new InternalKeyContextImpl<>(KeyGroupRange.of(0, 128), 4096); + // There will be no exception thrown since the key group index is within the range. + integerInternalKeyContext.setCurrentKeyGroupIndex(64); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetKeyGroupIndexOutOfRange() { + InternalKeyContextImpl<Integer> integerInternalKeyContext = + new InternalKeyContextImpl<>(KeyGroupRange.of(0, 128), 4096); + // There will be an IllegalArgumentException thrown since the key group index is out of the + // range. + integerInternalKeyContext.setCurrentKeyGroupIndex(2048); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java index f080ca531b8..d7d2ba4aab4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; +import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -556,9 +557,11 @@ public class CoBroadcastWithKeyedOperatorTest { 3, 2, operatorSubtaskState3)) { - testHarness1.processElement1(new StreamRecord<>("trigger")); - testHarness2.processElement1(new StreamRecord<>("trigger")); - testHarness3.processElement1(new StreamRecord<>("trigger")); + + // Since there is a keyed operator, we should follow the key partition rules. + testHarness1.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness1))); + testHarness2.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness2))); + testHarness3.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness3))); Queue<?> output1 = testHarness1.getOutput(); Queue<?> output2 = testHarness2.getOutput(); @@ -659,8 +662,9 @@ public class CoBroadcastWithKeyedOperatorTest { 1, operatorSubtaskState2)) { - testHarness1.processElement1(new StreamRecord<>("trigger")); - testHarness2.processElement1(new StreamRecord<>("trigger")); + // Since there is a keyed operator, we should follow the key partition rules. + testHarness1.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness1))); + testHarness2.processElement1(new StreamRecord<>(findValidTriggerKey(testHarness2))); Queue<?> output1 = testHarness1.getOutput(); Queue<?> output2 = testHarness2.getOutput(); @@ -679,6 +683,27 @@ public class CoBroadcastWithKeyedOperatorTest { } } + /** + * Find a valid key for a subtask of a keyed stream, following the key partition rules. + * + * @param harness the test harness for the subtask. + * @return a valid key for the subtask. + */ + private String findValidTriggerKey(AbstractStreamOperatorTestHarness<?> harness) { + int subtask = harness.getEnvironment().getTaskInfo().getIndexOfThisSubtask(); + int maxParallelism = + harness.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(); + int parallelism = harness.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(); + + // find the right input element for this subtask + int element = 0; + while (assignKeyToParallelOperator(Integer.toString(element), maxParallelism, parallelism) + != subtask) { + element++; + } + return Integer.toString(element); + } + private static class TestFunctionWithOutput extends KeyedBroadcastProcessFunction<String, String, Integer, String> {