[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253781#comment-16253781 ]
ASF GitHub Bot commented on FLINK-8063: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184658 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -1372,84 +1492,60 @@ public String fold(String accumulator, Tuple2<Integer, Long> value) throws Excep ///// General Utility Methods ////// - private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries( + private static <K, S extends State, V> CompletableFuture<S> getKvState( final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation<K> keyTypeInfo, final StateDescriptor<S, V> stateDescriptor, - final Time retryDelay, final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static <T> CompletableFuture<T> retryWithDelay( - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - final CompletableFuture<T> resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); + final ScheduledExecutor executor) throws InterruptedException { + final CompletableFuture<S> resultFuture = new CompletableFuture<>(); + getKvStateIgnoringCertainExceptions( + resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); return resultFuture; } - public static <T> void retryWithDelay( - final 
CompletableFuture<T> resultFuture, - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { + private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions( + final CompletableFuture<S> resultFuture, + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation<K> keyTypeInfo, + final StateDescriptor<S, V> stateDescriptor, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) throws InterruptedException { if (!resultFuture.isDone()) { - final CompletableFuture<T> operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { - resultFuture.completeExceptionally(throwable.getCause()); - } else { - if (retries > 0) { - final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( - () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace), - retryDelay.toMilliseconds(), - TimeUnit.MILLISECONDS); - - resultFuture.whenComplete( - (innerT, innerThrowable) -> scheduledFuture.cancel(false)); - } else { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. 
Number of retries " + - "has been exhausted.", throwable)); - } - } - } else { - resultFuture.complete(t); + Thread.sleep(100L); + CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + expected.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + if ( + throwable.getCause() instanceof CancellationException || + throwable.getCause() instanceof AssertionError || + (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException) + ) { + resultFuture.completeExceptionally(throwable.getCause()); + } else { --- End diff -- This branch may never exit; it should use the `deadline` as an exit condition. > Client blocks indefinitely when querying a non-existing state > ------------------------------------------------------------- > > Key: FLINK-8063 > URL: https://issues.apache.org/jira/browse/FLINK-8063 > Project: Flink > Issue Type: Improvement > Components: Queryable State > Affects Versions: 1.4.0 > Reporter: Chesnay Schepler > Assignee: Kostas Kloudas > Priority: Critical > Fix For: 1.4.0 > > > When querying for a non-existing state (as in, no state was registered under queryableStateName), the client blocks indefinitely. -- This message was sent by Atlassian JIRA (v6.4.14#64029)