[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253768#comment-16253768 ]
ASF GitHub Bot commented on FLINK-8063: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183957 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception { } /** + * Tests that the correct exception is thrown if the query + * contains a wrong queryable state name. + */ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor<Tuple2<Integer, Long>> valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. + FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture<?> completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- No because it is already "string-ified". > Client blocks indefinitely when querying a non-existing state > ------------------------------------------------------------- > > Key: FLINK-8063 > URL: https://issues.apache.org/jira/browse/FLINK-8063 > Project: Flink > Issue Type: Improvement > Components: Queryable State > Affects Versions: 1.4.0 > Reporter: Chesnay Schepler > Assignee: Kostas Kloudas > Priority: Critical > Fix For: 1.4.0 > > > When querying for a non-existing state (as in, no state was registered under > queryableStateName) the client blocks indefinitely. -- This message was sent by Atlassian JIRA (v6.4.14#64029)