http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..6df77c0
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -0,0 +1,1502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.QueryableStateStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.function.Supplier;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for queryable state integration tests with a configurable state 
backend.
+ */
+public abstract class AbstractQueryableStateTestBase extends TestLogger {
+
+       private static final int NO_OF_RETRIES = 100;
+       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000L, TimeUnit.SECONDS);
+       private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
+
+       private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(4);
+       private final ScheduledExecutor executor = new 
ScheduledExecutorServiceAdapter(executorService);
+
+       /**
+        * State backend to use.
+        */
+       protected AbstractStateBackend stateBackend;
+
+       /**
+        * Shared between all the test. Make sure to have at least NUM_SLOTS
+        * available after your test finishes, e.g. cancel the job you 
submitted.
+        */
+       protected static FlinkMiniCluster cluster;
+
+       /**
+        * Client shared between all the test.
+        */
+       protected static QueryableStateClient client;
+
+       protected static int maxParallelism;
+
+       @Before
+       public void setUp() throws Exception {
+               // NOTE: do not use a shared instance for all tests as the 
tests may brake
+               this.stateBackend = createStateBackend();
+
+               Assert.assertNotNull(cluster);
+
+               maxParallelism = 
cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
1) *
+                               
cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
1);
+       }
+
+       /**
+        * Creates a state backend instance which is used in the {@link 
#setUp()} method before each
+        * test case.
+        *
+        * @return a state backend instance for each unit test
+        */
+       protected abstract AbstractStateBackend createStateBackend() throws 
Exception;
+
+       /**
+        * Runs a simple topology producing random (key, 1) pairs at the 
sources (where
+        * number of keys is in fixed in range 0...numKeys). The records are 
keyed and
+        * a reducing queryable state instance is created, which sums up the 
records.
+        *
+        * <p>After submitting the job in detached mode, the 
QueryableStateCLient is used
+        * to query the counts of each key in rounds until all keys have 
non-zero counts.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testQueryableState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final int numKeys = 256;
+
+               JobID jobId = null;
+
+               try {
+                       //
+                       // Test program
+                       //
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestKeyRangeSource(numKeys));
+
+                       // Reducing state
+                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState = new ReducingStateDescriptor<>(
+                                       "any-name",
+                                       new SumReduce(),
+                                       source.getType());
+
+                       final String queryName = "hakuna-matata";
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
7143749578983540352L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState(queryName, reducingState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       cluster.submitJobDetached(jobGraph);
+
+                       //
+                       // Start querying
+                       //
+                       jobId = jobGraph.getJobID();
+
+                       final AtomicLongArray counts = new 
AtomicLongArray(numKeys);
+
+                       boolean allNonZero = false;
+                       while (!allNonZero && deadline.hasTimeLeft()) {
+                               allNonZero = true;
+
+                               final 
List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new 
ArrayList<>(numKeys);
+
+                               for (int i = 0; i < numKeys; i++) {
+                                       final int key = i;
+
+                                       if (counts.get(key) > 0L) {
+                                               // Skip this one
+                                               continue;
+                                       } else {
+                                               allNonZero = false;
+                                       }
+
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       queryName,
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       reducingState,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       result.thenAccept(response -> {
+                                               try {
+                                                       Tuple2<Integer, Long> 
res = response.get();
+                                                       counts.set(key, res.f1);
+                                                       assertEquals("Key 
mismatch", key, res.f0.intValue());
+                                               } catch (Exception e) {
+                                                       
Assert.fail(e.getMessage());
+                                               }
+                                       });
+
+                                       futures.add(result);
+                               }
+
+                               // wait for all the futures to complete
+                               CompletableFuture
+                                               .allOf(futures.toArray(new 
CompletableFuture<?>[futures.size()]))
+                                               
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+
+                       assertTrue("Not all keys are non-zero", allNonZero);
+
+                       // All should be non-zero
+                       for (int i = 0; i < numKeys; i++) {
+                               long count = counts.get(i);
+                               assertTrue("Count at position " + i + " is " + 
count, count > 0);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests that duplicate query registrations fail the job at the 
JobManager.
+        *
+        * <b>NOTE: </b> This test is only in the non-HA variant of the tests 
because
+        * in the HA mode we use the actual JM code which does not recognize the
+        * {@code NotifyWhenJobStatus} message.  *
+        */
+       @Test
+       public void testDuplicateRegistrationFailsJob() throws Exception {
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final int numKeys = 256;
+
+               JobID jobId = null;
+
+               try {
+                       //
+                       // Test program
+                       //
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestKeyRangeSource(numKeys));
+
+                       // Reducing state
+                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState = new ReducingStateDescriptor<>(
+                                       "any-name",
+                                       new SumReduce(),
+                                       source.getType());
+
+                       final String queryName = "duplicate-me";
+
+                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> queryableState =
+                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
+                                               private static final long 
serialVersionUID = -4126824763829132959L;
+
+                                               @Override
+                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
+                                                       return value.f0;
+                                               }
+                                       }).asQueryableState(queryName, 
reducingState);
+
+                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> duplicate =
+                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
+                                               private static final long 
serialVersionUID = -6265024000462809436L;
+
+                                               @Override
+                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
+                                                       return value.f0;
+                                               }
+                                       }).asQueryableState(queryName);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       
CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = 
FutureUtils.toJava(
+                                       
cluster.getLeaderGateway(deadline.timeLeft())
+                                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), 
deadline.timeLeft())
+                                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       TestingJobManagerMessages.JobStatusIs jobStatus =
+                                       
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       assertEquals(JobStatus.FAILED, jobStatus.state());
+
+                       // Get the job and check the cause
+                       JobManagerMessages.JobFound jobFound = 
FutureUtils.toJava(
+                                       
cluster.getLeaderGateway(deadline.timeLeft())
+                                                       .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
+                                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
+                                       .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+
+                       String failureCause = 
jobFound.executionGraph().getFailureCause().getExceptionAsString();
+
+                       assertTrue("Not instance of SuppressRestartsException", 
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+                       int causedByIndex = failureCause.indexOf("Caused by: ");
+                       String subFailureCause = 
failureCause.substring(causedByIndex + "Caused by: ".length());
+                       assertTrue("Not caused by IllegalStateException", 
subFailureCause.startsWith("java.lang.IllegalStateException"));
+                       assertTrue("Exception does not contain registration 
name", subFailureCause.contains(queryName));
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               scala.concurrent.Future<CancellationSuccess> 
cancellation = cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
+
+                               Await.ready(cancellation, deadline.timeLeft());
+                       }
+               }
+       }
+
+       /**
+        * Tests simple value state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The tests succeeds after each subtask index is queried with
+        * value numElements (the latest element updated the state).
+        */
+       @Test
+       public void testValueState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Value state
+                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState 
= new ValueStateDescriptor<>(
+                                       "any",
+                                       source.getType());
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
7662520075515707428L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState("hakuna", valueState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, numElements);
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Similar tests as {@link #testValueState()} but before submitting the
+        * job, we already issue one request which fails.
+        */
+       @Test
+       public void testQueryNonStartedJobState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                               .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Value state
+                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState 
= new ValueStateDescriptor<>(
+                               "any",
+                               source.getType(),
+                               null);
+
+                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
+                               source.keyBy(new KeySelector<Tuple2<Integer, 
Long>, Integer>() {
+                                       private static final long 
serialVersionUID = 7480503339992214681L;
+
+                                       @Override
+                                       public Integer getKey(Tuple2<Integer, 
Long> value) throws Exception {
+                                               return value.f0;
+                                       }
+                               }).asQueryableState("hakuna", valueState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       // Now query
+                       long expected = numElements;
+
+                       // query once
+                       client.getKvState(
+                                       jobId,
+                                       queryableState.getQueryableStateName(),
+                                       0,
+                                       VoidNamespace.INSTANCE,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       VoidNamespaceTypeInfo.INSTANCE,
+                                       valueState);
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple value state queryable state instance with a default 
value
+        * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
+        * tuples, the key is mapped to 1 but key 0 is queried which should 
throw
+        * a {@link UnknownKeyOrNamespaceException} exception.
+        *
+        * @throws UnknownKeyOrNamespaceException thrown due querying a 
non-existent key
+        */
+       @Test(expected = UnknownKeyOrNamespaceException.class)
+       public void testValueStateDefault() throws Throwable {
+
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env =
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       env.setRestartStrategy(RestartStrategies
+                               .fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                               .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Value state
+                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+                               new ValueStateDescriptor<>(
+                                       "any",
+                                       source.getType(),
+                                       Tuple2.of(0, 1337L));
+
+                       // only expose key "1"
+                       QueryableStateStream<Integer, Tuple2<Integer, Long>>
+                               queryableState =
+                               source.keyBy(
+                                       new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                                               private static final long 
serialVersionUID = 4509274556892655887L;
+
+                                               @Override
+                                               public Integer getKey(
+                                                       Tuple2<Integer, Long> 
value) throws
+                                                       Exception {
+                                                       return 1;
+                                               }
+                                       }).asQueryableState("hakuna", 
valueState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+                       int key = 0;
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = getKvStateWithRetries(
+                                       client,
+                                       jobId,
+                                       queryableState.getQueryableStateName(),
+                                       key,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       valueState,
+                                       QUERY_RETRY_DELAY,
+                                       true,
+                                       executor);
+
+                       try {
+                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                       } catch (ExecutionException | CompletionException e) {
+                               // get() on a completedExceptionally future 
wraps the
+                               // exception in an ExecutionException.
+                               throw e.getCause();
+                       }
+               } finally {
+
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple value state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The tests succeeds after each subtask index is queried with
+        * value numElements (the latest element updated the state).
+        *
+        * <p>This is the same as the simple value state test, but uses the API 
shortcut.
+        */
+       @Test
+       public void testValueStateShortcut() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Value state shortcut
+                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
+                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
+                                               private static final long 
serialVersionUID = 9168901838808830068L;
+
+                                               @Override
+                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
+                                                       return value.f0;
+                                               }
+                                       }).asQueryableState("matata");
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       final ValueStateDescriptor<Tuple2<Integer, Long>> 
stateDesc =
+                                       (ValueStateDescriptor<Tuple2<Integer, 
Long>>) queryableState.getStateDescriptor();
+                       executeValueQuery(deadline, client, jobId, "matata", 
stateDesc, numElements);
+               } finally {
+
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(
+                                               
cluster.getLeaderGateway(deadline.timeLeft())
+                                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple folding state queryable state instance. Each source 
emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The folding state sums these up and maps them to Strings. 
The
+        * test succeeds after each subtask index is queried with result 
n*(n+1)/2
+        * (as a String).
+        */
+       @Test
+       public void testFoldingState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final int numElements = 1024;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Folding state
+                       FoldingStateDescriptor<Tuple2<Integer, Long>, String> 
foldingState =
+                                       new FoldingStateDescriptor<>(
+                                                       "any",
+                                                       "0",
+                                                       new SumFold(),
+                                                       
StringSerializer.INSTANCE);
+
+                       QueryableStateStream<Integer, String> queryableState =
+                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
+                                               private static final long 
serialVersionUID = -842809958106747539L;
+
+                                               @Override
+                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
+                                                       return value.f0;
+                                               }
+                                       }).asQueryableState("pumba", 
foldingState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+                       String expected = Integer.toString(numElements * 
(numElements + 1) / 2);
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "pumba",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       foldingState,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       //assertEquals("Key mismatch", key, 
value.f0.intValue());
+                                       if (expected.equals(value)) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple reducing state queryable state instance. Each source 
emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The reducing state instance sums these up. The test succeeds
+        * after each subtask index is queried with result n*(n+1)/2.
+        */
+       @Test
+       public void testReducingState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Reducing state
+                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState =
+                                       new ReducingStateDescriptor<>(
+                                                       "any",
+                                                       new SumReduce(),
+                                                       source.getType());
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState("jungle", reducingState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+                       long expected = numElements * (numElements + 1L) / 2L;
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "jungle",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       reducingState,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       assertEquals("Key mismatch", key, 
value.f0.intValue());
+                                       if (expected == value.f1) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple map state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The map state instance sums the values up. The test succeeds
+        * after each subtask index is queried with result n*(n+1)/2.
+        */
+       @Test
+       public void testMapState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final MapStateDescriptor<Integer, Tuple2<Integer, 
Long>> mapStateDescriptor = new MapStateDescriptor<>(
+                                       "timon",
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       source.getType());
+                       mapStateDescriptor.setQueryable("timon-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).process(new ProcessFunction<Tuple2<Integer, Long>, 
Object>() {
+                               private static final long serialVersionUID = 
-805125545438296619L;
+
+                               private transient MapState<Integer, 
Tuple2<Integer, Long>> mapState;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       mapState = 
getRuntimeContext().getMapState(mapStateDescriptor);
+                               }
+
+                               @Override
+                               public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
+                                       Tuple2<Integer, Long> v = 
mapState.get(value.f0);
+                                       if (v == null) {
+                                               v = new Tuple2<>(value.f0, 0L);
+                                       }
+                                       mapState.put(value.f0, new 
Tuple2<>(v.f0, v.f1 + value.f1));
+                               }
+                       });
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+                       long expected = numElements * (numElements + 1L) / 2L;
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       CompletableFuture<MapState<Integer, 
Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "timon-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       mapStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);
+                                       assertEquals("Key mismatch", key, 
value.f0.intValue());
+                                       if (expected == value.f1) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Tests simple list state queryable state instance. Each source emits
+        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
+        * queried. The list state instance add the values to the list. The test
+        * succeeds after each subtask index is queried and the list contains
+        * the correct number of distinct elements.
+        */
+       @Test
+       public void testListState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final ListStateDescriptor<Long> listStateDescriptor = 
new ListStateDescriptor<Long>(
+                                       "list",
+                                       BasicTypeInfo.LONG_TYPE_INFO);
+                       listStateDescriptor.setQueryable("list-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).process(new ProcessFunction<Tuple2<Integer, Long>, 
Object>() {
+                               private static final long serialVersionUID = 
-805125545438296619L;
+
+                               private transient ListState<Long> listState;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       listState = 
getRuntimeContext().getListState(listStateDescriptor);
+                               }
+
+                               @Override
+                               public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
+                                       listState.add(value.f1);
+                               }
+                       });
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+
+                       Map<Integer, Set<Long>> results = new HashMap<>();
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       CompletableFuture<ListState<Long>> 
future = getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "list-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       listStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       Iterable<Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       Set<Long> res = new HashSet<>();
+                                       for (Long v: value) {
+                                               res.add(v);
+                                       }
+
+                                       // the source starts at 0, so +1
+                                       if (res.size() == numElements + 1L) {
+                                               success = true;
+                                               results.put(key, res);
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               Set<Long> values = results.get(key);
+                               for (long i = 0L; i <= numElements; i++) {
+                                       assertTrue(values.contains(i));
+                               }
+                       }
+
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       @Test
+       public void testAggregatingState() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       final AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> aggrStateDescriptor =
+                                       new AggregatingStateDescriptor<>(
+                                                       "aggregates",
+                                                       new SumAggr(),
+                                                       MutableString.class);
+                       aggrStateDescriptor.setQueryable("aggr-queryable");
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
8470749712274833552L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).transform(
+                                       "TestAggregatingOperator",
+                                       BasicTypeInfo.STRING_TYPE_INFO,
+                                       new 
AggregatingTestOperator(aggrStateDescriptor)
+                       );
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // Now query
+
+                       for (int key = 0; key < maxParallelism; key++) {
+                               boolean success = false;
+                               while (deadline.hasTimeLeft() && !success) {
+                                       
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
+                                                       client,
+                                                       jobId,
+                                                       "aggr-queryable",
+                                                       key,
+                                                       
BasicTypeInfo.INT_TYPE_INFO,
+                                                       aggrStateDescriptor,
+                                                       QUERY_RETRY_DELAY,
+                                                       false,
+                                                       executor);
+
+                                       String value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();
+
+                                       if (Long.parseLong(value) == 
numElements * (numElements + 1L) / 2L) {
+                                               success = true;
+                                       } else {
+                                               // Retry
+                                               Thread.sleep(50L);
+                                       }
+                               }
+
+                               assertTrue("Did not succeed query", success);
+                       }
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /////                           Sources/UDFs Used in the Tests          
        //////
+
+       /**
+        * Test source producing (key, 0)..(key, maxValue) with key being the 
sub
+        * task index.
+        *
+        * <p>After all tuples have been emitted, the source waits to be 
cancelled
+        * and does not immediately finish.
+        */
+       private static class TestAscendingValueSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>> {
+
+               private static final long serialVersionUID = 
1459935229498173245L;
+
+               private final long maxValue;
+               private volatile boolean isRunning = true;
+
+               TestAscendingValueSource(long maxValue) {
+                       Preconditions.checkArgument(maxValue >= 0);
+                       this.maxValue = maxValue;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Long>> ctx) 
throws Exception {
+                       // f0 => key
+                       int key = getRuntimeContext().getIndexOfThisSubtask();
+                       Tuple2<Integer, Long> record = new Tuple2<>(key, 0L);
+
+                       long currentValue = 0;
+                       while (isRunning && currentValue <= maxValue) {
+                               synchronized (ctx.getCheckpointLock()) {
+                                       record.f1 = currentValue;
+                                       ctx.collect(record);
+                               }
+
+                               currentValue++;
+                       }
+
+                       while (isRunning) {
+                               synchronized (this) {
+                                       wait();
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+
+                       synchronized (this) {
+                               notifyAll();
+                       }
+               }
+
+       }
+
+       /**
+        * Test source producing (key, 1) tuples with random key in key range 
(numKeys).
+        */
+       private static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener 
{
+
+               private static final long serialVersionUID = 
-5744725196953582710L;
+
+               private static final AtomicLong LATEST_CHECKPOINT_ID = new 
AtomicLong();
+               private final int numKeys;
+               private final ThreadLocalRandom random = 
ThreadLocalRandom.current();
+               private volatile boolean isRunning = true;
+
+               TestKeyRangeSource(int numKeys) {
+                       this.numKeys = numKeys;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                               LATEST_CHECKPOINT_ID.set(0L);
+                       }
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Long>> ctx) 
throws Exception {
+                       // f0 => key
+                       Tuple2<Integer, Long> record = new Tuple2<>(0, 1L);
+
+                       while (isRunning) {
+                               synchronized (ctx.getCheckpointLock()) {
+                                       record.f0 = random.nextInt(numKeys);
+                                       ctx.collect(record);
+                               }
+                               // mild slow down
+                               Thread.sleep(1L);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                               LATEST_CHECKPOINT_ID.set(checkpointId);
+                       }
+               }
+       }
+
+       /**
+        * An operator that uses {@link AggregatingState}.
+        *
+        * <p>The operator exists for lack of possibility to get an
+        * {@link AggregatingState} from the {@link 
org.apache.flink.api.common.functions.RuntimeContext}.
+        * If this were not the case, we could have a {@link ProcessFunction}.
+        */
+       private static class AggregatingTestOperator
+                       extends AbstractStreamOperator<String>
+                       implements OneInputStreamOperator<Tuple2<Integer, 
Long>, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> stateDescriptor;
+               private transient AggregatingState<Tuple2<Integer, Long>, 
String> state;
+
+               
AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, 
MutableString, String> stateDesc) {
+                       this.stateDescriptor = stateDesc;
+               }
+
+               @Override
+               public void open() throws Exception {
+                       super.open();
+                       this.state = getKeyedStateBackend().getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescriptor);
+               }
+
+               @Override
+               public void processElement(StreamRecord<Tuple2<Integer, Long>> 
element) throws Exception {
+                       state.add(element.getValue());
+               }
+       }
+
+       /**
+        * Test {@link AggregateFunction} concatenating the already stored 
string with the long passed as argument.
+        */
+       private static class SumAggr implements 
AggregateFunction<Tuple2<Integer, Long>, MutableString, String> {
+
+               private static final long serialVersionUID = 
-6249227626701264599L;
+
+               @Override
+               public MutableString createAccumulator() {
+                       return new MutableString();
+               }
+
+               @Override
+               public void add(Tuple2<Integer, Long> value, MutableString 
accumulator) {
+                       long acc = Long.valueOf(accumulator.value);
+                       acc += value.f1;
+                       accumulator.value = Long.toString(acc);
+               }
+
+               @Override
+               public String getResult(MutableString accumulator) {
+                       return accumulator.value;
+               }
+
+               @Override
+               public MutableString merge(MutableString a, MutableString b) {
+                       MutableString nValue = new MutableString();
+                       nValue.value = Long.toString(Long.valueOf(a.value) + 
Long.valueOf(b.value));
+                       return nValue;
+               }
+       }
+
+       private static final class MutableString {
+               String value = "0";
+       }
+
+       /**
+        * Test {@link FoldFunction} concatenating the already stored string 
with the long passed as argument.
+        */
+       private static class SumFold implements FoldFunction<Tuple2<Integer, 
Long>, String> {
+               private static final long serialVersionUID = 
-6249227626701264599L;
+
+               @Override
+               public String fold(String accumulator, Tuple2<Integer, Long> 
value) throws Exception {
+                       long acc = Long.valueOf(accumulator);
+                       acc += value.f1;
+                       return Long.toString(acc);
+               }
+       }
+
+       /**
+        * Test {@link ReduceFunction} summing up its two arguments.
+        */
+       protected static class SumReduce implements 
ReduceFunction<Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 
-8651235077342052336L;
+
+               @Override
+               public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> 
value1, Tuple2<Integer, Long> value2) throws Exception {
+                       value1.f1 += value2.f1;
+                       return value1;
+               }
+       }
+
+       /////                           General Utility Methods                 
        //////
+
+       private static <K, S extends State, V> CompletableFuture<S> 
getKvStateWithRetries(
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryName,
+                       final K key,
+                       final TypeInformation<K> keyTypeInfo,
+                       final StateDescriptor<S, V> stateDescriptor,
+                       final Time retryDelay,
+                       final boolean failForUnknownKeyOrNamespace,
+                       final ScheduledExecutor executor) {
+               return retryWithDelay(
+                               () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor),
+                               NO_OF_RETRIES,
+                               retryDelay,
+                               executor,
+                               failForUnknownKeyOrNamespace);
+       }
+
+       private static <T> CompletableFuture<T> retryWithDelay(
+                       final Supplier<CompletableFuture<T>> operation,
+                       final int retries,
+                       final Time retryDelay,
+                       final ScheduledExecutor scheduledExecutor,
+                       final boolean failIfUnknownKeyOrNamespace) {
+
+               final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
+
+               retryWithDelay(
+                               resultFuture,
+                               operation,
+                               retries,
+                               retryDelay,
+                               scheduledExecutor,
+                               failIfUnknownKeyOrNamespace);
+
+               return resultFuture;
+       }
+
+       public static <T> void retryWithDelay(
+                       final CompletableFuture<T> resultFuture,
+                       final Supplier<CompletableFuture<T>> operation,
+                       final int retries,
+                       final Time retryDelay,
+                       final ScheduledExecutor scheduledExecutor,
+                       final boolean failIfUnknownKeyOrNamespace) {
+
+               if (!resultFuture.isDone()) {
+                       final CompletableFuture<T> operationResultFuture = 
operation.get();
+                       operationResultFuture.whenCompleteAsync(
+                                       (t, throwable) -> {
+                                               if (throwable != null) {
+                                                       if 
(throwable.getCause() instanceof CancellationException) {
+                                                               
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation 
future was cancelled.", throwable.getCause()));
+                                                       } else if 
(throwable.getCause() instanceof AssertionError ||
+                                                                       
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)) {
+                                                               
resultFuture.completeExceptionally(throwable.getCause());
+                                                       } else {
+                                                               if (retries > 
0) {
+                                                                       final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+                                                                               
        () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor, failIfUnknownKeyOrNamespace),
+                                                                               
        retryDelay.toMilliseconds(),
+                                                                               
        TimeUnit.MILLISECONDS);
+
+                                                                       
resultFuture.whenComplete(
+                                                                               
        (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+                                                               } else {
+                                                                       
resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not 
complete the operation. Number of retries " +
+                                                                               
        "has been exhausted.", throwable));
+                                                               }
+                                                       }
+                                               } else {
+                                                       
resultFuture.complete(t);
+                                               }
+                                       },
+                                       scheduledExecutor);
+
+                       resultFuture.whenComplete(
+                                       (t, throwable) -> 
operationResultFuture.cancel(false));
+               }
+       }
+
+       /**
+        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
+        * <tt>expected</tt> equals the value of the result tuple's second 
field.
+        */
+       private void executeValueQuery(
+                       final Deadline deadline,
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final ValueStateDescriptor<Tuple2<Integer, Long>> 
stateDescriptor,
+                       final long expected) throws Exception {
+
+               for (int key = 0; key < maxParallelism; key++) {
+                       boolean success = false;
+                       while (deadline.hasTimeLeft() && !success) {
+                               CompletableFuture<ValueState<Tuple2<Integer, 
Long>>> future = getKvStateWithRetries(
+                                               client,
+                                               jobId,
+                                               queryableStateName,
+                                               key,
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               stateDescriptor,
+                                               QUERY_RETRY_DELAY,
+                                               false,
+                                               executor);
+
+                               Tuple2<Integer, Long> value = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();
+
+                               assertEquals("Key mismatch", key, 
value.f0.intValue());
+                               if (expected == value.f1) {
+                                       success = true;
+                               } else {
+                                       // Retry
+                                       Thread.sleep(50L);
+                               }
+                       }
+
+                       assertTrue("Did not succeed query", success);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
deleted file mode 100644
index a90b956..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the NON-HA mode.
- */
-public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
-
-       private static final int NUM_JMS = 2;
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       private static TestingServer zkServer;
-       private static TemporaryFolder temporaryFolder;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       zkServer = new TestingServer();
-                       temporaryFolder = new TemporaryFolder();
-                       temporaryFolder.create();
-
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
-                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
-                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + 
NUM_TMS));
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
-                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start();
-
-                       // verify that we are in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() {
-               if (cluster != null) {
-                       cluster.stop();
-                       cluster.awaitTermination();
-               }
-
-               try {
-                       zkServer.stop();
-                       zkServer.close();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-
-               temporaryFolder.delete();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..ab75cf4
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the NON-HA mode.
+ */
+public abstract class HAAbstractQueryableStateTestBase extends 
AbstractQueryableStateTestBase {
+
+       private static final int NUM_JMS = 2;
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       private static TestingServer zkServer;
+       private static TemporaryFolder temporaryFolder;
+
+       public static void setup(int proxyPortRangeStart, int 
serverPortRangeStart) {
+               try {
+                       zkServer = new TestingServer();
+                       temporaryFolder = new TemporaryFolder();
+                       temporaryFolder.create();
+
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
+                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
+                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start();
+
+                       client = new QueryableStateClient("localhost", 
proxyPortRangeStart);
+
+                       // verify that we are in HA mode
+                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               if (cluster != null) {
+                       cluster.stop();
+                       cluster.awaitTermination();
+               }
+
+               try {
+                       zkServer.stop();
+                       zkServer.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+
+               client.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..6f31e76
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
FsStateBackend}.
+ */
+public class HAQueryableStateFsBackendITCase extends 
HAAbstractQueryableStateTestBase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               setup(9064, 9069);
+       }
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index a2d3ad0..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
FsStateBackend}.
- */
-public class HAQueryableStateITCaseFsBackend extends 
HAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index fda1171..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
- */
-public class HAQueryableStateITCaseRocksDBBackend extends 
HAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..cae02e2
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
+ */
+public class HAQueryableStateRocksDBBackendITCase extends 
HAAbstractQueryableStateTestBase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               setup(9074, 9079);
+       }
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
deleted file mode 100644
index c258e70..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the HA mode.
- */
-public abstract class NonHAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
-
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069-" + (9069 + 
NUM_TMS));
-                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9062-" + (9062 + 
NUM_TMS));
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start(true);
-
-                       // verify that we are not in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() {
-               try {
-                       cluster.shutdown();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
new file mode 100644
index 0000000..2937a51
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ */
+public abstract class NonHAAbstractQueryableStateTestBase extends 
AbstractQueryableStateTestBase {
+
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       public static void setup(int proxyPortRangeStart, int 
serverPortRangeStart) {
+               try {
+                       Configuration config = new Configuration();
+                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
+                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
+                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start(true);
+
+                       client = new QueryableStateClient("localhost", 
proxyPortRangeStart);
+
+                       // verify that we are not in HA mode
+                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               try {
+                       cluster.shutdown();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               client.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
new file mode 100644
index 0000000..9457e0f
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
FsStateBackend}.
+ */
+public class NonHAQueryableStateFsBackendITCase extends 
NonHAAbstractQueryableStateTestBase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               setup(9084, 9089);
+       }
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index caa315a..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
FsStateBackend}.
- */
-public class NonHAQueryableStateITCaseFsBackend extends 
NonHAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 10e9b57..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
- */
-public class NonHAQueryableStateITCaseRocksDBBackend extends 
NonHAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/614cc58a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
new file mode 100644
index 0000000..7778a94
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateRocksDBBackendITCase extends 
NonHAAbstractQueryableStateTestBase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               setup(9094, 9099);
+       }
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

Reply via email to