[ 
https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253781#comment-16253781
 ] 

ASF GitHub Bot commented on FLINK-8063:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5021#discussion_r151184658
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
    @@ -1372,84 +1492,60 @@ public String fold(String accumulator, 
Tuple2<Integer, Long> value) throws Excep
     
        /////                           General Utility Methods                 
        //////
     
    -   private static <K, S extends State, V> CompletableFuture<S> 
getKvStateWithRetries(
    +   private static <K, S extends State, V> CompletableFuture<S> getKvState(
                        final QueryableStateClient client,
                        final JobID jobId,
                        final String queryName,
                        final K key,
                        final TypeInformation<K> keyTypeInfo,
                        final StateDescriptor<S, V> stateDescriptor,
    -                   final Time retryDelay,
                        final boolean failForUnknownKeyOrNamespace,
    -                   final ScheduledExecutor executor) {
    -           return retryWithDelay(
    -                           () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor),
    -                           NO_OF_RETRIES,
    -                           retryDelay,
    -                           executor,
    -                           failForUnknownKeyOrNamespace);
    -   }
    -
    -   private static <T> CompletableFuture<T> retryWithDelay(
    -                   final Supplier<CompletableFuture<T>> operation,
    -                   final int retries,
    -                   final Time retryDelay,
    -                   final ScheduledExecutor scheduledExecutor,
    -                   final boolean failIfUnknownKeyOrNamespace) {
    -
    -           final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
    -
    -           retryWithDelay(
    -                           resultFuture,
    -                           operation,
    -                           retries,
    -                           retryDelay,
    -                           scheduledExecutor,
    -                           failIfUnknownKeyOrNamespace);
    +                   final ScheduledExecutor executor) throws 
InterruptedException {
     
    +           final CompletableFuture<S> resultFuture = new 
CompletableFuture<>();
    +           getKvStateIgnoringCertainExceptions(
    +                           resultFuture, client, jobId, queryName, key, 
keyTypeInfo,
    +                           stateDescriptor, failForUnknownKeyOrNamespace, 
executor);
                return resultFuture;
        }
     
    -   public static <T> void retryWithDelay(
    -                   final CompletableFuture<T> resultFuture,
    -                   final Supplier<CompletableFuture<T>> operation,
    -                   final int retries,
    -                   final Time retryDelay,
    -                   final ScheduledExecutor scheduledExecutor,
    -                   final boolean failIfUnknownKeyOrNamespace) {
    +   private static <K, S extends State, V> void 
getKvStateIgnoringCertainExceptions(
    +                   final CompletableFuture<S> resultFuture,
    +                   final QueryableStateClient client,
    +                   final JobID jobId,
    +                   final String queryName,
    +                   final K key,
    +                   final TypeInformation<K> keyTypeInfo,
    +                   final StateDescriptor<S, V> stateDescriptor,
    +                   final boolean failForUnknownKeyOrNamespace,
    +                   final ScheduledExecutor executor) throws 
InterruptedException {
     
                if (!resultFuture.isDone()) {
    -                   final CompletableFuture<T> operationResultFuture = 
operation.get();
    -                   operationResultFuture.whenCompleteAsync(
    -                                   (t, throwable) -> {
    -                                           if (throwable != null) {
    -                                                   if 
(throwable.getCause() instanceof CancellationException) {
    -                                                           
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation 
future was cancelled.", throwable.getCause()));
    -                                                   } else if 
(throwable.getCause() instanceof AssertionError ||
    -                                                                   
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)) {
    -                                                           
resultFuture.completeExceptionally(throwable.getCause());
    -                                                   } else {
    -                                                           if (retries > 
0) {
    -                                                                   final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    -                                                                           
        () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor, failIfUnknownKeyOrNamespace),
    -                                                                           
        retryDelay.toMilliseconds(),
    -                                                                           
        TimeUnit.MILLISECONDS);
    -
    -                                                                   
resultFuture.whenComplete(
    -                                                                           
        (innerT, innerThrowable) -> scheduledFuture.cancel(false));
    -                                                           } else {
    -                                                                   
resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not 
complete the operation. Number of retries " +
    -                                                                           
        "has been exhausted.", throwable));
    -                                                           }
    -                                                   }
    -                                           } else {
    -                                                   
resultFuture.complete(t);
    +                   Thread.sleep(100L);
    +                   CompletableFuture<S> expected = 
client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, 
VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
    +                   expected.whenCompleteAsync((result, throwable) -> {
    +                           if (throwable != null) {
    +                                   if (
    +                                                   throwable.getCause() 
instanceof CancellationException ||
    +                                                   throwable.getCause() 
instanceof AssertionError ||
    +                                                   
(failForUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)
    +                                   ) {
    +                                           
resultFuture.completeExceptionally(throwable.getCause());
    +                                   } else {
    --- End diff --
    
    this branch may never exit and should use the `deadline` as an exit 
condition.


> Client blocks indefinitely when querying a non-existing state
> -------------------------------------------------------------
>
>                 Key: FLINK-8063
>                 URL: https://issues.apache.org/jira/browse/FLINK-8063
>             Project: Flink
>          Issue Type: Improvement
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Kostas Kloudas
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> When querying for a non-existing state (as in, no state was registered under 
> queryableStateName) the client blocks indefinitely.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to