http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..15a5ff6
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ *
+ * <p>Once per test class it starts a ZooKeeper {@link TestingServer} and a
+ * {@link TestingCluster} with {@code NUM_JMS} job managers configured for the
+ * {@code "zookeeper"} high-availability mode.
+ */
+public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+       private static final int NUM_JMS = 2;
+       private static final int NUM_TMS = 4;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       private static TestingServer zkServer;
+
+       // Created and deleted by hand in setup()/tearDown() rather than through a JUnit rule.
+       private static TemporaryFolder temporaryFolder;
+
+       /**
+        * Starts the ZooKeeper test server, creates the temporary folder used as HA storage
+        * path, and boots the cluster and the test actor system.
+        *
+        * <p>Any exception thrown on the way is printed and reported as a test failure.
+        */
+       @BeforeClass
+       public static void setup() {
+               try {
+                       zkServer = new TestingServer();
+                       temporaryFolder = new TemporaryFolder();
+                       temporaryFolder.create();
+
+                       Configuration config = new Configuration();
+                       config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+                       config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+                       config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+                       config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+                       config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start();
+
+                       testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+                       // verify that we are in HA mode; assertEquals reports both values on mismatch
+                       Assert.assertEquals(HighAvailabilityMode.ZOOKEEPER, cluster.haMode());
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Releases whatever {@link #setup()} managed to create.
+        *
+        * <p>Every resource is null-checked: JUnit still runs {@code @AfterClass} methods when
+        * the {@code @BeforeClass} method failed, and an unguarded dereference here would add
+        * a {@link NullPointerException} on top of the original setup failure.
+        */
+       @AfterClass
+       public static void tearDown() {
+               if (cluster != null) {
+                       cluster.stop();
+                       cluster.awaitTermination();
+               }
+
+               if (testActorSystem != null) {
+                       testActorSystem.shutdown();
+                       testActorSystem.awaitTermination();
+               }
+
+               try {
+                       if (zkServer != null) {
+                               zkServer.stop();
+                               zkServer.close();
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               } finally {
+                       // fail() above throws, so the folder is removed here to avoid leaking it
+                       if (temporaryFolder != null) {
+                               temporaryFolder.delete();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..a2d3ad0
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the queryable state integration tests of {@link HAAbstractQueryableStateITCase}
+ * against the {@link FsStateBackend}.
+ */
+public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Creates an {@link FsStateBackend} pointing at a fresh folder below the
+        * per-test temporary directory.
+        */
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               final String checkpointUri = temporaryFolder.newFolder().toURI().toString();
+               return new FsStateBackend(checkpointUri);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..fda1171
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the queryable state integration tests of {@link HAAbstractQueryableStateITCase}
+ * against the {@link RocksDBStateBackend}.
+ */
+public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Creates a {@link RocksDBStateBackend} pointing at a fresh folder below the
+        * per-test temporary directory.
+        */
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               final String checkpointUri = temporaryFolder.newFolder().toURI().toString();
+               return new RocksDBStateBackend(checkpointUri);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..907e8a3
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization using
+ * the KvStateSerializer with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Subclass of {@link RocksDBKeyedStateBackend} that re-declares
+        * {@link #createListState(TypeSerializer, ListStateDescriptor)} as public so that
+        * the tests can call it directly.
+        *
+        * @param <K> key type
+        */
+       static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+               RocksDBKeyedStateBackend2(
+                               final String operatorIdentifier,
+                               final ClassLoader userCodeClassLoader,
+                               final File instanceBasePath,
+                               final DBOptions dbOptions,
+                               final ColumnFamilyOptions columnFamilyOptions,
+                               final TaskKvStateRegistry kvStateRegistry,
+                               final TypeSerializer<K> keySerializer,
+                               final int numberOfKeyGroups,
+                               final KeyGroupRange keyGroupRange,
+                               final ExecutionConfig executionConfig) throws Exception {
+
+                       // All arguments are forwarded unchanged; the trailing flag is fixed to false.
+                       super(
+                               operatorIdentifier,
+                               userCodeClassLoader,
+                               instanceBasePath,
+                               dbOptions,
+                               columnFamilyOptions,
+                               kvStateRegistry,
+                               keySerializer,
+                               numberOfKeyGroups,
+                               keyGroupRange,
+                               executionConfig,
+                               false);
+               }
+
+               @Override
+               public <N, T> InternalListState<N, T> createListState(
+                       final TypeSerializer<N> namespaceSerializer,
+                       final ListStateDescriptor<T> stateDesc) throws Exception {
+
+                       return super.createListState(namespaceSerializer, stateDesc);
+               }
+       }
+
+       /**
+        * Tests list serialization and deserialization match.
+        *
+        * @see KvStateRequestSerializerTest#testListSerialization()
+        * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+        * test
+        */
+       @Test
+       public void testListSerialization() throws Exception {
+               final long key = 0L;
+
+               // RocksDB options for the keyed state backend that holds the list state
+               DBOptions rocksOptions = PredefinedOptions.DEFAULT.createDBOptions();
+               rocksOptions.setCreateIfMissing(true);
+               ColumnFamilyOptions columnOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+
+               final RocksDBKeyedStateBackend2<Long> rocksDbBackend =
+                       new RocksDBKeyedStateBackend2<>(
+                               "no-op",
+                               ClassLoader.getSystemClassLoader(),
+                               temporaryFolder.getRoot(),
+                               rocksOptions,
+                               columnOptions,
+                               mock(TaskKvStateRegistry.class),
+                               LongSerializer.INSTANCE,
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig()
+                       );
+               rocksDbBackend.restore(null);
+               rocksDbBackend.setCurrentKey(key);
+
+               final ListStateDescriptor<Long> listDescriptor =
+                       new ListStateDescriptor<>("test", LongSerializer.INSTANCE);
+               final InternalListState<VoidNamespace, Long> listState =
+                       rocksDbBackend.createListState(VoidNamespaceSerializer.INSTANCE, listDescriptor);
+
+               // the shared helper runs the actual serialization checks against the RocksDB state
+               KvStateRequestSerializerTest.testListSerialization(key, listState);
+       }
+
+       /**
+        * Tests map serialization and deserialization match.
+        *
+        * @see KvStateRequestSerializerTest#testMapSerialization()
+        * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
+        * test
+        */
+       @Test
+       public void testMapSerialization() throws Exception {
+               final long key = 0L;
+
+               // RocksDB options for the keyed state backend that holds the map state
+               DBOptions rocksOptions = PredefinedOptions.DEFAULT.createDBOptions();
+               rocksOptions.setCreateIfMissing(true);
+               ColumnFamilyOptions columnOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+
+               final RocksDBKeyedStateBackend<Long> rocksDbBackend =
+                       new RocksDBKeyedStateBackend<>(
+                               "no-op",
+                               ClassLoader.getSystemClassLoader(),
+                               temporaryFolder.getRoot(),
+                               rocksOptions,
+                               columnOptions,
+                               mock(TaskKvStateRegistry.class),
+                               LongSerializer.INSTANCE,
+                               1, new KeyGroupRange(0, 0),
+                               new ExecutionConfig(),
+                               false);
+               rocksDbBackend.restore(null);
+               rocksDbBackend.setCurrentKey(key);
+
+               final MapStateDescriptor<Long, String> mapDescriptor =
+                       new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+               final InternalMapState<VoidNamespace, Long, String> mapState =
+                       (InternalMapState<VoidNamespace, Long, String>) rocksDbBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               mapDescriptor);
+
+               // the shared helper runs the actual serialization checks against the RocksDB state
+               KvStateRequestSerializerTest.testMapSerialization(key, mapState);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..c52acc8
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the NON-HA mode.
+ *
+ * <p>Once per test class it starts a {@link TestingCluster} with {@code NUM_TMS} task
+ * managers and checks that the cluster reports {@link HighAvailabilityMode#NONE}.
+ */
+public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       /**
+        * Configures and starts the cluster and the test actor system.
+        *
+        * <p>Any exception thrown on the way is printed and reported as a test failure.
+        */
+       @BeforeClass
+       public static void setup() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+                       config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+                       config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start(true);
+
+                       testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+                       // verify that we are not in HA mode; assertEquals reports both values on mismatch
+                       Assert.assertEquals(HighAvailabilityMode.NONE, cluster.haMode());
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Shuts down whatever {@link #setup()} managed to create.
+        *
+        * <p>The cluster is null-checked because JUnit still runs {@code @AfterClass} methods
+        * when the {@code @BeforeClass} method failed. The actor system is shut down in a
+        * {@code finally} block so that a failing cluster shutdown does not leak it.
+        */
+       @AfterClass
+       public static void tearDown() {
+               try {
+                       if (cluster != null) {
+                               cluster.shutdown();
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               } finally {
+                       if (testActorSystem != null) {
+                               testActorSystem.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..caa315a
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the queryable state integration tests of {@link NonHAAbstractQueryableStateITCase}
+ * against the {@link FsStateBackend}.
+ */
+public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Creates an {@link FsStateBackend} pointing at a fresh folder below the
+        * per-test temporary directory.
+        */
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               final String checkpointUri = temporaryFolder.newFolder().toURI().toString();
+               return new FsStateBackend(checkpointUri);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..10e9b57
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the queryable state integration tests of {@link NonHAAbstractQueryableStateITCase}
+ * against the {@link RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Creates a {@link RocksDBStateBackend} pointing at a fresh folder below the
+        * per-test temporary directory.
+        */
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               final String checkpointUri = temporaryFolder.newFolder().toURI().toString();
+               return new RocksDBStateBackend(checkpointUri);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
new file mode 100644
index 0000000..d9a41a1
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownJobManager;
+import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link AkkaKvStateLocationLookupService}.
+ */
+public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
+
+       /** The default timeout. */
+       private static final FiniteDuration TIMEOUT = new FiniteDuration(10, 
TimeUnit.SECONDS);
+
+       /** Test actor system shared between the tests. */
+       private static ActorSystem testActorSystem;
+
+       /**
+        * Creates the local actor system used by the tests in this class.
+        */
+       @BeforeClass
+       public static void setUp() throws Exception {
+               final Configuration defaultConfig = new Configuration();
+               testActorSystem = AkkaUtils.createLocalActorSystem(defaultConfig);
+       }
+
+       /**
+        * Shuts the test actor system down, if one was created.
+        */
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (testActorSystem == null) {
+                       return;
+               }
+
+               testActorSystem.shutdown();
+       }
+
+       /**
+        * Tests the responses while no leader is known, i.e. before any leader notification has
+        * been reported and after leadership has been lost (leaderAddress = <code>null</code>),
+        * as well as the response while a leader is registered.
+        */
+       @Test
+       public void testNoJobManagerRegistered() throws Exception {
+               final Queue<LookupKvStateLocation> receivedLookups = new LinkedBlockingQueue<>();
+               final TestingLeaderRetrievalService leaderService = new TestingLeaderRetrievalService(null, null);
+
+               final AkkaKvStateLocationLookupService service = new AkkaKvStateLocationLookupService(
+                               leaderService,
+                               testActorSystem,
+                               TIMEOUT,
+                               new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+               service.start();
+
+               //
+               // Phase 1: no leader registered initially => fail with UnknownJobManager
+               //
+               try {
+                       Await.result(service.getKvStateLookupInfo(new JobID(), "coffee"), TIMEOUT);
+                       fail("Did not throw expected Exception");
+               } catch (UnknownJobManager ignored) {
+                       // Expected
+               }
+
+               assertEquals("Received unexpected lookup", 0, receivedLookups.size());
+
+               //
+               // Phase 2: leader registration => the lookup is answered by the new leader
+               //
+               final UUID sessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
+               final KvStateLocation expectedLocation = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
+
+               final ActorRef leader = LookupResponseActor.create(receivedLookups, sessionId, expectedLocation);
+               final String leaderAddress = AkkaUtils.getAkkaURL(testActorSystem, leader);
+
+               // Notify the service about the leader
+               leaderService.notifyListener(leaderAddress, sessionId);
+
+               final JobID queriedJob = new JobID();
+               final String queriedName = "tea";
+
+               // The response of the leader must be passed through to the caller
+               final Future<KvStateLocation> answeredLookup = service.getKvStateLookupInfo(queriedJob, queriedName);
+               assertEquals(expectedLocation, Await.result(answeredLookup, TIMEOUT));
+
+               // Exactly one lookup message, carrying the queried job and name, reached the leader
+               assertEquals(1, receivedLookups.size());
+               verifyLookupMsg(receivedLookups.poll(), queriedJob, queriedName);
+
+               //
+               // Phase 3: leader loss => fail with UnknownJobManager again
+               //
+               leaderService.notifyListener(null, null);
+
+               try {
+                       Await.result(service.getKvStateLookupInfo(new JobID(), "coffee"), TIMEOUT);
+                       fail("Did not throw expected Exception");
+               } catch (UnknownJobManager ignored) {
+                       // Expected
+               }
+
+               // No new messages received
+               assertEquals(0, receivedLookups.size());
+       }
+
+       /**
+        * Tests that messages are properly decorated with the leader session ID.
+        *
+        * <p>Two leaders with different session IDs are announced one after the other; each lookup
+        * must be answered by the leader that was announced last.
+        */
+       @Test
+       public void testLeaderSessionIdChange() throws Exception {
+               TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
+               Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+               AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+                               leaderRetrievalService,
+                               testActorSystem,
+                               TIMEOUT,
+                               new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+               lookupService.start();
+
+               // Create test actors with random leader session IDs. Each actor is created with the
+               // same session ID under which it is announced as leader further below.
+               KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
+               UUID leaderSessionId1 = UUID.randomUUID();
+               ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
+               String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
+
+               KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
+               UUID leaderSessionId2 = UUID.randomUUID();
+               ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId2, expected2);
+               String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
+
+               JobID jobId = new JobID();
+
+               //
+               // Notify about first leader
+               //
+               leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
+
+               KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
+               assertEquals(expected1, location);
+
+               assertEquals(1, received.size());
+               verifyLookupMsg(received.poll(), jobId, "rock");
+
+               //
+               // Notify about second leader
+               //
+               leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
+
+               location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
+               assertEquals(expected2, location);
+
+               assertEquals(1, received.size());
+               verifyLookupMsg(received.poll(), jobId, "roll");
+       }
+
+       /**
+        * Tests that lookups are retried when no leader notification is available.
+        *
+        * <p>The first lookup gets a strategy that permits a single retry without delay. The second
+        * lookup gets a strategy whose retry announces a leader, so that lookup must succeed.
+        */
+       @Test
+       public void testRetryOnUnknownJobManager() throws Exception {
+               // The factory hands out the strategies in the order in which the test enqueues them
+               final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> pendingStrategies =
+                               new LinkedBlockingQueue<>();
+
+               final AkkaKvStateLocationLookupService.LookupRetryStrategyFactory strategyFactory =
+                               new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
+                                       @Override
+                                       public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
+                                               return pendingStrategies.poll();
+                                       }
+                               };
+
+               final TestingLeaderRetrievalService leaderService = new TestingLeaderRetrievalService(null, null);
+
+               final AkkaKvStateLocationLookupService service = new AkkaKvStateLocationLookupService(
+                               leaderService,
+                               testActorSystem,
+                               TIMEOUT,
+                               strategyFactory);
+
+               service.start();
+
+               //
+               // Part 1: the retry strategy is consulted
+               //
+               final AtomicBoolean retried = new AtomicBoolean();
+               pendingStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+                       @Override
+                       public FiniteDuration getRetryDelay() {
+                               return FiniteDuration.Zero();
+                       }
+
+                       @Override
+                       public boolean tryRetry() {
+                               // Only the first call flips the flag and is allowed to retry
+                               return retried.compareAndSet(false, true);
+                       }
+               });
+
+               final Future<KvStateLocation> firstLookup = service.getKvStateLookupInfo(new JobID(), "yessir");
+
+               // Only wait for completion here; the outcome of this lookup is not inspected
+               Await.ready(firstLookup, TIMEOUT);
+               assertTrue("Did not retry ", retried.get());
+
+               //
+               // Part 2: a leader notification that arrives during the retry is picked up
+               //
+               final Queue<LookupKvStateLocation> receivedLookups = new LinkedBlockingQueue<>();
+
+               final KvStateLocation expectedLocation = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
+               final ActorRef leader = LookupResponseActor.create(receivedLookups, null, expectedLocation);
+               final String leaderAddress = AkkaUtils.getAkkaURL(testActorSystem, leader);
+
+               pendingStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+                       @Override
+                       public FiniteDuration getRetryDelay() {
+                               return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+                       }
+
+                       @Override
+                       public boolean tryRetry() {
+                               // Announce the leader as a side effect of being asked for a retry
+                               leaderService.notifyListener(leaderAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
+                               return true;
+                       }
+               });
+
+               final Future<KvStateLocation> secondLookup = service.getKvStateLookupInfo(new JobID(), "yessir");
+               assertEquals(expectedLocation, Await.result(secondLookup, TIMEOUT));
+       }
+
+       @Test
+       public void testUnexpectedResponseType() throws Exception {
+               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
+                       "localhost",
+                       HighAvailabilityServices.DEFAULT_LEADER_ID);
+               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
+
+               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
+                               leaderRetrievalService,
+                               testActorSystem,
+                               TIMEOUT,
+                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+               lookupService.start();
+
+               // Create test actors with random leader session IDs
+               String expected = "unexpected-response-type";
+               ActorRef testActor = LookupResponseActor.create(received, null, 
expected);
+               String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, 
testActor);
+
+               leaderRetrievalService.notifyListener(testActorAddress, null);
+
+               try {
+                       Await.result(lookupService.getKvStateLookupInfo(new 
JobID(), "spicy"), TIMEOUT);
+                       fail("Did not throw expected Exception");
+               } catch (Throwable ignored) {
+                       // Expected
+               }
+       }
+
+       /**
+        * Test actor that records every {@link LookupKvStateLocation} message it receives and
+        * answers it with the next pre-configured response, as long as one is left.
+        */
+       private static final class LookupResponseActor extends FlinkUntypedActor {
+
+               /** Received lookup messages. */
+               private final Queue<LookupKvStateLocation> receivedLookups;
+
+               /** Responses on KvStateMessage.LookupKvStateLocation messages. */
+               private final Queue<Object> lookupResponses;
+
+               /** The leader session ID. */
+               private UUID leaderSessionId;
+
+               /**
+                * @param receivedLookups queue to which received lookup messages are appended (not null)
+                * @param leaderSessionId initial leader session ID reported by this actor
+                * @param lookupResponses responses handed out in order, one per received lookup
+                */
+               public LookupResponseActor(
+                               Queue<LookupKvStateLocation> receivedLookups,
+                               UUID leaderSessionId, Object... lookupResponses) {
+
+                       this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
+                       this.leaderSessionId = leaderSessionId;
+                       this.lookupResponses = new ArrayDeque<>();
+
+                       if (lookupResponses == null) {
+                               return;
+                       }
+
+                       for (int i = 0; i < lookupResponses.length; i++) {
+                               this.lookupResponses.add(lookupResponses[i]);
+                       }
+               }
+
+               @Override
+               public void handleMessage(Object message) throws Exception {
+                       if (message instanceof UUID) {
+                               // A bare UUID replaces the leader session ID of this actor
+                               this.leaderSessionId = (UUID) message;
+                               return;
+                       }
+
+                       if (!(message instanceof LookupKvStateLocation)) {
+                               LOG.debug("Received unhandled message: {}", message);
+                               return;
+                       }
+
+                       // Add to received lookups queue
+                       receivedLookups.add((LookupKvStateLocation) message);
+
+                       final Object response = lookupResponses.poll();
+                       if (response == null) {
+                               // No response left: the sender does not get an answer
+                               return;
+                       }
+
+                       // Throwables are reported as failures, everything else as success
+                       final Object reply = (response instanceof Throwable)
+                                       ? new Status.Failure((Throwable) response)
+                                       : new Status.Success(response);
+
+                       sender().tell(reply, self());
+               }
+
+               @Override
+               protected UUID getLeaderSessionID() {
+                       return leaderSessionId;
+               }
+
+               /**
+                * Starts a new {@link LookupResponseActor} in the test actor system.
+                */
+               private static ActorRef create(
+                               Queue<LookupKvStateLocation> receivedLookups,
+                               UUID leaderSessionId,
+                               Object... lookupResponses) {
+
+                       final Props actorProps = Props.create(
+                                       LookupResponseActor.class,
+                                       receivedLookups,
+                                       leaderSessionId,
+                                       lookupResponses);
+
+                       return testActorSystem.actorOf(actorProps);
+               }
+       }
+
+       private static void verifyLookupMsg(
+                       LookupKvStateLocation lookUpMsg,
+                       JobID expectedJobId,
+                       String expectedName) {
+
+               assertNotNull(lookUpMsg);
+               assertEquals(expectedJobId, lookUpMsg.getJobId());
+               assertEquals(expectedName, lookUpMsg.getRegistrationName());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
new file mode 100644
index 0000000..0b97bda
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.client.KvStateClientHandler;
+import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+
+import org.junit.Test;
+
+import java.nio.channels.ClosedChannelException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link KvStateClientHandler}.
+ */
+public class KvStateClientHandlerTest {
+
+       /**
+        * Feeds the different kinds of inbound messages to the handler and checks that the
+        * expected callback methods are called and that read buffers are recycled.
+        */
+       @Test
+       public void testReadCallbacksAndBufferRecycling() throws Exception {
+               final KvStateClientHandlerCallback handlerCallback = mock(KvStateClientHandlerCallback.class);
+               final EmbeddedChannel embedded = new EmbeddedChannel(new KvStateClientHandler(handlerCallback));
+
+               final long succeededRequestId = 1222112277L;
+               final long failedRequestId = 1222112278L;
+
+               //
+               // 1) Request success
+               //
+               ByteBuf message = MessageSerializer.serializeKvStateRequestResult(
+                               embedded.alloc(),
+                               succeededRequestId,
+                               new byte[0]);
+               message.skipBytes(4); // skip frame length
+
+               embedded.writeInbound(message);
+               verify(handlerCallback, times(1)).onRequestResult(eq(succeededRequestId), any(byte[].class));
+               assertEquals("Buffer not recycled", 0, message.refCnt());
+
+               //
+               // 2) Request failure
+               //
+               message = MessageSerializer.serializeKvStateRequestFailure(
+                               embedded.alloc(),
+                               failedRequestId,
+                               new RuntimeException("Expected test Exception"));
+               message.skipBytes(4); // skip frame length
+
+               embedded.writeInbound(message);
+               verify(handlerCallback, times(1)).onRequestFailure(eq(failedRequestId), any(RuntimeException.class));
+               assertEquals("Buffer not recycled", 0, message.refCnt());
+
+               // From here on every section triggers onFailure, and the expected invocation counts
+               // are cumulative (1, 2, 3, 4).
+               // NOTE(review): this only adds up if the any(...) matchers do not discriminate by
+               // exception type -- confirm against the Mockito version in use.
+
+               //
+               // 3) Server failure
+               //
+               message = MessageSerializer.serializeServerFailure(
+                               embedded.alloc(),
+                               new RuntimeException("Expected test Exception"));
+               message.skipBytes(4); // skip frame length
+
+               embedded.writeInbound(message);
+               verify(handlerCallback, times(1)).onFailure(any(RuntimeException.class));
+               // NOTE(review): unlike the other read sections, buffer recycling is not asserted
+               // here -- confirm whether that is intentional.
+
+               //
+               // 4) Unexpected messages
+               //
+               message = embedded.alloc().buffer(4).writeInt(1223823);
+
+               embedded.writeInbound(message);
+               verify(handlerCallback, times(2)).onFailure(any(IllegalStateException.class));
+               assertEquals("Buffer not recycled", 0, message.refCnt());
+
+               //
+               // 5) Exception caught
+               //
+               embedded.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
+               verify(handlerCallback, times(3)).onFailure(any(RuntimeException.class));
+
+               //
+               // 6) Channel inactive
+               //
+               embedded.pipeline().fireChannelInactive();
+               verify(handlerCallback, times(4)).onFailure(any(ClosedChannelException.class));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
new file mode 100644
index 0000000..a2850b3
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KvStateClient}.
+ */
+public class KvStateClientTest {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientTest.class);
+
+       // Thread pool for client bootstrap (shared between tests)
+       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
+
+       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS);
+
+       /**
+        * Initiates a graceful shutdown of the event loop group that is shared between the tests.
+        */
+       @AfterClass
+       public static void tearDown() throws Exception {
+               // NIO_GROUP is a static final field that is initialised inline, so it cannot be null
+               // here and needs no guard.
+               NIO_GROUP.shutdownGracefully();
+       }
+
+       /**
+        * Tests simple queries, of which half succeed and half fail.
+        */
+       @Test
+       public void testSimpleRequests() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateClient client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new KvStateClient(1, stats);
+
+                       // Random result
+                       final byte[] expected = new byte[1024];
+                       ThreadLocalRandom.current().nextBytes(expected);
+
+                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
+                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       received.add((ByteBuf) msg);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       List<Future<byte[]>> futures = new ArrayList<>();
+
+                       int numQueries = 1024;
+
+                       for (int i = 0; i < numQueries; i++) {
+                               futures.add(client.getKvState(serverAddress, 
new KvStateID(), new byte[0]));
+                       }
+
+                       // Respond to messages
+                       Exception testException = new 
RuntimeException("Expected test Exception");
+
+                       for (int i = 0; i < numQueries; i++) {
+                               ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               assertNotNull("Receive timed out", buf);
+
+                               Channel ch = channel.get();
+                               assertNotNull("Channel not active", ch);
+
+                               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
+                               KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
+
+                               buf.release();
+
+                               if (i % 2 == 0) {
+                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestResult(
+                                                       serverChannel.alloc(),
+                                                       request.getRequestId(),
+                                                       expected);
+
+                                       ch.writeAndFlush(response);
+                               } else {
+                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestFailure(
+                                                       serverChannel.alloc(),
+                                                       request.getRequestId(),
+                                                       testException);
+
+                                       ch.writeAndFlush(response);
+                               }
+                       }
+
+                       for (int i = 0; i < numQueries; i++) {
+                               if (i % 2 == 0) {
+                                       byte[] serializedResult = 
Await.result(futures.get(i), deadline.timeLeft());
+                                       assertArrayEquals(expected, 
serializedResult);
+                               } else {
+                                       try {
+                                               Await.result(futures.get(i), 
deadline.timeLeft());
+                                               fail("Did not throw expected 
Exception");
+                                       } catch (RuntimeException ignored) {
+                                               // Expected
+                                       }
+                               }
+                       }
+
+                       assertEquals(numQueries, stats.getNumRequests());
+                       int expectedRequests = numQueries / 2;
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != expectedRequests ||
+                                       stats.getNumFailed() != 
expectedRequests)) {
+                               Thread.sleep(100);
+                       }
+
+                       assertEquals(expectedRequests, 
stats.getNumSuccessful());
+                       assertEquals(expectedRequests, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Verifies that a request sent to an address on which no server is
+        * listening fails the returned future with a {@link ConnectException}
+        * and leaves no connection behind.
+        */
+       @Test
+       public void testRequestUnavailableHost() throws Exception {
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+               KvStateClient client = null;
+
+               try {
+                       client = new KvStateClient(1, stats);
+
+                       // No server is started on this port (NetUtils is presumably
+                       // handing out a currently unused one).
+                       final int unusedPort = NetUtils.getAvailablePort();
+                       final KvStateServerAddress unreachable =
+                                       new KvStateServerAddress(InetAddress.getLocalHost(), unusedPort);
+
+                       final Future<byte[]> pending = client.getKvState(unreachable, new KvStateID(), new byte[0]);
+
+                       try {
+                               Await.result(pending, deadline.timeLeft());
+                               fail("Did not throw expected ConnectException");
+                       } catch (ConnectException ignored) {
+                               // Expected
+                       }
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       // Checked on every exit path: the client must not keep a connection open
+                       assertEquals("Channel leak", 0, stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that multiple threads can concurrently fire queries through a
+        * single client.
+        *
+        * <p>Four tasks each dispatch 1024 requests to one test server channel
+        * that answers every request with the same payload. All returned futures
+        * must complete with that payload, and the stats must count every request
+        * as successful.
+        */
+       @Test
+       public void testConcurrentQueries() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               ExecutorService executor = null;
+               KvStateClient client = null;
+               Channel serverChannel = null;
+
+               // Random payload that the test server returns for every request
+               final byte[] serializedResult = new byte[1024];
+               ThreadLocalRandom.current().nextBytes(serializedResult);
+
+               try {
+                       int numQueryTasks = 4;
+                       final int numQueriesPerTask = 1024;
+
+                       executor = Executors.newFixedThreadPool(numQueryTasks);
+
+                       client = new KvStateClient(1, stats);
+
+                       // Test server: parse each request and immediately answer it with the shared payload
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       ByteBuf buf = (ByteBuf) msg;
+                                       assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
+                                       KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
+
+                                       // The request has been deserialized; the received buffer is no longer needed
+                                       buf.release();
+
+                                       // The response carries the request ID of the request it answers
+                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestResult(
+                                                       ctx.alloc(),
+                                                       request.getRequestId(),
+                                                       serializedResult);
+
+                                       ctx.channel().writeAndFlush(response);
+                               }
+                       });
+
+                       final KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       // Each task only dispatches its queries and hands back the pending futures;
+                       // the results are awaited on the test thread below.
+                       final KvStateClient finalClient = client;
+                       Callable<List<Future<byte[]>>> queryTask = new 
Callable<List<Future<byte[]>>>() {
+                               @Override
+                               public List<Future<byte[]>> call() throws 
Exception {
+                                       List<Future<byte[]>> results = new 
ArrayList<>(numQueriesPerTask);
+
+                                       for (int i = 0; i < numQueriesPerTask; 
i++) {
+                                               
results.add(finalClient.getKvState(
+                                                               serverAddress,
+                                                               new KvStateID(),
+                                                               new byte[0]));
+                                       }
+
+                                       return results;
+                               }
+                       };
+
+                       // Submit query tasks
+                       List<java.util.concurrent.Future<List<Future<byte[]>>>> 
futures = new ArrayList<>();
+                       for (int i = 0; i < numQueryTasks; i++) {
+                               futures.add(executor.submit(queryTask));
+                       }
+
+                       // Verify results
+                       for (java.util.concurrent.Future<List<Future<byte[]>>> 
future : futures) {
+                               List<Future<byte[]>> results = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                               for (Future<byte[]> result : results) {
+                                       byte[] actual = Await.result(result, 
deadline.timeLeft());
+                                       assertArrayEquals(serializedResult, 
actual);
+                               }
+                       }
+
+                       int totalQueries = numQueryTasks * numQueriesPerTask;
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
stats.getNumSuccessful() != totalQueries) {
+                               Thread.sleep(100);
+                       }
+
+                       assertEquals(totalQueries, stats.getNumRequests());
+                       assertEquals(totalQueries, stats.getNumSuccessful());
+               } finally {
+                       if (executor != null) {
+                               executor.shutdown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       // Checked on every exit path: no connection may remain after the client shut down
+                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that a server failure closes the connection and removes it from
+        * the established connections.
+        *
+        * <p>Two requests are sent and both are received by the test server while
+        * the stats report a single connection. The server then writes one server
+        * failure message, after which both pending futures must fail, the
+        * connection count must drop to zero and both requests must be counted as
+        * failed.
+        */
+       @Test
+       public void testFailureClosesChannel() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateClient client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new KvStateClient(1, stats);
+
+                       // Filled by the server handler: the received request buffers and the accepted channel
+                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
+                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
+
+                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
+                                       received.add((ByteBuf) msg);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
+
+                       // Requests
+                       List<Future<byte[]>> futures = new ArrayList<>();
+                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
+                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
+
+                       // Wait until both requests have arrived at the server; their content is
+                       // not inspected, the buffers are only released.
+                       ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       assertNotNull("Receive timed out", buf);
+                       buf.release();
+
+                       buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                       assertNotNull("Receive timed out", buf);
+                       buf.release();
+
+                       // Both requests are expected to share a single connection
+                       assertEquals(1, stats.getNumConnections());
+
+                       Channel ch = channel.get();
+                       assertNotNull("Channel not active", ch);
+
+                       // Respond with failure
+                       
ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+                                       serverChannel.alloc(),
+                                       new RuntimeException("Expected test 
server failure")));
+
+                       // The single server failure must fail both pending requests
+                       try {
+                               Await.result(futures.remove(0), 
deadline.timeLeft());
+                               fail("Did not throw expected server failure");
+                       } catch (RuntimeException ignored) {
+                               // Expected
+                       }
+
+                       try {
+                               Await.result(futures.remove(0), 
deadline.timeLeft());
+                               fail("Did not throw expected server failure");
+                       } catch (RuntimeException ignored) {
+                               // Expected
+                       }
+
+                       // The failed connection must no longer be counted as established
+                       assertEquals(0, stats.getNumConnections());
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0 ||
+                                       stats.getNumFailed() != 2)) {
+                               Thread.sleep(100);
+                       }
+
+                       assertEquals(2, stats.getNumRequests());
+                       assertEquals(0, stats.getNumSuccessful());
+                       assertEquals(2, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       // Checked on every exit path: no connection may remain after the client shut down
+                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests that closing the server side of an established connection closes
+        * the client connection and removes it from the established connections.
+        *
+        * <p>A single request is sent and never answered. Once the server has
+        * received it, the accepted channel is closed; the pending future must
+        * then fail with a {@link ClosedChannelException} and the request must be
+        * counted as failed.
+        */
+       @Test
+       public void testServerClosesChannel() throws Exception {
+               Deadline deadline = TEST_TIMEOUT.fromNow();
+               AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+               KvStateClient client = null;
+               Channel serverChannel = null;
+
+               try {
+                       client = new KvStateClient(1, stats);
+
+                       // Set by the server handler: whether a request arrived, and the accepted channel
+                       final AtomicBoolean received = new AtomicBoolean();
+                       final AtomicReference<Channel> channel = new AtomicReference<>();
+
+                       serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+                               @Override
+                               public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                                       channel.set(ctx.channel());
+                               }
+
+                               @Override
+                               public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                                       // The request is neither answered nor passed on, so the
+                                       // received buffer has to be released here to avoid a leak.
+                                       ((ByteBuf) msg).release();
+                                       received.set(true);
+                               }
+                       });
+
+                       KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+                       // Requests
+                       Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
+
+                       while (!received.get() && deadline.hasTimeLeft()) {
+                               Thread.sleep(50);
+                       }
+                       assertTrue("Receive timed out", received.get());
+
+                       assertEquals(1, stats.getNumConnections());
+
+                       Channel ch = channel.get();
+                       assertNotNull("Channel not active", ch);
+
+                       // Close the accepted connection from the server side and make sure
+                       // the close completed before the deadline expired.
+                       assertTrue(
+                                       "Channel close timed out",
+                                       ch.close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
+
+                       try {
+                               Await.result(future, deadline.timeLeft());
+                               fail("Did not throw expected ClosedChannelException");
+                       } catch (ClosedChannelException ignored) {
+                               // Expected
+                       }
+
+                       // The closed connection must no longer be counted as established
+                       assertEquals(0, stats.getNumConnections());
+
+                       // Counts can take some time to propagate
+                       while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
+                                       stats.getNumFailed() != 1)) {
+                               Thread.sleep(100);
+                       }
+
+                       assertEquals(1, stats.getNumRequests());
+                       assertEquals(0, stats.getNumSuccessful());
+                       assertEquals(1, stats.getNumFailed());
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       if (serverChannel != null) {
+                               serverChannel.close();
+                       }
+
+                       // Checked on every exit path: no connection may remain after the client shut down
+                       assertEquals("Channel leak", 0, stats.getNumConnections());
+               }
+       }
+
+       /**
+        * Tests multiple clients querying multiple servers until 100k queries have
+        * been processed. At this point, the client is shut down and it is verified
+        * that all ongoing requests are failed.
+        *
+        * <p>Each query task loops forever, dispatching batches of requests that
+        * are spread over the servers and checking the returned values. The tasks
+        * are expected to terminate only because the client shut down, i.e. with a
+        * {@link ClosedChannelException} or an {@link IllegalStateException}.
+        */
+       @Test
+       public void testClientServerIntegration() throws Exception {
+               // Config
+               final int numServers = 2;
+               final int numServerEventLoopThreads = 2;
+               final int numServerQueryThreads = 2;
+
+               final int numClientEventLoopThreads = 4;
+               final int numClientsTasks = 8;
+
+               final int batchSize = 16;
+
+               final int numKeyGroups = 1;
+
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               KvStateRegistry dummyRegistry = new KvStateRegistry();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(dummyRegistry);
+
+               AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+               // Per-request timeout used by the query tasks
+               final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+               AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+               KvStateClient client = null;
+               ExecutorService clientTaskExecutor = null;
+               final KvStateServer[] server = new KvStateServer[numServers];
+
+               try {
+                       client = new KvStateClient(numClientEventLoopThreads, clientStats);
+                       clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
+
+                       // Create state
+                       ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+                       desc.setQueryable("any");
+
+                       // Create servers
+                       KvStateRegistry[] registry = new KvStateRegistry[numServers];
+                       AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+                       final KvStateID[] ids = new KvStateID[numServers];
+
+                       for (int i = 0; i < numServers; i++) {
+                               registry[i] = new KvStateRegistry();
+                               serverStats[i] = new AtomicKvStateRequestStats();
+                               server[i] = new KvStateServerImpl(
+                                               InetAddress.getLocalHost(),
+                                               0,
+                                               numServerEventLoopThreads,
+                                               numServerQueryThreads,
+                                               registry[i],
+                                               serverStats[i]);
+
+                               server[i].start();
+
+                               // Server i serves key (1010 + i) ...
+                               backend.setCurrentKey(1010 + i);
+
+                               // Value per server
+                               ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
+                                               VoidNamespaceSerializer.INSTANCE,
+                                               desc);
+
+                               // ... with value (201 + i)
+                               state.update(201 + i);
+
+                               // we know it must be a KvState but this is not exposed to the user via State
+                               InternalKvState<?> kvState = (InternalKvState<?>) state;
+
+                               // Register KvState (one state instance for all servers)
+                               ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+                       }
+
+                       final KvStateClient finalClient = client;
+                       Callable<Void> queryTask = new Callable<Void>() {
+                               @Override
+                               public Void call() throws Exception {
+                                       // Never returns normally: the loop is only left via an exception
+                                       while (true) {
+                                               if (Thread.interrupted()) {
+                                                       throw new InterruptedException();
+                                               }
+
+                                               // Random server permutation
+                                               List<Integer> random = new ArrayList<>();
+                                               for (int j = 0; j < batchSize; j++) {
+                                                       random.add(j);
+                                               }
+                                               Collections.shuffle(random);
+
+                                               // Dispatch queries
+                                               List<Future<byte[]>> futures = new ArrayList<>(batchSize);
+
+                                               for (int j = 0; j < batchSize; j++) {
+                                                       int targetServer = random.get(j) % numServers;
+
+                                                       byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+                                                                       1010 + targetServer,
+                                                                       IntSerializer.INSTANCE,
+                                                                       VoidNamespace.INSTANCE,
+                                                                       VoidNamespaceSerializer.INSTANCE);
+
+                                                       futures.add(finalClient.getKvState(
+                                                                       server[targetServer].getAddress(),
+                                                                       ids[targetServer],
+                                                                       serializedKeyAndNamespace));
+                                               }
+
+                                               // Verify results
+                                               for (int j = 0; j < batchSize; j++) {
+                                                       int targetServer = random.get(j) % numServers;
+
+                                                       Future<byte[]> future = futures.get(j);
+                                                       byte[] buf = Await.result(future, timeout);
+                                                       int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+                                                       assertEquals(201 + targetServer, value);
+                                               }
+                                       }
+                               }
+                       };
+
+                       // Submit tasks
+                       List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>();
+                       for (int i = 0; i < numClientsTasks; i++) {
+                               taskFutures.add(clientTaskExecutor.submit(queryTask));
+                       }
+
+                       long numRequests;
+                       while ((numRequests = clientStats.getNumRequests()) < 100_000) {
+                               // A query task can only finish by throwing. If one is already done
+                               // before the shut down, surface its failure right away instead of
+                               // waiting for a request count that may never be reached.
+                               for (java.util.concurrent.Future<Void> taskFuture : taskFutures) {
+                                       if (taskFuture.isDone()) {
+                                               taskFuture.get();
+                                               fail("Query task terminated before the client was shut down");
+                                       }
+                               }
+
+                               Thread.sleep(100);
+                               LOG.info("Number of requests {}/100_000", numRequests);
+                       }
+
+                       // Shut down
+                       client.shutDown();
+
+                       for (java.util.concurrent.Future<Void> future : taskFutures) {
+                               try {
+                                       future.get();
+                                       fail("Did not throw expected Exception after shut down");
+                               } catch (ExecutionException t) {
+                                       Throwable cause = t.getCause();
+                                       if (cause instanceof ClosedChannelException ||
+                                                       cause instanceof IllegalStateException) {
+                                               // Expected
+                                       } else {
+                                               // Report the type of the actual failure, not of the
+                                               // ExecutionException wrapper around it.
+                                               Throwable unexpected = cause != null ? cause : t;
+                                               LOG.error("Query task failed with unexpected exception", t);
+                                               fail("Failed with unexpected Exception type: " + unexpected.getClass().getName());
+                                       }
+                               }
+                       }
+
+                       assertEquals("Connection leak (client)", 0, clientStats.getNumConnections());
+                       for (int i = 0; i < numServers; i++) {
+                               // The server side may take a moment to notice the closed
+                               // connections: retry the check up to 10 times with growing pauses.
+                               boolean success = false;
+                               int numRetries = 0;
+                               while (!success) {
+                                       try {
+                                               assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections());
+                                               success = true;
+                                       } catch (Throwable t) {
+                                               if (numRetries < 10) {
+                                                       LOG.info("Retrying connection leak check (server)");
+                                                       Thread.sleep((numRetries + 1) * 50);
+                                                       numRetries++;
+                                               } else {
+                                                       throw t;
+                                               }
+                                       }
+                               }
+                       }
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       for (int i = 0; i < numServers; i++) {
+                               if (server[i] != null) {
+                                       server[i].shutDown();
+                               }
+                       }
+
+                       if (clientTaskExecutor != null) {
+                               clientTaskExecutor.shutdown();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private Channel createServerChannel(final ChannelHandler... handlers) 
throws UnknownHostException, InterruptedException {
+               ServerBootstrap bootstrap = new ServerBootstrap()
+                               // Bind address and port
+                               .localAddress(InetAddress.getLocalHost(), 0)
+                               // NIO server channels
+                               .group(NIO_GROUP)
+                               .channel(NioServerSocketChannel.class)
+                               // See initializer for pipeline details
+                               .childHandler(new 
ChannelInitializer<SocketChannel>() {
+                                       @Override
+                                       protected void 
initChannel(SocketChannel ch) throws Exception {
+                                               ch.pipeline()
+                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                                                               
.addLast(handlers);
+                                       }
+                               });
+
+               return bootstrap.bind().sync().channel();
+       }
+
+       private KvStateServerAddress getKvStateServerAddress(Channel 
serverChannel) {
+               InetSocketAddress localAddress = (InetSocketAddress) 
serverChannel.localAddress();
+
+               return new KvStateServerAddress(localAddress.getAddress(), 
localAddress.getPort());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
new file mode 100644
index 0000000..f28ca68
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip tests for the KvState messages written and read by {@link MessageSerializer}:
+ * request, request result, request failure and server failure (see also {@link KvStateSerializer}).
+ *
+ * <p>Each serialization test reads the leading {@code int} as the frame length and then checks that
+ * deserializing header and body leaves the reader index at {@code frameLength + 4}.
+ *
+ * <p>NOTE(review): no test reads {@link #async}, so both parameter values exercise the same code.
+ */
+@RunWith(Parameterized.class)
+public class KvStateRequestSerializerTest {
+
+       // Allocator passed to every MessageSerializer.serialize* call in this class.
+       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+       // Parameter values for the Parameterized runner; every test runs once per element.
+       @Parameterized.Parameters
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(false, true);
+       }
+
+       // Injected by the Parameterized runner with one of the values from parameters().
+       @Parameterized.Parameter
+       public boolean async;
+
+       /**
+        * Tests that a request with a random 1024-byte key and namespace survives a serialization round trip.
+        */
+       @Test
+       public void testKvStateRequestSerialization() throws Exception {
+               // Larger than Integer.MAX_VALUE, so the ID assertion fails if the ID is narrowed to an int.
+               long requestId = Integer.MAX_VALUE + 1337L;
+               KvStateID kvStateId = new KvStateID();
+               byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+               ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+                               alloc, requestId, kvStateId, serializedKeyAndNamespace);
+
+               // readInt() consumes 4 bytes; the rest of the buffer is expected to be frameLength bytes.
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(requestId, request.getRequestId());
+               assertEquals(kvStateId, request.getKvStateId());
+               assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Tests that a request with a zero-length serialized key and namespace yields an empty array again.
+        */
+       @Test
+       public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+               byte[] serializedKeyAndNamespace = new byte[0];
+
+               ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+                               alloc, 1823, new KvStateID(), serializedKeyAndNamespace);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Constructing a {@link KvStateRequest} with a <code>null</code> serialized key and namespace
+        * is expected to fail with a {@link NullPointerException} instead of being tolerated.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+               new KvStateRequest(0, new KvStateID(), null);
+       }
+
+       /**
+        * Tests that a request result with a random 1024-byte payload survives a serialization round trip.
+        */
+       @Test
+       public void testKvStateRequestResultSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 72727278L;
+               byte[] serializedResult = randomByteArray(1024);
+
+               ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(alloc, requestId, serializedResult);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               KvStateRequestResult result = MessageSerializer.deserializeKvStateRequestResult(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(requestId, result.getRequestId());
+               assertArrayEquals(serializedResult, result.getSerializedResult());
+       }
+
+       /**
+        * Tests that a request result with a zero-length serialized result yields an empty array again.
+        */
+       @Test
+       public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
+               byte[] serializedResult = new byte[0];
+
+               ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(alloc, 72727278, serializedResult);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               KvStateRequestResult result = MessageSerializer.deserializeKvStateRequestResult(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertArrayEquals(serializedResult, result.getSerializedResult());
+       }
+
+       /**
+        * Constructing a {@link KvStateRequestResult} with a <code>null</code> serialized result
+        * is expected to fail with a {@link NullPointerException} instead of being tolerated.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+               new KvStateRequestResult(0, null);
+       }
+
+       /**
+        * Tests that a request failure keeps its request ID and the class and message of its cause.
+        */
+       @Test
+       public void testKvStateRequestFailureSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1111222L;
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(alloc, requestId, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               // The cause is compared by class and message only.
+               assertEquals(requestId, failure.getRequestId());
+               assertEquals(cause.getClass(), failure.getCause().getClass());
+               assertEquals(cause.getMessage(), failure.getCause().getMessage());
+       }
+
+       /**
+        * Tests that a server failure keeps the class and message of its cause.
+        */
+       @Test
+       public void testServerFailureSerialization() throws Exception {
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+               Throwable deserialized = MessageSerializer.deserializeServerFailure(buf);
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(cause.getClass(), deserialized.getClass());
+               assertEquals(cause.getMessage(), deserialized.getMessage());
+       }
+
+       /**
+        * Creates a new array of the given length and fills it with random bytes.
+        *
+        * @param capacity length of the array to create
+        * @return the randomly filled array
+        */
+       private byte[] randomByteArray(int capacity) {
+               byte[] bytes = new byte[capacity];
+               ThreadLocalRandom.current().nextBytes(bytes);
+               return bytes;
+       }
+}

Reply via email to