http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
index a7f65f3..7ff4ec6 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -34,8 +35,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -53,25 +58,27 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.OnSuccess;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
 
 import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
@@ -84,10 +91,12 @@ import static org.junit.Assert.assertTrue;
  */
 public abstract class AbstractQueryableStateITCase extends TestLogger {
 
-       protected static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000, TimeUnit.SECONDS);
-       private static final FiniteDuration QUERY_RETRY_DELAY = new 
FiniteDuration(100, TimeUnit.MILLISECONDS);
+       private static final int NO_OF_RETRIES = 100;
+       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000L, TimeUnit.SECONDS);
+       private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
 
-       protected static ActorSystem testActorSystem;
+       private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(4);
+       private final ScheduledExecutor executor = new 
ScheduledExecutorServiceAdapter(executorService);
 
        /**
         * State backend to use.
@@ -136,7 +145,9 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                final Deadline deadline = TEST_TIMEOUT.fromNow();
                final int numKeys = 256;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
 
@@ -150,7 +161,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestKeyRangeSource(numKeys));
@@ -163,15 +174,14 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                        final String queryName = "hakuna-matata";
 
-                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 7143749578983540352L;
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
7143749578983540352L;
 
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState(queryName, 
reducingState);
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState(queryName, reducingState);
 
                        // Submit the job graph
                        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -188,19 +198,19 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        while (!allNonZero && deadline.hasTimeLeft()) {
                                allNonZero = true;
 
-                               final List<Future<Tuple2<Integer, Long>>> 
futures = new ArrayList<>(numKeys);
+                               final List<CompletableFuture<Tuple2<Integer, 
Long>>> futures = new ArrayList<>(numKeys);
 
                                for (int i = 0; i < numKeys; i++) {
                                        final int key = i;
 
-                                       if (counts.get(key) > 0) {
+                                       if (counts.get(key) > 0L) {
                                                // Skip this one
                                                continue;
                                        } else {
                                                allNonZero = false;
                                        }
 
-                                       Future<Tuple2<Integer, Long>> result = 
getKvStateWithRetries(
+                                       CompletableFuture<Tuple2<Integer, 
Long>> result = getKvStateWithRetries(
                                                        client,
                                                        jobId,
                                                        queryName,
@@ -208,24 +218,21 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        reducingState,
                                                        QUERY_RETRY_DELAY,
-                                                       false);
+                                                       false,
+                                                       executor);
 
-                                       result.onSuccess(new 
OnSuccess<Tuple2<Integer, Long>>() {
-                                               @Override
-                                               public void 
onSuccess(Tuple2<Integer, Long> result) throws Throwable {
-                                                       counts.set(key, 
result.f1);
-                                                       assertEquals("Key 
mismatch", key, result.f0.intValue());
-                                               }
-                                       }, testActorSystem.dispatcher());
+                                       result.thenAccept(res -> {
+                                               counts.set(key, res.f1);
+                                               assertEquals("Key mismatch", 
key, res.f0.intValue());
+                                       });
 
                                        futures.add(result);
                                }
 
-                               Future<Iterable<Tuple2<Integer, Long>>> 
futureSequence = Futures.sequence(
-                                               futures,
-                                               testActorSystem.dispatcher());
-
-                               Await.ready(futureSequence, 
deadline.timeLeft());
+                               // wait for all the futures to complete
+                               CompletableFuture
+                                               .allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]))
+                                               
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
                        assertTrue("Not all keys are non-zero", allNonZero);
@@ -238,15 +245,15 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -274,7 +281,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestKeyRangeSource(numKeys));
@@ -311,22 +318,23 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
                        jobId = jobGraph.getJobID();
 
-                       Future<TestingJobManagerMessages.JobStatusIs> 
failedFuture = cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), 
deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
+                       
CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = 
FutureUtils.toJava(
+                                       
cluster.getLeaderGateway(deadline.timeLeft())
+                                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), 
deadline.timeLeft())
+                                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
 
                        cluster.submitJobDetached(jobGraph);
 
-                       TestingJobManagerMessages.JobStatusIs jobStatus = 
Await.result(failedFuture, deadline.timeLeft());
+                       TestingJobManagerMessages.JobStatusIs jobStatus =
+                                       
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        assertEquals(JobStatus.FAILED, jobStatus.state());
 
                        // Get the job and check the cause
-                       JobManagerMessages.JobFound jobFound = Await.result(
+                       JobManagerMessages.JobFound jobFound = 
FutureUtils.toJava(
                                        
cluster.getLeaderGateway(deadline.timeLeft())
                                                        .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
-                                       deadline.timeLeft());
+                                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+                                       .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
 
                        String failureCause = 
jobFound.executionGraph().getFailureCause().getExceptionAsString();
 
@@ -338,10 +346,10 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               scala.concurrent.Future<CancellationSuccess> 
cancellation = cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
 
                                Await.ready(cancellation, deadline.timeLeft());
                        }
@@ -359,9 +367,11 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-               final int numElements = 1024;
+               final long numElements = 1024L;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -371,7 +381,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestAscendingValueSource(numElements));
@@ -381,15 +391,14 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                        "any",
                                        source.getType());
 
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 7662520075515707428L;
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
7662520075515707428L;
 
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("hakuna", 
valueState);
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState("hakuna", valueState);
 
                        // Submit the job graph
                        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -397,22 +406,19 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       // Now query
-                       long expected = numElements;
-
-                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
+                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, numElements);
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -425,9 +431,11 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-               final int numElements = 1024;
+               final long numElements = 1024L;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -437,7 +445,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                .addSource(new 
TestAscendingValueSource(numElements));
@@ -481,15 +489,15 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -508,23 +516,25 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                for (int key = 0; key < maxParallelism; key++) {
                        boolean success = false;
                        while (deadline.hasTimeLeft() && !success) {
-                               Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                                       jobId,
-                                       queryableStateName,
-                                       key,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       stateDescriptor,
-                                       QUERY_RETRY_DELAY,
-                                       false);
+                               CompletableFuture<Tuple2<Integer, Long>> future 
= getKvStateWithRetries(
+                                               client,
+                                               jobId,
+                                               queryableStateName,
+                                               key,
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               stateDescriptor,
+                                               QUERY_RETRY_DELAY,
+                                               false,
+                                               executor);
 
-                               Tuple2<Integer, Long> value = 
Await.result(future, deadline.timeLeft());
+                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                                assertEquals("Key mismatch", key, 
value.f0.intValue());
                                if (expected == value.f1) {
                                        success = true;
                                } else {
                                        // Retry
-                                       Thread.sleep(50);
+                                       Thread.sleep(50L);
                                }
                        }
 
@@ -554,16 +564,17 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                                BasicTypeInfo.INT_TYPE_INFO,
                                                valueSerializer,
                                                QUERY_RETRY_DELAY,
-                                               false);
+                                               false,
+                                               executor);
 
-                               Tuple2<Integer, Long> value = 
Await.result(future, deadline.timeLeft());
+                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
                                assertEquals("Key mismatch", key, 
value.f0.intValue());
                                if (expected == value.f1) {
                                        success = true;
                                } else {
                                        // Retry
-                                       Thread.sleep(50);
+                                       Thread.sleep(50L);
                                }
                        }
 
@@ -575,20 +586,21 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
         * Tests simple value state queryable state instance with a default 
value
         * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
         * tuples, the key is mapped to 1 but key 0 is queried which should 
throw
-        * a {@link UnknownKeyOrNamespace} exception.
+        * a {@link UnknownKeyOrNamespaceException} exception.
         *
-        * @throws UnknownKeyOrNamespace thrown due querying a non-existent key
+        * @throws UnknownKeyOrNamespaceException thrown due querying a 
non-existent key
         */
-       @Test(expected = UnknownKeyOrNamespace.class)
-       public void testValueStateDefault() throws
-               Exception, UnknownKeyOrNamespace {
+       @Test(expected = UnknownKeyOrNamespaceException.class)
+       public void testValueStateDefault() throws Throwable {
 
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-               final int numElements = 1024;
+               final long numElements = 1024L;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -600,7 +612,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // don't explicitly check that all slots are available 
before
                        // submitting.
                        env.setRestartStrategy(RestartStrategies
-                               .fixedDelayRestart(Integer.MAX_VALUE, 1000));
+                               .fixedDelayRestart(Integer.MAX_VALUE, 1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                .addSource(new 
TestAscendingValueSource(numElements));
@@ -635,30 +647,37 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                        // Now query
                        int key = 0;
-                       Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                               jobId,
-                               queryableState.getQueryableStateName(),
-                               key,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               valueState,
-                               QUERY_RETRY_DELAY,
-                               true);
-
-                       Await.result(future, deadline.timeLeft());
+                       CompletableFuture<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(
+                                       client,
+                                       jobId,
+                                       queryableState.getQueryableStateName(),
+                                       key,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       valueState,
+                                       QUERY_RETRY_DELAY,
+                                       true,
+                                       executor);
+
+                       try {
+                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                       } catch (ExecutionException | CompletionException e) {
+                               // get() on a completedExceptionally future 
wraps the
+                               // exception in an ExecutionException.
+                               throw e.getCause();
+                       }
                } finally {
+
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
JobManagerMessages.CancelJob(jobId),
-                                               deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
-                                               CancellationSuccess.class));
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -675,9 +694,11 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-               final int numElements = 1024;
+               final long numElements = 1024L;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -687,7 +708,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
 
                        DataStream<Tuple2<Integer, Long>> source = env
                                        .addSource(new 
TestAscendingValueSource(numElements));
@@ -709,23 +730,21 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       // Now query
-                       long expected = numElements;
-
                        executeQuery(deadline, client, jobId, "matata",
-                                       queryableState.getValueSerializer(), 
expected);
+                                       queryableState.getValueSerializer(), 
numElements);
                } finally {
+
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(
+                                               
cluster.getLeaderGateway(deadline.timeLeft())
+                                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -743,7 +762,9 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -788,21 +809,23 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       Future<String> future = 
getKvStateWithRetries(client,
+                                       CompletableFuture<String> future = 
getKvStateWithRetries(
+                                                       client,
                                                        jobId,
                                                        
queryableState.getQueryableStateName(),
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        foldingState,
                                                        QUERY_RETRY_DELAY,
-                                                       false);
+                                                       false,
+                                                       executor);
 
-                                       String value = Await.result(future, 
deadline.timeLeft());
+                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                                        if (expected.equals(value)) {
                                                success = true;
                                        } else {
                                                // Retry
-                                               Thread.sleep(50);
+                                               Thread.sleep(50L);
                                        }
                                }
 
@@ -811,15 +834,15 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
@@ -834,9 +857,11 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
-               final int numElements = 1024;
+               final long numElements = 1024L;
 
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+               final QueryableStateClient client = new QueryableStateClient(
+                               "localhost",
+                               
cluster.configuration().getInteger(QueryableStateOptions.SERVER_PORT));
 
                JobID jobId = null;
                try {
@@ -858,15 +883,14 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                                                        new SumReduce(),
                                                        source.getType());
 
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 8470749712274833552L;
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
 
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("jungle", 
reducingState);
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState("jungle", reducingState);
 
                        // Submit the job graph
                        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -877,117 +901,24 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        // Wait until job is running
 
                        // Now query
-                       long expected = numElements * (numElements + 1) / 2;
+                       long expected = numElements * (numElements + 1L) / 2L;
 
                        executeQuery(deadline, client, jobId, "jungle", 
reducingState, expected);
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
 
-                               Await.ready(cancellation, deadline.timeLeft());
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        }
 
-                       client.shutDown();
+                       client.shutdown();
                }
        }
 
-       private static <K, V> Future<V> getKvStateWithRetries(
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeSerializer<V> valueTypeSerializer,
-                       final FiniteDuration retryDelay,
-                       final boolean failForUnknownKeyOrNamespace) {
-
-               return client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
valueTypeSerializer)
-                               .recoverWith(new Recover<Future<V>>() {
-                                       @Override
-                                       public Future<V> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
AssertionError) {
-                                                       return 
Futures.failed(failure);
-                                               } else if 
(failForUnknownKeyOrNamespace &&
-                                                               (failure 
instanceof UnknownKeyOrNamespace)) {
-                                                       return 
Futures.failed(failure);
-                                               } else {
-                                                       // At startup some 
failures are expected
-                                                       // due to races. Make 
sure that they don't
-                                                       // fail this test.
-                                                       return Patterns.after(
-                                                                       
retryDelay,
-                                                                       
testActorSystem.scheduler(),
-                                                                       
testActorSystem.dispatcher(),
-                                                                       new 
Callable<Future<V>>() {
-                                                                               
@Override
-                                                                               
public Future<V> call() throws Exception {
-                                                                               
        return getKvStateWithRetries(
-                                                                               
                        client,
-                                                                               
                        jobId,
-                                                                               
                        queryName,
-                                                                               
                        key,
-                                                                               
                        keyTypeInfo,
-                                                                               
                        valueTypeSerializer,
-                                                                               
                        retryDelay,
-                                                                               
                        failForUnknownKeyOrNamespace);
-                                                                               
}
-                                                                       });
-                                               }
-                                       }
-                               }, testActorSystem.dispatcher());
-
-       }
-
-       private static <K, V> Future<V> getKvStateWithRetries(
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor,
-                       final FiniteDuration retryDelay,
-                       final boolean failForUnknownKeyOrNamespace) {
-
-               return client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor)
-                               .recoverWith(new Recover<Future<V>>() {
-                                       @Override
-                                       public Future<V> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
AssertionError) {
-                                                       return 
Futures.failed(failure);
-                                               } else if 
(failForUnknownKeyOrNamespace &&
-                                                               (failure 
instanceof UnknownKeyOrNamespace)) {
-                                                       return 
Futures.failed(failure);
-                                               } else {
-                                                       // At startup some 
failures are expected
-                                                       // due to races. Make 
sure that they don't
-                                                       // fail this test.
-                                                       return Patterns.after(
-                                                                       
retryDelay,
-                                                                       
testActorSystem.scheduler(),
-                                                                       
testActorSystem.dispatcher(),
-                                                                       new 
Callable<Future<V>>() {
-                                                                               
@Override
-                                                                               
public Future<V> call() throws Exception {
-                                                                               
        return getKvStateWithRetries(
-                                                                               
                        client,
-                                                                               
                        jobId,
-                                                                               
                        queryName,
-                                                                               
                        key,
-                                                                               
                        keyTypeInfo,
-                                                                               
                        stateDescriptor,
-                                                                               
                        retryDelay,
-                                                                               
                        failForUnknownKeyOrNamespace);
-                                                                               
}
-                                                                       });
-                                               }
-                                       }
-                               }, testActorSystem.dispatcher());
-       }
-
        /**
         * Test source producing (key, 0)..(key, maxValue) with key being the 
sub
         * task index.
@@ -1030,7 +961,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
 
                        while (isRunning) {
                                synchronized (this) {
-                                       this.wait();
+                                       wait();
                                }
                        }
                }
@@ -1040,7 +971,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        isRunning = false;
 
                        synchronized (this) {
-                               this.notifyAll();
+                               notifyAll();
                        }
                }
 
@@ -1125,4 +1056,105 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                }
        }
 
+       /////                           General Utility Methods                 
        //////
+
+       private static <K, V> Future<V> getKvStateWithRetries(
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryName,
+                       final K key,
+                       final TypeInformation<K> keyTypeInfo,
+                       final TypeSerializer<V> valueTypeSerializer,
+                       final Time retryDelay,
+                       final boolean failForUnknownKeyOrNamespace,
+                       final ScheduledExecutor executor) {
+
+               return retryWithDelay(
+                               () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
valueTypeSerializer),
+                               NO_OF_RETRIES,
+                               retryDelay,
+                               executor,
+                               failForUnknownKeyOrNamespace);
+       }
+
+       private static <K, V> CompletableFuture<V> getKvStateWithRetries(
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryName,
+                       final K key,
+                       final TypeInformation<K> keyTypeInfo,
+                       final StateDescriptor<?, V> stateDescriptor,
+                       final Time retryDelay,
+                       final boolean failForUnknownKeyOrNamespace,
+                       final ScheduledExecutor executor) {
+               return retryWithDelay(
+                               () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor),
+                               NO_OF_RETRIES,
+                               retryDelay,
+                               executor,
+                               failForUnknownKeyOrNamespace);
+       }
+
+       private static <T> CompletableFuture<T> retryWithDelay(
+                       final Supplier<CompletableFuture<T>> operation,
+                       final int retries,
+                       final Time retryDelay,
+                       final ScheduledExecutor scheduledExecutor,
+                       final boolean failIfUnknownKeyOrNamespace) {
+
+               final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
+
+               retryWithDelay(
+                               resultFuture,
+                               operation,
+                               retries,
+                               retryDelay,
+                               scheduledExecutor,
+                               failIfUnknownKeyOrNamespace);
+
+               return resultFuture;
+       }
+
+       public static <T> void retryWithDelay(
+                       final CompletableFuture<T> resultFuture,
+                       final Supplier<CompletableFuture<T>> operation,
+                       final int retries,
+                       final Time retryDelay,
+                       final ScheduledExecutor scheduledExecutor,
+                       final boolean failIfUnknownKeyOrNamespace) {
+
+               if (!resultFuture.isDone()) {
+                       final CompletableFuture<T> operationResultFuture = 
operation.get();
+                       operationResultFuture.whenCompleteAsync(
+                                       (t, throwable) -> {
+                                               if (throwable != null) {
+                                                       if 
(throwable.getCause() instanceof CancellationException) {
+                                                               
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation 
future was cancelled.", throwable.getCause()));
+                                                       } else if 
(throwable.getCause() instanceof AssertionError ||
+                                                                       
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)) {
+                                                               
resultFuture.completeExceptionally(throwable.getCause());
+                                                       } else {
+                                                               if (retries > 
0) {
+                                                                       final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+                                                                               
        () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor, failIfUnknownKeyOrNamespace),
+                                                                               
        retryDelay.toMilliseconds(),
+                                                                               
        TimeUnit.MILLISECONDS);
+
+                                                                       
resultFuture.whenComplete(
+                                                                               
        (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+                                                               } else {
+                                                                       
resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not 
complete the operation. Number of retries " +
+                                                                               
        "has been exhausted.", throwable));
+                                                               }
+                                                       }
+                                               } else {
+                                                       
resultFuture.complete(t);
+                                               }
+                                       },
+                                       scheduledExecutor);
+
+                       resultFuture.whenComplete(
+                                       (t, throwable) -> 
operationResultFuture.cancel(false));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
index 15a5ff6..a2a9678 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 
@@ -40,7 +39,7 @@ import static org.junit.Assert.fail;
 public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
 
        private static final int NUM_JMS = 2;
-       private static final int NUM_TMS = 4;
+       private static final int NUM_TMS = 1;
        private static final int NUM_SLOTS_PER_TM = 4;
 
        private static TestingServer zkServer;
@@ -67,8 +66,6 @@ public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableSt
                        cluster = new TestingCluster(config, false);
                        cluster.start();
 
-                       testActorSystem = AkkaUtils.createDefaultActorSystem();
-
                        // verify that we are in HA mode
                        Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
 
@@ -85,9 +82,6 @@ public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableSt
                        cluster.awaitTermination();
                }
 
-               testActorSystem.shutdown();
-               testActorSystem.awaitTermination();
-
                try {
                        zkServer.stop();
                        zkServer.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
index c52acc8..1173d0d 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 
@@ -37,7 +36,7 @@ import static org.junit.Assert.fail;
  */
 public abstract class NonHAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
 
-       private static final int NUM_TMS = 2;
+       private static final int NUM_TMS = 1;
        private static final int NUM_SLOTS_PER_TM = 4;
 
        @BeforeClass
@@ -48,14 +47,13 @@ public abstract class NonHAAbstractQueryableStateITCase 
extends AbstractQueryabl
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+                       config.setInteger(QueryableStateOptions.SERVER_PORT, 
9069);
                        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
 
                        cluster = new TestingCluster(config, false);
                        cluster.start(true);
 
-                       testActorSystem = AkkaUtils.createDefaultActorSystem();
-
                        // verify that we are not in HA mode
                        Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
 
@@ -73,9 +71,5 @@ public abstract class NonHAAbstractQueryableStateITCase 
extends AbstractQueryabl
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
deleted file mode 100644
index d9a41a1..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownJobManager;
-import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AkkaKvStateLocationLookupService}.
- */
-public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
-
-       /** The default timeout. */
-       private static final FiniteDuration TIMEOUT = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-       /** Test actor system shared between the tests. */
-       private static ActorSystem testActorSystem;
-
-       @BeforeClass
-       public static void setUp() throws Exception {
-               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-       }
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
-
-       /**
-        * Tests responses if no leader notification has been reported or 
leadership
-        * has been lost (leaderAddress = <code>null</code>).
-        */
-       @Test
-       public void testNoJobManagerRegistered() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       null,
-                       null);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               //
-               // No leader registered initially => fail with UnknownJobManager
-               //
-               try {
-                       JobID jobId = new JobID();
-                       String name = "coffee";
-
-                       Future<KvStateLocation> locationFuture = 
lookupService.getKvStateLookupInfo(jobId, name);
-
-                       Await.result(locationFuture, TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (UnknownJobManager ignored) {
-                       // Expected
-               }
-
-               assertEquals("Received unexpected lookup", 0, received.size());
-
-               //
-               // Leader registration => communicate with new leader
-               //
-               UUID leaderSessionId = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
-               KvStateLocation expected = new KvStateLocation(new JobID(), new 
JobVertexID(), 8282, "tea");
-
-               ActorRef testActor = LookupResponseActor.create(received, 
leaderSessionId, expected);
-
-               String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, 
testActor);
-
-               // Notify the service about a leader
-               leaderRetrievalService.notifyListener(testActorAddress, 
leaderSessionId);
-
-               JobID jobId = new JobID();
-               String name = "tea";
-
-               // Verify that the leader response is handled
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
-               assertEquals(expected, location);
-
-               // Verify that the correct message was sent to the leader
-               assertEquals(1, received.size());
-
-               verifyLookupMsg(received.poll(), jobId, name);
-
-               //
-               // Leader loss => fail with UnknownJobManager
-               //
-               leaderRetrievalService.notifyListener(null, null);
-
-               try {
-                       Future<KvStateLocation> locationFuture = lookupService
-                                       .getKvStateLookupInfo(new JobID(), 
"coffee");
-
-                       Await.result(locationFuture, TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (UnknownJobManager ignored) {
-                       // Expected
-               }
-
-               // No new messages received
-               assertEquals(0, received.size());
-       }
-
-       /**
-        * Tests that messages are properly decorated with the leader session 
ID.
-        */
-       @Test
-       public void testLeaderSessionIdChange() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       "localhost",
-                       HighAvailabilityServices.DEFAULT_LEADER_ID);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               // Create test actors with random leader session IDs
-               KvStateLocation expected1 = new KvStateLocation(new JobID(), 
new JobVertexID(), 8282, "salt");
-               UUID leaderSessionId1 = UUID.randomUUID();
-               ActorRef testActor1 = LookupResponseActor.create(received, 
leaderSessionId1, expected1);
-               String testActorAddress1 = 
AkkaUtils.getAkkaURL(testActorSystem, testActor1);
-
-               KvStateLocation expected2 = new KvStateLocation(new JobID(), 
new JobVertexID(), 22321, "pepper");
-               UUID leaderSessionId2 = UUID.randomUUID();
-               ActorRef testActor2 = LookupResponseActor.create(received, 
leaderSessionId1, expected2);
-               String testActorAddress2 = 
AkkaUtils.getAkkaURL(testActorSystem, testActor2);
-
-               JobID jobId = new JobID();
-
-               //
-               // Notify about first leader
-               //
-               leaderRetrievalService.notifyListener(testActorAddress1, 
leaderSessionId1);
-
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
-               assertEquals(expected1, location);
-
-               assertEquals(1, received.size());
-               verifyLookupMsg(received.poll(), jobId, "rock");
-
-               //
-               // Notify about second leader
-               //
-               leaderRetrievalService.notifyListener(testActorAddress2, 
leaderSessionId2);
-
-               location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
-               assertEquals(expected2, location);
-
-               assertEquals(1, received.size());
-               verifyLookupMsg(received.poll(), jobId, "roll");
-       }
-
-       /**
-        * Tests that lookups are retried when no leader notification is 
available.
-        */
-       @Test
-       public void testRetryOnUnknownJobManager() throws Exception {
-               final 
Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = 
new LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService.LookupRetryStrategyFactory 
retryStrategy =
-                               new 
AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
-                                       @Override
-                                       public 
AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
-                                               return retryStrategies.poll();
-                                       }
-                               };
-
-               final TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService(
-                       null,
-                       null);
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               retryStrategy);
-
-               lookupService.start();
-
-               //
-               // Test call to retry
-               //
-               final AtomicBoolean hasRetried = new AtomicBoolean();
-               retryStrategies.add(
-                               new 
AkkaKvStateLocationLookupService.LookupRetryStrategy() {
-                                       @Override
-                                       public FiniteDuration getRetryDelay() {
-                                               return FiniteDuration.Zero();
-                                       }
-
-                                       @Override
-                                       public boolean tryRetry() {
-                                               if 
(hasRetried.compareAndSet(false, true)) {
-                                                       return true;
-                                               }
-                                               return false;
-                                       }
-                               });
-
-               Future<KvStateLocation> locationFuture = 
lookupService.getKvStateLookupInfo(new JobID(), "yessir");
-
-               Await.ready(locationFuture, TIMEOUT);
-               assertTrue("Did not retry ", hasRetried.get());
-
-               //
-               // Test leader notification after retry
-               //
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               KvStateLocation expected = new KvStateLocation(new JobID(), new 
JobVertexID(), 12122, "garlic");
-               ActorRef testActor = LookupResponseActor.create(received, null, 
expected);
-               final String testActorAddress = 
AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-               retryStrategies.add(new 
AkkaKvStateLocationLookupService.LookupRetryStrategy() {
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               return FiniteDuration.apply(100, 
TimeUnit.MILLISECONDS);
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               
leaderRetrievalService.notifyListener(testActorAddress, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-                               return true;
-                       }
-               });
-
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), 
TIMEOUT);
-               assertEquals(expected, location);
-       }
-
-       @Test
-       public void testUnexpectedResponseType() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       "localhost",
-                       HighAvailabilityServices.DEFAULT_LEADER_ID);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               // Create test actors with random leader session IDs
-               String expected = "unexpected-response-type";
-               ActorRef testActor = LookupResponseActor.create(received, null, 
expected);
-               String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, 
testActor);
-
-               leaderRetrievalService.notifyListener(testActorAddress, null);
-
-               try {
-                       Await.result(lookupService.getKvStateLookupInfo(new 
JobID(), "spicy"), TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (Throwable ignored) {
-                       // Expected
-               }
-       }
-
-       private static final class LookupResponseActor extends 
FlinkUntypedActor {
-
-               /** Received lookup messages. */
-               private final Queue<LookupKvStateLocation> receivedLookups;
-
-               /** Responses on KvStateMessage.LookupKvStateLocation messages. 
*/
-               private final Queue<Object> lookupResponses;
-
-               /** The leader session ID. */
-               private UUID leaderSessionId;
-
-               public LookupResponseActor(
-                               Queue<LookupKvStateLocation> receivedLookups,
-                               UUID leaderSessionId, Object... 
lookupResponses) {
-
-                       this.receivedLookups = 
Preconditions.checkNotNull(receivedLookups, "Received lookups");
-                       this.leaderSessionId = leaderSessionId;
-                       this.lookupResponses = new ArrayDeque<>();
-
-                       if (lookupResponses != null) {
-                               for (Object resp : lookupResponses) {
-                                       this.lookupResponses.add(resp);
-                               }
-                       }
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof LookupKvStateLocation) {
-                               // Add to received lookups queue
-                               receivedLookups.add((LookupKvStateLocation) 
message);
-
-                               Object msg = lookupResponses.poll();
-                               if (msg != null) {
-                                       if (msg instanceof Throwable) {
-                                               sender().tell(new 
Status.Failure((Throwable) msg), self());
-                                       } else {
-                                               sender().tell(new 
Status.Success(msg), self());
-                                       }
-                               }
-                       } else if (message instanceof UUID) {
-                               this.leaderSessionId = (UUID) message;
-                       } else {
-                               LOG.debug("Received unhandled message: {}", 
message);
-                       }
-               }
-
-               @Override
-               protected UUID getLeaderSessionID() {
-                       return leaderSessionId;
-               }
-
-               private static ActorRef create(
-                               Queue<LookupKvStateLocation> receivedLookups,
-                               UUID leaderSessionId,
-                               Object... lookupResponses) {
-
-                       return testActorSystem.actorOf(Props.create(
-                                       LookupResponseActor.class,
-                                       receivedLookups,
-                                       leaderSessionId,
-                                       lookupResponses));
-               }
-       }
-
-       private static void verifyLookupMsg(
-                       LookupKvStateLocation lookUpMsg,
-                       JobID expectedJobId,
-                       String expectedName) {
-
-               assertNotNull(lookUpMsg);
-               assertEquals(expectedJobId, lookUpMsg.getJobId());
-               assertEquals(expectedName, lookUpMsg.getRegistrationName());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
new file mode 100644
index 0000000..b6f855e
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -0,0 +1,784 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link Client}.
+ */
+public class ClientTest {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
+
+       // Thread pool for client bootstrap (shared between tests)
+       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
+
+       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10L, TimeUnit.SECONDS);
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (NIO_GROUP != null) {
+                       NIO_GROUP.shutdownGracefully();
+               }
+       }
+
+       /**
+        * Tests simple queries, of which half succeed and half fail.
+        */
+       @Test
+       public void testSimpleRequests() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new Client<>("Test Client", 1, serializer, 
stats);
+
+                       // Random result
+                       final byte[] expected = new byte[1024];
+                       ThreadLocalRandom.current().nextBytes(expected);
+
+                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
+                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       received.add((ByteBuf) msg);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       long numQueries = 1024L;
+
+                       List<CompletableFuture<KvStateResponse>> futures = new 
ArrayList<>();
+                       for (long i = 0L; i < numQueries; i++) {
+                               KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                               futures.add(client.sendRequest(serverAddress, 
request));
+                       }
+
+                       // Respond to messages
+                       Exception testException = new 
RuntimeException("Expected test Exception");
+
+                       for (long i = 0L; i < numQueries; i++) {
+                               ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               assertNotNull("Receive timed out", buf);
+
+                               Channel ch = channel.get();
+                               assertNotNull("Channel not active", ch);
+
+                               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
+                               long requestId = 
MessageSerializer.getRequestId(buf);
+                               KvStateInternalRequest deserRequest = 
serializer.deserializeRequest(buf);
+
+                               buf.release();
+
+                               if (i % 2L == 0L) {
+                                       ByteBuf response = 
MessageSerializer.serializeResponse(
+                                                       serverChannel.alloc(),
+                                                       requestId,
+                                                       new 
KvStateResponse(expected));
+
+                                       ch.writeAndFlush(response);
+                               } else {
+                                       ByteBuf response = 
MessageSerializer.serializeRequestFailure(
+                                                       serverChannel.alloc(),
+                                                       requestId,
+                                                       testException);
+
+                                       ch.writeAndFlush(response);
+                               }
+                       }
+
+                       for (long i = 0L; i < numQueries; i++) {
+
+                               if (i % 2L == 0L) {
+                                       KvStateResponse serializedResult = 
futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                                       assertArrayEquals(expected, 
serializedResult.getContent());
+                               } else {
+                                       try {
+                                               futures.get((int) 
i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                                               fail("Did not throw expected 
Exception");
+                                       } catch (ExecutionException e) {
+
+                                               if (!(e.getCause() instanceof 
RuntimeException)) {
+                                                       fail("Did not throw 
expected Exception");
+                                               }
+                                               // else expected
+                                       }
+                               }
+                       }
+
+                       assertEquals(numQueries, stats.getNumRequests());
+                       long expectedRequests = numQueries / 2L;
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != expectedRequests ||
+                                       stats.getNumFailed() != 
expectedRequests)) {
+                               Thread.sleep(100L);
+                       }
+
+                       assertEquals(expectedRequests, 
stats.getNumSuccessful());
+                       assertEquals(expectedRequests, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that a request to an unavailable host is failed with 
ConnectException.
+        */
+       @Test
+       public void testRequestUnavailableHost() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+
+               try {
+                       client = new Client<>("Test Client", 1, serializer, 
stats);
+
+                       int availablePort = NetUtils.getAvailablePort();
+
+                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(
+                                       InetAddress.getLocalHost(),
+                                       availablePort);
+
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                       CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
+
+                       try {
+                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                               fail("Did not throw expected ConnectException");
+                       } catch (ExecutionException e) {
+                               if (!(e.getCause() instanceof 
ConnectException)) {
+                                       fail("Did not throw expected 
ConnectException");
+                               }
+                               // else expected
+                       }
+               } finally {
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Multiple threads concurrently fire queries.
+        */
+       @Test
+       public void testConcurrentQueries() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               ExecutorService executor = null;
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+               Channel serverChannel = null;
+
+               final byte[] serializedResult = new byte[1024];
+               ThreadLocalRandom.current().nextBytes(serializedResult);
+
+               try {
+                       int numQueryTasks = 4;
+                       final int numQueriesPerTask = 1024;
+
+                       executor = Executors.newFixedThreadPool(numQueryTasks);
+
+                       client = new Client<>("Test Client", 1, serializer, 
stats);
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       ByteBuf buf = (ByteBuf) msg;
+                                       assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
+                                       long requestId = 
MessageSerializer.getRequestId(buf);
+                                       KvStateInternalRequest request = 
serializer.deserializeRequest(buf);
+
+                                       buf.release();
+
+                                       KvStateResponse response = new 
KvStateResponse(serializedResult);
+                                       ByteBuf serResponse = 
MessageSerializer.serializeResponse(
+                                                       ctx.alloc(),
+                                                       requestId,
+                                                       response);
+
+                                       
ctx.channel().writeAndFlush(serResponse);
+                               }
+                       });
+
+                       final KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       final Client<KvStateInternalRequest, KvStateResponse> 
finalClient = client;
+                       Callable<List<CompletableFuture<KvStateResponse>>> 
queryTask = () -> {
+                               List<CompletableFuture<KvStateResponse>> 
results = new ArrayList<>(numQueriesPerTask);
+
+                               for (int i = 0; i < numQueriesPerTask; i++) {
+                                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                                       
results.add(finalClient.sendRequest(serverAddress, request));
+                               }
+
+                               return results;
+                       };
+
+                       // Submit query tasks
+                       List<Future<List<CompletableFuture<KvStateResponse>>>> 
futures = new ArrayList<>();
+                       for (int i = 0; i < numQueryTasks; i++) {
+                               futures.add(executor.submit(queryTask));
+                       }
+
+                       // Verify results
+                       for (Future<List<CompletableFuture<KvStateResponse>>> 
future : futures) {
+                               List<CompletableFuture<KvStateResponse>> 
results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               for (CompletableFuture<KvStateResponse> result 
: results) {
+                                       KvStateResponse actual = 
result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                                       assertArrayEquals(serializedResult, 
actual.getContent());
+                               }
+                       }
+
+                       int totalQueries = numQueryTasks * numQueriesPerTask;
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
stats.getNumSuccessful() != totalQueries) {
+                               Thread.sleep(100L);
+                       }
+
+                       assertEquals(totalQueries, stats.getNumRequests());
+                       assertEquals(totalQueries, stats.getNumSuccessful());
+               } finally {
+                       if (executor != null) {
+                               executor.shutdown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that a server failure closes the connection and removes it from
+        * the established connections.
+        */
+       @Test
+       public void testFailureClosesChannel() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new Client<>("Test Client", 1, serializer, 
stats);
+
+                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
+                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       received.add((ByteBuf) msg);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       // Requests
+                       List<Future<KvStateResponse>> futures = new 
ArrayList<>();
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+
+                       futures.add(client.sendRequest(serverAddress, request));
+                       futures.add(client.sendRequest(serverAddress, request));
+
+                       ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       assertNotNull("Receive timed out", buf);
+                       buf.release();
+
+                       buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                       assertNotNull("Receive timed out", buf);
+                       buf.release();
+
+                       assertEquals(1L, stats.getNumConnections());
+
+                       Channel ch = channel.get();
+                       assertNotNull("Channel not active", ch);
+
+                       // Respond with failure
+                       
ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+                                       serverChannel.alloc(),
+                                       new RuntimeException("Expected test 
server failure")));
+
+                       try {
+                               
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               fail("Did not throw expected server failure");
+                       } catch (ExecutionException e) {
+
+                               if (!(e.getCause() instanceof 
RuntimeException)) {
+                                       fail("Did not throw expected 
Exception");
+                               }
+                               // Expected
+                       }
+
+                       try {
+                               
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               fail("Did not throw expected server failure");
+                       } catch (ExecutionException e) {
+
+                               if (!(e.getCause() instanceof 
RuntimeException)) {
+                                       fail("Did not throw expected 
Exception");
+                               }
+                               // Expected
+                       }
+
+                       assertEquals(0L, stats.getNumConnections());
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
+                               Thread.sleep(100L);
+                       }
+
+                       assertEquals(2L, stats.getNumRequests());
+                       assertEquals(0L, stats.getNumSuccessful());
+                       assertEquals(2L, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that a server channel close, closes the connection and removes 
it
+        * from the established connections.
+        */
+       @Test
+       public void testServerClosesChannel() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new Client<>("Test Client", 1, serializer, 
stats);
+
+                       final AtomicBoolean received = new AtomicBoolean();
+                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       received.set(true);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       // Requests
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                       Future<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
+
+                       while (!received.get() && deadline.hasTimeLeft()) {
+                               Thread.sleep(50L);
+                       }
+                       assertTrue("Receive timed out", received.get());
+
+                       assertEquals(1, stats.getNumConnections());
+
+                       
channel.get().close().await(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+
+                       try {
+                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                               fail("Did not throw expected server failure");
+                       } catch (ExecutionException e) {
+                               if (!(e.getCause() instanceof 
ClosedChannelException)) {
+                                       fail("Did not throw expected 
Exception");
+                               }
+                               // Expected
+                       }
+
+                       assertEquals(0L, stats.getNumConnections());
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
+                               Thread.sleep(100L);
+                       }
+
+                       assertEquals(1L, stats.getNumRequests());
+                       assertEquals(0L, stats.getNumSuccessful());
+                       assertEquals(1L, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests multiple clients querying multiple servers until 100k queries 
have
+        * been processed. At this point, the client is shut down and its 
verified
+        * that all ongoing requests are failed.
+        */
+       @Test
+       public void testClientServerIntegration() throws Exception {
+               // Config
+               final int numServers = 2;
+               final int numServerEventLoopThreads = 2;
+               final int numServerQueryThreads = 2;
+
+               final int numClientEventLoopThreads = 4;
+               final int numClientsTasks = 8;
+
+               final int batchSize = 16;
+
+               final int numKeyGroups = 1;
+
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               KvStateRegistry dummyRegistry = new KvStateRegistry();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(dummyRegistry);
+
+               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
+
+               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
+
+               AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
+
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               Client<KvStateInternalRequest, KvStateResponse> client = null;
+               ExecutorService clientTaskExecutor = null;
+               final KvStateServerImpl[] server = new 
KvStateServerImpl[numServers];
+
+               try {
+                       client = new Client<>("Test Client", 
numClientEventLoopThreads, serializer, clientStats);
+                       clientTaskExecutor = 
Executors.newFixedThreadPool(numClientsTasks);
+
+                       // Create state
+                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+                       desc.setQueryable("any");
+
+                       // Create servers
+                       KvStateRegistry[] registry = new 
KvStateRegistry[numServers];
+                       AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
+                       final KvStateID[] ids = new KvStateID[numServers];
+
+                       for (int i = 0; i < numServers; i++) {
+                               registry[i] = new KvStateRegistry();
+                               serverStats[i] = new 
AtomicKvStateRequestStats();
+                               server[i] = new KvStateServerImpl(
+                                               InetAddress.getLocalHost(),
+                                               0,
+                                               numServerEventLoopThreads,
+                                               numServerQueryThreads,
+                                               registry[i],
+                                               serverStats[i]);
+
+                               server[i].start();
+
+                               backend.setCurrentKey(1010 + i);
+
+                               // Value per server
+                               ValueState<Integer> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE,
+                                               
VoidNamespaceSerializer.INSTANCE,
+                                               desc);
+
+                               state.update(201 + i);
+
+                               // we know it must be a KvStat but this is not 
exposed to the user via State
+                               InternalKvState<?> kvState = 
(InternalKvState<?>) state;
+
+                               // Register KvState (one state instance for all 
server)
+                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+                       }
+
+                       final Client<KvStateInternalRequest, KvStateResponse> 
finalClient = client;
+                       Callable<Void> queryTask = () -> {
+                               while (true) {
+                                       if (Thread.interrupted()) {
+                                               throw new 
InterruptedException();
+                                       }
+
+                                       // Random server permutation
+                                       List<Integer> random = new 
ArrayList<>();
+                                       for (int j = 0; j < batchSize; j++) {
+                                               random.add(j);
+                                       }
+                                       Collections.shuffle(random);
+
+                                       // Dispatch queries
+                                       List<Future<KvStateResponse>> futures = 
new ArrayList<>(batchSize);
+
+                                       for (int j = 0; j < batchSize; j++) {
+                                               int targetServer = 
random.get(j) % numServers;
+
+                                               byte[] 
serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+                                                               1010 + 
targetServer,
+                                                               
IntSerializer.INSTANCE,
+                                                               
VoidNamespace.INSTANCE,
+                                                               
VoidNamespaceSerializer.INSTANCE);
+
+                                               KvStateInternalRequest request 
= new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+                                               
futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), 
request));
+                                       }
+
+                                       // Verify results
+                                       for (int j = 0; j < batchSize; j++) {
+                                               int targetServer = 
random.get(j) % numServers;
+
+                                               Future<KvStateResponse> future 
= futures.get(j);
+                                               byte[] buf = 
future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent();
+                                               int value = 
KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+                                               assertEquals(201L + 
targetServer, value);
+                                       }
+                               }
+                       };
+
+                       // Submit tasks
+                       List<Future<Void>> taskFutures = new ArrayList<>();
+                       for (int i = 0; i < numClientsTasks; i++) {
+                               
taskFutures.add(clientTaskExecutor.submit(queryTask));
+                       }
+
+                       long numRequests;
+                       while ((numRequests = clientStats.getNumRequests()) < 
100_000L) {
+                               Thread.sleep(100L);
+                               LOG.info("Number of requests {}/100_000", 
numRequests);
+                       }
+
+                       // Shut down
+                       client.shutdown();
+
+                       for (Future<Void> future : taskFutures) {
+                               try {
+                                       future.get();
+                                       fail("Did not throw expected Exception 
after shut down");
+                               } catch (ExecutionException t) {
+                                       if (t.getCause().getCause() instanceof 
ClosedChannelException ||
+                                                       t.getCause().getCause() 
instanceof IllegalStateException) {
+                                               // Expected
+                                       } else {
+                                               t.printStackTrace();
+                                               fail("Failed with unexpected 
Exception type: " + t.getClass().getName());
+                                       }
+                               }
+                       }
+
+                       assertEquals("Connection leak (client)", 0L, 
clientStats.getNumConnections());
+                       for (int i = 0; i < numServers; i++) {
+                               boolean success = false;
+                               int numRetries = 0;
+                               while (!success) {
+                                       try {
+                                               assertEquals("Connection leak 
(server)", 0L, serverStats[i].getNumConnections());
+                                               success = true;
+                                       } catch (Throwable t) {
+                                               if (numRetries < 10) {
+                                                       LOG.info("Retrying 
connection leak check (server)");
+                                                       
Thread.sleep((numRetries + 1) * 50L);
+                                                       numRetries++;
+                                               } else {
+                                                       throw t;
+                                               }
+                                       }
+                               }
+                       }
+               } finally {
+                       if (client != null) {
+                               client.shutdown();
+                       }
+
+                       for (int i = 0; i < numServers; i++) {
+                               if (server[i] != null) {
+                                       server[i].shutdown();
+                               }
+                       }
+
+                       if (clientTaskExecutor != null) {
+                               clientTaskExecutor.shutdown();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private Channel createServerChannel(final ChannelHandler... handlers) 
throws UnknownHostException, InterruptedException {
+               ServerBootstrap bootstrap = new ServerBootstrap()
+                               // Bind address and port
+                               .localAddress(InetAddress.getLocalHost(), 0)
+                               // NIO server channels
+                               .group(NIO_GROUP)
+                               .channel(NioServerSocketChannel.class)
+                               // See initializer for pipeline details
+                               .childHandler(new 
ChannelInitializer<SocketChannel>() {
+                                       @Override
+                                       protected void 
initChannel(SocketChannel ch) throws Exception {
+                                               ch.pipeline()
+                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                                                               
.addLast(handlers);
+                                       }
+                               });
+
+               return bootstrap.bind().sync().channel();
+       }
+
+       private KvStateServerAddress getKvStateServerAddress(Channel 
serverChannel) {
+               InetSocketAddress localAddress = (InetSocketAddress) 
serverChannel.localAddress();
+
+               return new KvStateServerAddress(localAddress.getAddress(), 
localAddress.getPort());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
index 0b97bda..cb490aa 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.queryablestate.network;
 
-import org.apache.flink.queryablestate.client.KvStateClientHandler;
-import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 /**
- * Tests for {@link KvStateClientHandler}.
+ * Tests for {@link ClientHandler}.
  */
 public class KvStateClientHandlerTest {
 
@@ -47,28 +47,30 @@ public class KvStateClientHandlerTest {
         */
        @Test
        public void testReadCallbacksAndBufferRecycling() throws Exception {
-               KvStateClientHandlerCallback callback = 
mock(KvStateClientHandlerCallback.class);
+               final ClientHandlerCallback<KvStateResponse> callback = 
mock(ClientHandlerCallback.class);
 
-               EmbeddedChannel channel = new EmbeddedChannel(new 
KvStateClientHandler(callback));
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+               final EmbeddedChannel channel = new EmbeddedChannel(new 
ClientHandler<>("Test Client", serializer, callback));
+
+               final byte[] content = new byte[0];
+               final KvStateResponse response = new KvStateResponse(content);
 
                //
                // Request success
                //
-               ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-                               channel.alloc(),
-                               1222112277,
-                               new byte[0]);
+               ByteBuf buf = 
MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response);
                buf.skipBytes(4); // skip frame length
 
                // Verify callback
                channel.writeInbound(buf);
-               verify(callback, times(1)).onRequestResult(eq(1222112277L), 
any(byte[].class));
+               verify(callback, times(1)).onRequestResult(eq(1222112277L), 
any(KvStateResponse.class));
                assertEquals("Buffer not recycled", 0, buf.refCnt());
 
                //
                // Request failure
                //
-               buf = MessageSerializer.serializeKvStateRequestFailure(
+               buf = MessageSerializer.serializeRequestFailure(
                                channel.alloc(),
                                1222112278,
                                new RuntimeException("Expected test 
Exception"));

Reply via email to