This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b6243bb0ba [FLINK-5279] Print state name and type in error message 
when trying to access keyed state in non-keyed operator
7b6243bb0ba is described below

commit 7b6243bb0ba55aafad1ca5a17bc457d229763433
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Sat Sep 2 23:09:36 2023 +0800

    [FLINK-5279] Print state name and type in error message when trying to 
access keyed state in non-keyed operator
---
 .../api/operators/StreamingRuntimeContext.java     |  4 +++-
 .../co/CoBroadcastWithNonKeyedOperatorTest.java    | 22 ++++++++++++----------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 97dd107f352..b9fb0c003a8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -237,7 +237,9 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         checkNotNull(stateDescriptor, "The state properties must not be null");
         checkNotNull(
                 keyedStateStore,
-                "Keyed state can only be used on a 'keyed stream', i.e., after 
a 'keyBy()' operation.");
+                String.format(
+                        "Keyed state '%s' with type %s can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.",
+                        stateDescriptor.getName(), stateDescriptor.getType()));
         return keyedStateStore;
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
index dae80d11991..e33a4d25021 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
@@ -458,15 +458,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
 
         boolean exceptionThrown = false;
 
+        final ValueStateDescriptor<String> valueState =
+                new ValueStateDescriptor<>("any", 
BasicTypeInfo.STRING_TYPE_INFO);
+
         try (TwoInputStreamOperatorTestHarness<String, Integer, String> 
testHarness =
                 getInitializedTestHarness(
                         new BroadcastProcessFunction<String, Integer, 
String>() {
                             private static final long serialVersionUID = 
-1725365436500098384L;
 
-                            private final ValueStateDescriptor<String> 
valueState =
-                                    new ValueStateDescriptor<>(
-                                            "any", 
BasicTypeInfo.STRING_TYPE_INFO);
-
                             @Override
                             public void processBroadcastElement(
                                     Integer value, Context ctx, 
Collector<String> out)
@@ -488,7 +487,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
             testHarness.processElement2(new StreamRecord<>(5, 12L));
         } catch (NullPointerException e) {
             Assert.assertEquals(
-                    "Keyed state can only be used on a 'keyed stream', i.e., 
after a 'keyBy()' operation.",
+                    String.format(
+                            "Keyed state '%s' with type %s can only be used on 
a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                            valueState.getName(), valueState.getType()),
                     e.getMessage());
             exceptionThrown = true;
         }
@@ -503,15 +504,14 @@ public class CoBroadcastWithNonKeyedOperatorTest {
 
         boolean exceptionThrown = false;
 
+        final ValueStateDescriptor<String> valueState =
+                new ValueStateDescriptor<>("any", 
BasicTypeInfo.STRING_TYPE_INFO);
+
         try (TwoInputStreamOperatorTestHarness<String, Integer, String> 
testHarness =
                 getInitializedTestHarness(
                         new BroadcastProcessFunction<String, Integer, 
String>() {
                             private static final long serialVersionUID = 
-1725365436500098384L;
 
-                            private final ValueStateDescriptor<String> 
valueState =
-                                    new ValueStateDescriptor<>(
-                                            "any", 
BasicTypeInfo.STRING_TYPE_INFO);
-
                             @Override
                             public void processBroadcastElement(
                                     Integer value, Context ctx, 
Collector<String> out)
@@ -533,7 +533,9 @@ public class CoBroadcastWithNonKeyedOperatorTest {
             testHarness.processElement1(new StreamRecord<>("5", 12L));
         } catch (NullPointerException e) {
             Assert.assertEquals(
-                    "Keyed state can only be used on a 'keyed stream', i.e., 
after a 'keyBy()' operation.",
+                    String.format(
+                            "Keyed state '%s' with type %s can only be used on 
a 'keyed stream', i.e., after a 'keyBy()' operation.",
+                            valueState.getName(), valueState.getType()),
                     e.getMessage());
             exceptionThrown = true;
         }

Reply via email to