Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183957 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception { } /** + * Tests that the correct exception is thrown if the query + * contains a wrong queryable state name. + */ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor<Tuple2<Integer, Long>> valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. + FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture<?> completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- No because it is already "string-ified".
---