This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b6243bb0ba [FLINK-5279] Print state name and type in error message when trying to access keyed state in non-keyed operator
7b6243bb0ba is described below

commit 7b6243bb0ba55aafad1ca5a17bc457d229763433
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Sat Sep 2 23:09:36 2023 +0800

    [FLINK-5279] Print state name and type in error message when trying to access keyed state in non-keyed operator
---
 .../api/operators/StreamingRuntimeContext.java     |  4 +++-
 .../co/CoBroadcastWithNonKeyedOperatorTest.java    | 22 ++++++++++++----------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 97dd107f352..b9fb0c003a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -237,7 +237,9 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
         checkNotNull(stateDescriptor, "The state properties must not be null");
         checkNotNull(
                 keyedStateStore,
-                "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
+                String.format(
+                        "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                        stateDescriptor.getName(), stateDescriptor.getType()));
         return keyedStateStore;
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
index dae80d11991..e33a4d25021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
@@ -458,15 +458,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
 
         boolean exceptionThrown = false;
 
+        final ValueStateDescriptor<String> valueState =
+                new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
         try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                 getInitializedTestHarness(
                         new BroadcastProcessFunction<String, Integer, String>() {
                             private static final long serialVersionUID = -1725365436500098384L;
 
-                            private final ValueStateDescriptor<String> valueState =
-                                    new ValueStateDescriptor<>(
-                                            "any", BasicTypeInfo.STRING_TYPE_INFO);
-
                             @Override
                             public void processBroadcastElement(
                                     Integer value, Context ctx, Collector<String> out)
@@ -488,7 +487,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
             testHarness.processElement2(new StreamRecord<>(5, 12L));
         } catch (NullPointerException e) {
             Assert.assertEquals(
-                    "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                    String.format(
+                            "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                            valueState.getName(), valueState.getType()),
                     e.getMessage());
             exceptionThrown = true;
         }
@@ -503,15 +504,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
 
         boolean exceptionThrown = false;
 
+        final ValueStateDescriptor<String> valueState =
+                new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
+
         try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                 getInitializedTestHarness(
                         new BroadcastProcessFunction<String, Integer, String>() {
                             private static final long serialVersionUID = -1725365436500098384L;
 
-                            private final ValueStateDescriptor<String> valueState =
-                                    new ValueStateDescriptor<>(
-                                            "any", BasicTypeInfo.STRING_TYPE_INFO);
-
                             @Override
                             public void processBroadcastElement(
                                     Integer value, Context ctx, Collector<String> out)
@@ -533,7 +533,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
             testHarness.processElement1(new StreamRecord<>("5", 12L));
         } catch (NullPointerException e) {
             Assert.assertEquals(
-                    "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                    String.format(
+                            "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                            valueState.getName(), valueState.getType()),
                     e.getMessage());
             exceptionThrown = true;
         }