http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
deleted file mode 100644
index 3283295..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.ImmutableListState;
-import org.apache.flink.runtime.state.heap.HeapListState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableListState}.
- */
-public class ImmutableListStateTest {
-
-       private final ListStateDescriptor<Long> listStateDesc =
-                       new ListStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
-
-       private ImmutableListState<Long> listState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!listStateDesc.isSerializerInitialized()) {
-                       listStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               List<Long> init = new ArrayList<>();
-               init.add(42L);
-
-               byte[] serInit = serializeInitValue(init);
-               listState = ImmutableListState.createState(listStateDesc, 
serInit);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               List<Long> list = getStateContents();
-               assertEquals(1L, list.size());
-
-               long element = list.get(0);
-               assertEquals(42L, element);
-
-               listState.add(54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               List<Long> list = getStateContents();
-               assertEquals(1L, list.size());
-
-               long element = list.get(0);
-               assertEquals(42L, element);
-
-               listState.clear();
-       }
-
-       /**
-        * Copied from {@link HeapListState#getSerializedValue(Object, Object)}.
-        */
-       private byte[] serializeInitValue(List<Long> toSerialize) throws 
IOException {
-               TypeSerializer<Long> serializer = 
listStateDesc.getElementSerializer();
-
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(baos);
-
-               // write the same as RocksDB writes lists, with one ',' separator
-               for (int i = 0; i < toSerialize.size(); i++) {
-                       serializer.serialize(toSerialize.get(i), view);
-                       if (i < toSerialize.size() - 1) {
-                               view.writeByte(',');
-                       }
-               }
-               view.flush();
-
-               return baos.toByteArray();
-       }
-
-       private List<Long> getStateContents() {
-               List<Long> list = new ArrayList<>();
-               for (Long elem: listState.get()) {
-                       list.add(elem);
-               }
-               return list;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
deleted file mode 100644
index 30a8a50..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableMapState;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the {@link ImmutableMapState}.
- */
-public class ImmutableMapStateTest {
-
-       private final MapStateDescriptor<Long, Long> mapStateDesc =
-                       new MapStateDescriptor<>(
-                                       "test",
-                                       BasicTypeInfo.LONG_TYPE_INFO,
-                                       BasicTypeInfo.LONG_TYPE_INFO);
-
-       private ImmutableMapState<Long, Long> mapState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!mapStateDesc.isSerializerInitialized()) {
-                       mapStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               Map<Long, Long> initMap = new HashMap<>();
-               initMap.put(1L, 5L);
-               initMap.put(2L, 5L);
-
-               byte[] initSer = KvStateSerializer.serializeMap(
-                               initMap.entrySet(),
-                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
-
-               mapState = ImmutableMapState.createState(mapStateDesc, initSer);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testPut() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               mapState.put(2L, 54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testPutAll() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               Map<Long, Long> nMap = new HashMap<>();
-               nMap.put(1L, 7L);
-               nMap.put(2L, 7L);
-
-               mapState.putAll(nMap);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               mapState.put(2L, 54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testIterator() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
-               while (iterator.hasNext()) {
-                       iterator.remove();
-               }
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testIterable() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
-               Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
-               while (iterator.hasNext()) {
-                       assertEquals(5L, (long) iterator.next().getValue());
-                       iterator.remove();
-               }
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testKeys() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               Iterator<Long> iterator = mapState.keys().iterator();
-               while (iterator.hasNext()) {
-                       iterator.remove();
-               }
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testValues() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               Iterator<Long> iterator = mapState.values().iterator();
-               while (iterator.hasNext()) {
-                       iterator.remove();
-               }
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               assertTrue(mapState.contains(1L));
-               long value = mapState.get(1L);
-               assertEquals(5L, value);
-
-               assertTrue(mapState.contains(2L));
-               value = mapState.get(2L);
-               assertEquals(5L, value);
-
-               mapState.clear();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
deleted file mode 100644
index 9b1ecf8..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableReducingState}.
- */
-public class ImmutableReducingStateTest {
-
-       private final ReducingStateDescriptor<Long> reducingStateDesc =
-                       new ReducingStateDescriptor<>("test", new SumReduce(), 
BasicTypeInfo.LONG_TYPE_INFO);
-
-       private ImmutableReducingState<Long> reduceState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!reducingStateDesc.isSerializerInitialized()) {
-                       reducingStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               reduceState = ImmutableReducingState.createState(
-                               reducingStateDesc,
-                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
-               );
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               long value = reduceState.get();
-               assertEquals(42L, value);
-
-               reduceState.add(54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               long value = reduceState.get();
-               assertEquals(42L, value);
-
-               reduceState.clear();
-       }
-
-       /**
-        * Test {@link ReduceFunction} summing up its two arguments.
-        */
-       private static class SumReduce implements ReduceFunction<Long> {
-
-               private static final long serialVersionUID = 
6041237513913189144L;
-
-               @Override
-               public Long reduce(Long value1, Long value2) throws Exception {
-                       return value1 + value2;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
deleted file mode 100644
index 5f7032d..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableValueState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableValueState}.
- */
-public class ImmutableValueStateTest {
-
-       private final ValueStateDescriptor<Long> valueStateDesc =
-                       new ValueStateDescriptor<>("test", 
BasicTypeInfo.LONG_TYPE_INFO);
-
-       private ImmutableValueState<Long> valueState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!valueStateDesc.isSerializerInitialized()) {
-                       valueStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               valueState = ImmutableValueState.createState(
-                               valueStateDesc,
-                               
ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
-               );
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               long value = valueState.value();
-               assertEquals(42L, value);
-
-               valueState.update(54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               long value = valueState.value();
-               assertEquals(42L, value);
-
-               valueState.clear();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
 
b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
deleted file mode 100644
index 10792cd..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
-log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
new file mode 100644
index 0000000..f39498e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-queryable-state</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
+       <name>flink-queryable-state-runtime</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- ===================================================
+                                                               Testing
+                       =================================================== -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-test</artifactId>
+                       <version>${curator.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
new file mode 100644
index 0000000..d434336
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import 
org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import 
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This handler acts as an internal (to the Flink cluster) client that receives
+ * the requests from external clients, executes them by contacting the Job 
Manager (if necessary) and
+ * the Task Manager holding the requested state, and forwards the answer back 
to the client.
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateClientProxyHandler extends 
AbstractServerHandler<KvStateRequest, KvStateResponse> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientProxyHandler.class);
+
+       /** The proxy using this handler. */
+       private final KvStateClientProxy proxy;
+
+       /** A cache to hold the location of different states for which we have 
already seen requests. */
+       private final ConcurrentMap<Tuple2<JobID, String>, 
CompletableFuture<KvStateLocation>> lookupCache =
+                       new ConcurrentHashMap<>();
+
+       /**
+        * Network client to forward queries to {@link KvStateServerImpl state 
server}
+        * instances inside the cluster.
+        */
+       private final Client<KvStateInternalRequest, KvStateResponse> 
kvStateClient;
+
+       /**
+        * Create the handler used by the {@link KvStateClientProxyImpl}.
+        *
+        * @param proxy the {@link KvStateClientProxyImpl proxy} using the 
handler.
+        * @param queryExecutorThreads the number of threads used to process 
incoming requests.
+        * @param serializer the {@link MessageSerializer} used to (de-) 
serialize the different messages.
+        * @param stats server statistics collector.
+        */
+       public KvStateClientProxyHandler(
+                       final KvStateClientProxyImpl proxy,
+                       final int queryExecutorThreads,
+                       final MessageSerializer<KvStateRequest, 
KvStateResponse> serializer,
+                       final KvStateRequestStats stats) {
+
+               super(proxy, serializer, stats);
+               this.proxy = Preconditions.checkNotNull(proxy);
+               this.kvStateClient = createInternalClient(queryExecutorThreads);
+       }
+
+       private static Client<KvStateInternalRequest, KvStateResponse> 
createInternalClient(int threads) {
+               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> messageSerializer =
+                               new MessageSerializer<>(
+                                               new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                                               new 
KvStateResponse.KvStateResponseDeserializer());
+
+               return new Client<>(
+                               "Queryable State Proxy Client",
+                               threads,
+                               messageSerializer,
+                               new DisabledKvStateRequestStats());
+       }
+
+       @Override
+       public CompletableFuture<KvStateResponse> handleRequest(
+                       final long requestId,
+                       final KvStateRequest request) {
+               CompletableFuture<KvStateResponse> response = new 
CompletableFuture<>();
+               executeActionAsync(response, request, false);
+               return response;
+       }
+
+       private void executeActionAsync(
+                       final CompletableFuture<KvStateResponse> result,
+                       final KvStateRequest request,
+                       final boolean update) {
+
+               if (!result.isDone()) {
+                       final CompletableFuture<KvStateResponse> 
operationFuture = getState(request, update);
+                       operationFuture.whenCompleteAsync(
+                                       (t, throwable) -> {
+                                               if (throwable != null) {
+                                                       if (throwable 
instanceof CancellationException) {
+                                                               
result.completeExceptionally(throwable);
+                                                       } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+                                                                       
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
+                                                                       
throwable.getCause() instanceof UnknownKvStateLocation ||
+                                                                       
throwable.getCause() instanceof ConnectException) {
+
+                                                               // These 
failures are likely to be caused by out-of-sync
+                                                               // 
KvStateLocation. Therefore we retry this query and
+                                                               // force look 
up the location.
+
+                                                               
executeActionAsync(result, request, true);
+                                                       } else {
+                                                               
result.completeExceptionally(throwable);
+                                                       }
+                                               } else {
+                                                       result.complete(t);
+                                               }
+                                       }, queryExecutor);
+
+                       result.whenComplete(
+                                       (t, throwable) -> 
operationFuture.cancel(false));
+               }
+       }
+
+       private CompletableFuture<KvStateResponse> getState(
+                       final KvStateRequest request,
+                       final boolean forceUpdate) {
+
+               return getKvStateLookupInfo(request.getJobId(), 
request.getStateName(), forceUpdate)
+                               .thenComposeAsync((Function<KvStateLocation, 
CompletableFuture<KvStateResponse>>) location -> {
+                                       final int keyGroupIndex = 
KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+                                                       
request.getKeyHashCode(), location.getNumKeyGroups());
+
+                                       final InetSocketAddress serverAddress = 
location.getKvStateServerAddress(keyGroupIndex);
+                                       if (serverAddress == null) {
+                                               return 
FutureUtils.getFailedFuture(new 
UnknownKvStateKeyGroupLocationException(getServerName()));
+                                       } else {
+                                               // Query server
+                                               final KvStateID kvStateId = 
location.getKvStateID(keyGroupIndex);
+                                               final KvStateInternalRequest 
internalRequest = new KvStateInternalRequest(
+                                                               kvStateId, 
request.getSerializedKeyAndNamespace());
+                                               return 
kvStateClient.sendRequest(serverAddress, internalRequest);
+                                       }
+                               }, queryExecutor);
+       }
+
+       /**
+        * Lookup the {@link KvStateLocation} for the given job and queryable 
state name.
+        *
+        * <p>The job manager will be queried for the location only if forced 
or no
+        * cached location can be found. There are no guarantees that a cached location is still up to date.
+        *
+        * @param jobId              JobID the state instance belongs to.
+        * @param queryableStateName Name under which the state instance has 
been published.
+        * @param forceUpdate        Flag to indicate whether to force an update 
via the lookup service.
+        * @return Future holding the KvStateLocation
+        */
+       private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final boolean forceUpdate) {
+
+               final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, 
queryableStateName);
+               final CompletableFuture<KvStateLocation> cachedFuture = 
lookupCache.get(cacheKey);
+
+               if (!forceUpdate && cachedFuture != null && 
!cachedFuture.isCompletedExceptionally()) {
+                       LOG.debug("Retrieving location for state={} of job={} 
from the cache.", jobId, queryableStateName);
+                       return cachedFuture;
+               }
+
+               LOG.debug("Retrieving location for state={} of job={} from the 
job manager.", jobId, queryableStateName);
+
+               return proxy.getJobManagerFuture().thenComposeAsync(
+                               jobManagerGateway -> {
+                                       final Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
+                                       final 
CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
+                                                       
jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+                                                                       
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+
+                                       lookupCache.put(cacheKey, 
locationFuture);
+                                       return locationFuture;
+                               }, queryExecutor);
+       }
+
+       @Override
+       public void shutdown() {
+               kvStateClient.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
new file mode 100644
index 0000000..f473443
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default implementation of the {@link KvStateClientProxy}.
+ */
+@Internal
+public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, 
KvStateResponse> implements KvStateClientProxy {
+
+       private static final CompletableFuture<ActorGateway> 
UNKNOWN_JOB_MANAGER =
+                       FutureUtils.getFailedFuture(new 
UnknownJobManagerException());
+
+       /** Number of threads used to process incoming requests. */
+       private final int queryExecutorThreads;
+
+       /** Statistics collector. */
+       private final KvStateRequestStats stats;
+
+       private final Object leaderLock = new Object();
+
+       private CompletableFuture<ActorGateway> jobManagerFuture = 
UNKNOWN_JOB_MANAGER;
+
+       /**
+        * Creates the Queryable State Client Proxy.
+        *
+        * <p>The server is instantiated using reflection by the
+        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress,
 Iterator, int, int, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, 
int, int, KvStateRequestStats)}.
+        *
+        * <p>The server needs to be started via {@link #start()} in order to 
bind
+        * to the configured bind address.
+        *
+        * @param bindAddress the address to listen to.
+        * @param bindPortIterator the port range to try to bind to.
+        * @param numEventLoopThreads number of event loop threads.
+        * @param numQueryThreads number of query threads.
+        * @param stats the statistics collector.
+        */
+       public KvStateClientProxyImpl(
+                       final InetAddress bindAddress,
+                       final Iterator<Integer> bindPortIterator,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads,
+                       final KvStateRequestStats stats) {
+
+               super("Queryable State Proxy Server", bindAddress, 
bindPortIterator, numEventLoopThreads, numQueryThreads);
+               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
+               this.queryExecutorThreads = numQueryThreads;
+               this.stats = Preconditions.checkNotNull(stats);
+       }
+
+       @Override
+       public InetSocketAddress getServerAddress() {
+               return super.getServerAddress();
+       }
+
+       @Override
+       public void start() throws Throwable {
+               super.start();
+       }
+
+       @Override
+       public void shutdown() {
+               super.shutdown();
+       }
+
+       @Override
+       public void updateJobManager(CompletableFuture<ActorGateway> 
leadingJobManager) throws Exception {
+               synchronized (leaderLock) {
+                       if (leadingJobManager == null) {
+                               jobManagerFuture = UNKNOWN_JOB_MANAGER;
+                       } else {
+                               jobManagerFuture = leadingJobManager;
+                       }
+               }
+       }
+
+       @Override
+       public CompletableFuture<ActorGateway> getJobManagerFuture() {
+               synchronized (leaderLock) {
+                       return jobManagerFuture;
+               }
+       }
+
+       @Override
+       public AbstractServerHandler<KvStateRequest, KvStateResponse> 
initializeHandler() {
+               MessageSerializer<KvStateRequest, KvStateResponse> serializer =
+                               new MessageSerializer<>(
+                                               new 
KvStateRequest.KvStateRequestDeserializer(),
+                                               new 
KvStateResponse.KvStateResponseDeserializer());
+               return new KvStateClientProxyHandler(this, 
queryExecutorThreads, serializer, stats);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
new file mode 100644
index 0000000..8c8de59
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The request to be forwarded by the {@link 
org.apache.flink.runtime.query.KvStateClientProxy
+ * Queryable State Client Proxy} to the {@link 
org.apache.flink.runtime.query.KvStateServer State Server}
+ * of the Task Manager responsible for the requested state.
+ */
+@Internal
+public class KvStateInternalRequest extends MessageBody {
+
+       private final KvStateID kvStateId;
+       private final byte[] serializedKeyAndNamespace;
+
+       public KvStateInternalRequest(
+                       final KvStateID stateId,
+                       final byte[] serializedKeyAndNamespace) {
+
+               this.kvStateId = Preconditions.checkNotNull(stateId);
+               this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
+       }
+
+       public KvStateID getKvStateId() {
+               return kvStateId;
+       }
+
+       public byte[] getSerializedKeyAndNamespace() {
+               return serializedKeyAndNamespace;
+       }
+
+       @Override
+       public byte[] serialize() {
+
+               // KvStateId + sizeOf(serializedKeyAndNamespace) + 
serializedKeyAndNamespace
+               final int size = KvStateID.SIZE + Integer.BYTES + 
serializedKeyAndNamespace.length;
+
+               return ByteBuffer.allocate(size)
+                               .putLong(kvStateId.getLowerPart())
+                               .putLong(kvStateId.getUpperPart())
+                               .putInt(serializedKeyAndNamespace.length)
+                               .put(serializedKeyAndNamespace)
+                               .array();
+       }
+
+       /**
+        * A {@link MessageDeserializer deserializer} for {@link 
KvStateInternalRequest}.
+        */
+       public static class KvStateInternalRequestDeserializer implements 
MessageDeserializer<KvStateInternalRequest> {
+
+               @Override
+               public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
+                       KvStateID kvStateId = new KvStateID(buf.readLong(), 
buf.readLong());
+
+                       int length = buf.readInt();
+                       Preconditions.checkArgument(length >= 0,
+                                       "Negative length for key and namespace. 
" +
+                                                       "This indicates a 
serialization error.");
+
+                       byte[] serializedKeyAndNamespace = new byte[length];
+                       if (length > 0) {
+                               buf.readBytes(serializedKeyAndNamespace);
+                       }
+                       return new KvStateInternalRequest(kvStateId, 
serializedKeyAndNamespace);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
new file mode 100644
index 0000000..476f153
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler dispatches asynchronous tasks, which query {@link 
InternalKvState}
+ * instances and write the result to the channel.
+ *
+ * <p>The network threads receive the message, deserialize it and dispatch the
+ * query task. The actual query is handled in a separate thread as it might
+ * otherwise block the network threads (file I/O etc.).
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateServerHandler extends 
AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerHandler.class);
+
+       /** KvState registry holding references to the KvState instances. */
+       private final KvStateRegistry registry;
+
+       /**
+        * Create the handler used by the {@link KvStateServerImpl}.
+        *
+        * @param server the {@link KvStateServerImpl} using the handler.
+        * @param kvStateRegistry registry to query.
+        * @param serializer the {@link MessageSerializer} used to (de-) 
serialize the different messages.
+        * @param stats server statistics collector.
+        */
+       public KvStateServerHandler(
+                       final KvStateServerImpl server,
+                       final KvStateRegistry kvStateRegistry,
+                       final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer,
+                       final KvStateRequestStats stats) {
+
+               super(server, serializer, stats);
+               this.registry = Preconditions.checkNotNull(kvStateRegistry);
+       }
+
+       @Override
+       public CompletableFuture<KvStateResponse> handleRequest(final long 
requestId, final KvStateInternalRequest request) {
+               final CompletableFuture<KvStateResponse> responseFuture = new 
CompletableFuture<>();
+
+               try {
+                       final InternalKvState<?> kvState = 
registry.getKvState(request.getKvStateId());
+                       if (kvState == null) {
+                               responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
+                       } else {
+                               byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
+
+                               byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+                               if (serializedResult != null) {
+                                       responseFuture.complete(new 
KvStateResponse(serializedResult));
+                               } else {
+                                       
responseFuture.completeExceptionally(new 
UnknownKeyOrNamespaceException(getServerName()));
+                               }
+                       }
+                       return responseFuture;
+               } catch (Throwable t) {
+                       String errMsg = "Error while processing request with ID 
" + requestId +
+                                       ". Caused by: " + 
ExceptionUtils.stringifyException(t);
+                       responseFuture.completeExceptionally(new 
RuntimeException(errMsg));
+                       return responseFuture;
+               }
+       }
+
+       @Override
+       public void shutdown() {
+               // do nothing
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
new file mode 100644
index 0000000..fe07687
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+
+/**
+ * The default implementation of the {@link KvStateServer}.
+ *
+ * <p>Built on {@link AbstractServerBase}, parameterized with
+ * {@link KvStateInternalRequest} and {@link KvStateResponse}.
+ */
+@Internal
+public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements 
KvStateServer {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerImpl.class);
+
+       /** The {@link KvStateRegistry} to query for state instances. */
+       private final KvStateRegistry kvStateRegistry;
+
+       /** The statistics collector passed on to the handler created in {@link #initializeHandler()}. */
+       private final KvStateRequestStats stats;
+
+       /**
+        * Serializer for the request/response messages. It is assigned in
+        * {@link #initializeHandler()} and is {@code null} until that method has run.
+        */
+       private MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer;
+
+       /**
+        * Creates the state server.
+        *
+        * <p>The server is instantiated using reflection by the
+        * {@link 
org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress,
 Iterator, int, int, KvStateRegistry, KvStateRequestStats)
+        * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, 
int, KvStateRegistry, KvStateRequestStats)}.
+        *
+        * <p>The server needs to be started via {@link #start()} in order to 
bind
+        * to the configured bind address.
+        *
+        * @param bindAddress the address to listen to.
+        * @param bindPortIterator the port range to try to bind to.
+        * @param numEventLoopThreads number of event loop threads.
+        * @param numQueryThreads number of query threads.
+        * @param kvStateRegistry {@link KvStateRegistry} to query for state 
instances.
+        * @param stats the statistics collector.
+        */
+       public KvStateServerImpl(
+                       final InetAddress bindAddress,
+                       final Iterator<Integer> bindPortIterator,
+                       final Integer numEventLoopThreads,
+                       final Integer numQueryThreads,
+                       final KvStateRegistry kvStateRegistry,
+                       final KvStateRequestStats stats) {
+
+               super("Queryable State Server", bindAddress, bindPortIterator, 
numEventLoopThreads, numQueryThreads);
+               this.stats = Preconditions.checkNotNull(stats);
+               this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
+       }
+
+       /**
+        * Creates the message serializer, stores it in this server, and returns a new
+        * {@link KvStateServerHandler} constructed with this server, the registry,
+        * the serializer and the statistics collector.
+        *
+        * @return the newly created handler.
+        */
+       @Override
+       public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> 
initializeHandler() {
+               this.serializer = new MessageSerializer<>(
+                               new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                               new 
KvStateResponse.KvStateResponseDeserializer());
+               return new KvStateServerHandler(this, kvStateRegistry, 
serializer, stats);
+       }
+
+       /**
+        * Returns the serializer created in {@link #initializeHandler()}.
+        *
+        * <p>If the serializer has not been created yet, the state check fails with
+        * a message saying that the server has not been started.
+        *
+        * @return the message serializer of this server.
+        */
+       public MessageSerializer<KvStateInternalRequest, KvStateResponse> 
getSerializer() {
+               // The serializer only exists once initializeHandler() has run.
+               Preconditions.checkState(serializer != null, "Server " + 
getServerName() + " has not been started.");
+               return serializer;
+       }
+
+       /**
+        * Starts the server by delegating to the superclass implementation.
+        *
+        * @throws Throwable if the superclass {@code start()} fails.
+        */
+       @Override
+       public void start() throws Throwable {
+               super.start();
+       }
+
+       /**
+        * Returns the server address as reported by the superclass implementation.
+        *
+        * @return the address returned by the superclass.
+        */
+       @Override
+       public InetSocketAddress getServerAddress() {
+               return super.getServerAddress();
+       }
+
+       /**
+        * Shuts the server down by delegating to the superclass implementation.
+        */
+       @Override
+       public void shutdown() {
+               super.shutdown();
+       }
+}

Reply via email to