[FLINK-7769][QS] Move queryable state outside the runtime.

Creates a separate module for the queryable state and moves the client
code outside the runtime. The TaskManager now instantiates
the KvStateServer using reflection.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29a6e995
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29a6e995
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29a6e995

Branch: refs/heads/master
Commit: 29a6e9952ebb2c7349d25d3696e2ec1d7e8e620a
Parents: bc4638a
Author: kkloudas <[email protected]>
Authored: Wed Oct 4 19:11:09 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Wed Oct 11 15:33:30 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |    6 +-
 .../streaming/state/RocksDBMapState.java        |    8 +-
 .../flink-queryable-state-java/pom.xml          |  143 +++
 .../flink/queryablestate/UnknownJobManager.java |   35 +
 .../queryablestate/UnknownKeyOrNamespace.java   |   31 +
 .../flink/queryablestate/UnknownKvStateID.java  |   35 +
 .../UnknownKvStateKeyGroupLocation.java         |   31 +
 .../AkkaKvStateLocationLookupService.java       |  325 +++++
 .../queryablestate/client/KvStateClient.java    |  583 +++++++++
 .../client/KvStateClientHandler.java            |  107 ++
 .../client/KvStateClientHandlerCallback.java    |   54 +
 .../client/KvStateLocationLookupService.java    |   51 +
 .../client/QueryableStateClient.java            |  590 +++++++++
 .../queryablestate/messages/KvStateRequest.java |   89 ++
 .../messages/KvStateRequestFailure.java         |   68 ++
 .../messages/KvStateRequestResult.java          |   74 ++
 .../network/messages/MessageSerializer.java     |  332 ++++++
 .../network/messages/MessageType.java           |   39 +
 .../queryablestate/server/ChunkedByteBuf.java   |   98 ++
 .../server/KvStateServerHandler.java            |  308 +++++
 .../server/KvStateServerImpl.java               |  230 ++++
 .../itcases/AbstractQueryableStateITCase.java   | 1128 ++++++++++++++++++
 .../itcases/HAAbstractQueryableStateITCase.java |  101 ++
 .../HAQueryableStateITCaseFsBackend.java        |   39 +
 .../HAQueryableStateITCaseRocksDBBackend.java   |   39 +
 .../KVStateRequestSerializerRocksDBTest.java    |  167 +++
 .../NonHAAbstractQueryableStateITCase.java      |   81 ++
 .../NonHAQueryableStateITCaseFsBackend.java     |   39 +
 ...NonHAQueryableStateITCaseRocksDBBackend.java |   39 +
 .../AkkaKvStateLocationLookupServiceTest.java   |  399 +++++++
 .../network/KvStateClientHandlerTest.java       |  117 ++
 .../network/KvStateClientTest.java              |  752 ++++++++++++
 .../network/KvStateRequestSerializerTest.java   |  214 ++++
 .../network/KvStateServerHandlerTest.java       |  728 +++++++++++
 .../network/KvStateServerTest.java              |  201 ++++
 .../network/QueryableStateClientTest.java       |  458 +++++++
 .../src/test/resources/log4j-test.properties    |   31 +
 flink-queryable-state/pom.xml                   |   54 +
 .../runtime/io/network/NetworkEnvironment.java  |    2 +-
 .../query/AkkaKvStateLocationLookupService.java |  322 -----
 .../flink/runtime/query/KvStateLocation.java    |   10 +-
 .../query/KvStateLocationLookupService.java     |   50 -
 .../flink/runtime/query/KvStateRegistry.java    |    1 -
 .../flink/runtime/query/KvStateServer.java      |   43 +
 .../runtime/query/KvStateServerAddress.java     |    6 +-
 .../runtime/query/QueryableStateClient.java     |  587 ---------
 .../runtime/query/QueryableStateUtils.java      |   89 ++
 .../flink/runtime/query/UnknownJobManager.java  |   33 -
 .../query/UnknownKvStateKeyGroupLocation.java   |   29 -
 .../runtime/query/netty/ChunkedByteBuf.java     |   98 --
 .../runtime/query/netty/KvStateClient.java      |  579 ---------
 .../query/netty/KvStateClientHandler.java       |  106 --
 .../netty/KvStateClientHandlerCallback.java     |   54 -
 .../query/netty/KvStateRequestStats.java        |    2 +
 .../runtime/query/netty/KvStateServer.java      |  239 ----
 .../query/netty/KvStateServerHandler.java       |  305 -----
 .../query/netty/UnknownKeyOrNamespace.java      |   31 -
 .../runtime/query/netty/UnknownKvStateID.java   |   35 -
 .../query/netty/message/KvStateRequest.java     |   89 --
 .../netty/message/KvStateRequestFailure.java    |   68 --
 .../netty/message/KvStateRequestResult.java     |   74 --
 .../netty/message/KvStateRequestSerializer.java |  568 ---------
 .../query/netty/message/KvStateRequestType.java |   40 -
 .../query/netty/message/KvStateSerializer.java  |  267 +++++
 .../flink/runtime/query/netty/package-info.java |   80 --
 .../flink/runtime/query/package-info.java       |   60 -
 .../runtime/state/heap/AbstractHeapState.java   |    6 +-
 .../flink/runtime/state/heap/HeapMapState.java  |    4 +-
 .../QueryableStateConfiguration.java            |    6 +-
 .../taskexecutor/TaskManagerServices.java       |   34 +-
 .../TaskManagerServicesConfiguration.java       |    9 +-
 .../AkkaKvStateLocationLookupServiceTest.java   |  398 ------
 .../runtime/query/QueryableStateClientTest.java |  449 -------
 .../query/netty/KvStateClientHandlerTest.java   |  115 --
 .../runtime/query/netty/KvStateClientTest.java  |  747 ------------
 .../query/netty/KvStateServerHandlerTest.java   |  721 -----------
 .../runtime/query/netty/KvStateServerTest.java  |  196 ---
 .../message/KvStateRequestSerializerTest.java   |  219 +---
 .../runtime/state/StateBackendTestBase.java     |   14 +-
 .../query/AbstractQueryableStateITCase.java     | 1128 ------------------
 .../query/HAAbstractQueryableStateITCase.java   |  102 --
 .../query/HAQueryableStateITCaseFsBackend.java  |   39 -
 .../HAQueryableStateITCaseRocksDBBackend.java   |   39 -
 .../KVStateRequestSerializerRocksDBTest.java    |  168 ---
 .../NonHAAbstractQueryableStateITCase.java      |   81 --
 .../NonHAQueryableStateITCaseFsBackend.java     |   39 -
 ...NonHAQueryableStateITCaseRocksDBBackend.java |   39 -
 pom.xml                                         |    1 +
 88 files changed, 8299 insertions(+), 7946 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index c061835..cf365b4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
@@ -125,8 +125,8 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception {
                Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
 
-               //TODO make KvStateRequestSerializer key-group aware to save 
this round trip and key-group computation
-               Tuple2<K, N> des = KvStateRequestSerializer.<K, 
N>deserializeKeyAndNamespace(
+               //TODO make KvStateSerializer key-group aware to save this 
round trip and key-group computation
+               Tuple2<K, N> des = KvStateSerializer.<K, 
N>deserializeKeyAndNamespace(
                                serializedKeyAndNamespace,
                                backend.getKeySerializer(),
                                namespaceSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 75c1651..421bb2e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
@@ -223,8 +223,8 @@ public class RocksDBMapState<K, N, UK, UV>
        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception {
                Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
 
-               //TODO make KvStateRequestSerializer key-group aware to save 
this round trip and key-group computation
-               Tuple2<K, N> des = 
KvStateRequestSerializer.deserializeKeyAndNamespace(
+               //TODO make KvStateSerializer key-group aware to save this 
round trip and key-group computation
+               Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace(
                                serializedKeyAndNamespace,
                                backend.getKeySerializer(),
                                namespaceSerializer);
@@ -248,7 +248,7 @@ public class RocksDBMapState<K, N, UK, UV>
                        return null;
                }
 
-               return KvStateRequestSerializer.serializeMap(new 
Iterable<Map.Entry<UK, UV>>() {
+               return KvStateSerializer.serializeMap(new 
Iterable<Map.Entry<UK, UV>>() {
                        @Override
                        public Iterator<Map.Entry<UK, UV>> iterator() {
                                return iterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml 
b/flink-queryable-state/flink-queryable-state-java/pom.xml
new file mode 100644
index 0000000..63403df
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-queryable-state</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId>
+    <name>flink-queryable-state-java</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <!-- core dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+       <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- ===================================================
+                                                               Testing
+                       =================================================== -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.data-artisans</groupId>
+            <artifactId>flakka-testkit_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
new file mode 100644
index 0000000..93f2ba5
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
+
+/**
+ * Exception to fail Future with if no JobManager is currently registered at
+ * the {@link KvStateLocationLookupService}.
+ */
+public class UnknownJobManager extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       public UnknownJobManager() {
+               super("Unknown JobManager. Either the JobManager has not 
registered yet " +
+                               "or has lost leadership.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
new file mode 100644
index 0000000..e921e40
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+/**
+ * Thrown if the KvState does not hold any state for the given key or 
namespace.
+ */
+public class UnknownKeyOrNamespace extends IllegalStateException {
+
+       private static final long serialVersionUID = 1L;
+
+       public UnknownKeyOrNamespace() {
+               super("KvState does not hold any state for key/namespace.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
new file mode 100644
index 0000000..d5ff828
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thrown if no KvState with the given ID can be found by the server handler.
+ */
+public class UnknownKvStateID extends IllegalStateException {
+
+       private static final long serialVersionUID = 1L;
+
+       public UnknownKvStateID(KvStateID kvStateId) {
+               super("No KvState registered with ID " + 
Preconditions.checkNotNull(kvStateId, "KvStateID") +
+                               " at TaskManager.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
new file mode 100644
index 0000000..fd25fae
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.runtime.query.KvStateLocation;
+
+/**
+ * Exception thrown if there is no location information available for the given
+ * key group in a {@link KvStateLocation} instance.
+ */
+public class UnknownKvStateKeyGroupLocation extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
new file mode 100644
index 0000000..f42e008
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.UnknownJobManager;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * Akka-based {@link KvStateLocationLookupService} that retrieves the current
+ * JobManager address and uses it for lookups.
+ */
+public class AkkaKvStateLocationLookupService implements 
KvStateLocationLookupService, LeaderRetrievalListener {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateLocationLookupService.class);
+
+       /** Future returned when no JobManager is available. */
+       private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = 
Futures.failed(new UnknownJobManager());
+
+       /** Leader retrieval service to retrieve the current job manager. */
+       private final LeaderRetrievalService leaderRetrievalService;
+
+       /** The actor system used to resolve the JobManager address. */
+       private final ActorSystem actorSystem;
+
+       /** Timeout for JobManager ask-requests. */
+       private final FiniteDuration askTimeout;
+
+       /** Retry strategy factory on future failures. */
+       private final LookupRetryStrategyFactory retryStrategyFactory;
+
+       /** Current job manager future. */
+       private volatile Future<ActorGateway> jobManagerFuture = 
UNKNOWN_JOB_MANAGER;
+
+       /**
+        * Creates the Akka-based {@link KvStateLocationLookupService}.
+        *
+        * @param leaderRetrievalService Leader retrieval service to use.
+        * @param actorSystem            Actor system to use.
+        * @param askTimeout             Timeout for JobManager ask-requests.
+        * @param retryStrategyFactory   Retry strategy if no JobManager 
available.
+        */
+       public AkkaKvStateLocationLookupService(
+                       LeaderRetrievalService leaderRetrievalService,
+                       ActorSystem actorSystem,
+                       FiniteDuration askTimeout,
+                       LookupRetryStrategyFactory retryStrategyFactory) {
+
+               this.leaderRetrievalService = 
Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
+               this.actorSystem = Preconditions.checkNotNull(actorSystem, 
"Actor system");
+               this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask 
Timeout");
+               this.retryStrategyFactory = 
Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
+       }
+
+       public void start() {
+               try {
+                       leaderRetrievalService.start(this);
+               } catch (Exception e) {
+                       LOG.error("Failed to start leader retrieval service", 
e);
+                       throw new RuntimeException(e);
+               }
+       }
+
+       public void shutDown() {
+               try {
+                       leaderRetrievalService.stop();
+               } catch (Exception e) {
+                       LOG.error("Failed to stop leader retrieval service", e);
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, 
final String registrationName) {
+               return getKvStateLookupInfo(jobId, registrationName, 
retryStrategyFactory.createRetryStrategy());
+       }
+
+       /**
+        * Returns a future holding the {@link KvStateLocation} for the given 
job
+        * and KvState registration name.
+        *
+        * <p>If there is currently no JobManager registered with the service, 
the
+        * request is retried. The retry behaviour is specified by the
+        * {@link LookupRetryStrategy} of the lookup service.
+        *
+        * @param jobId               JobID the KvState instance belongs to
+        * @param registrationName    Name under which the KvState has been 
registered
+        * @param lookupRetryStrategy Retry strategy to use for retries on 
UnknownJobManager failures.
+        * @return Future holding the {@link KvStateLocation}
+        */
+       @SuppressWarnings("unchecked")
+       private Future<KvStateLocation> getKvStateLookupInfo(
+                       final JobID jobId,
+                       final String registrationName,
+                       final LookupRetryStrategy lookupRetryStrategy) {
+
+               return jobManagerFuture
+                               .flatMap(new Mapper<ActorGateway, 
Future<Object>>() {
+                                       @Override
+                                       public Future<Object> 
apply(ActorGateway jobManager) {
+                                               // Lookup the KvStateLocation
+                                               Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, registrationName);
+                                               return jobManager.ask(msg, 
askTimeout);
+                                       }
+                               }, actorSystem.dispatcher())
+                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
+                               .recoverWith(new 
Recover<Future<KvStateLocation>>() {
+                                       @Override
+                                       public Future<KvStateLocation> 
recover(Throwable failure) throws Throwable {
+                                               // If the Future fails with 
UnknownJobManager, retry
+                                               // the request. Otherwise all 
Futures will be failed
+                                               // during the start up phase, 
when the JobManager did
+                                               // not notify this service yet 
or leadership is lost
+                                               // intermittently.
+                                               if (failure instanceof 
UnknownJobManager && lookupRetryStrategy.tryRetry()) {
+                                                       return Patterns.after(
+                                                                       
lookupRetryStrategy.getRetryDelay(),
+                                                                       
actorSystem.scheduler(),
+                                                                       
actorSystem.dispatcher(),
+                                                                       new 
Callable<Future<KvStateLocation>>() {
+                                                                               
@Override
+                                                                               
public Future<KvStateLocation> call() throws Exception {
+                                                                               
        return getKvStateLookupInfo(
+                                                                               
                        jobId,
+                                                                               
                        registrationName,
+                                                                               
                        lookupRetryStrategy);
+                                                                               
}
+                                                                       });
+                                               } else {
+                                                       return 
Futures.failed(failure);
+                                               }
+                                       }
+                               }, actorSystem.dispatcher());
+       }
+
+       /**
+        * Reacts to a leader change by replacing the JobManager future.
+        *
+        * <p>Without a leader address the future is reset to {@code UNKNOWN_JOB_MANAGER}.
+        * Otherwise the actor reference of the new leader is resolved asynchronously and
+        * wrapped into a gateway that carries the given leader session ID.
+        *
+        * @param leaderAddress Address of the new leader or <code>null</code> if there is none
+        * @param leaderSessionID Session ID of the new leader
+        */
+       @Override
+       public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
+               }
+
+               if (leaderAddress != null) {
+                       // Turns the resolved actor reference into a gateway bound to the session.
+                       final Mapper<ActorRef, ActorGateway> toGateway = new Mapper<ActorRef, ActorGateway>() {
+                               @Override
+                               public ActorGateway apply(ActorRef leaderRef) {
+                                       return new AkkaActorGateway(leaderRef, leaderSessionID);
+                               }
+                       };
+
+                       jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
+                                       .map(toGateway, actorSystem.dispatcher());
+               } else {
+                       jobManagerFuture = UNKNOWN_JOB_MANAGER;
+               }
+       }
+
+       /**
+        * Replaces the current JobManager future with one that is failed with the
+        * reported exception.
+        *
+        * @param exception The reported error
+        */
+       @Override
+       public void handleError(Exception exception) {
+               jobManagerFuture = Futures.failed(exception);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Retry strategy for failed lookups.
+        *
+        * <p>Usage:
+        * <pre>
+        * LookupRetryStrategy retryStrategy = retryStrategyFactory.createRetryStrategy();
+        *
+        * if (retryStrategy.tryRetry()) {
+        *     // OK to retry
+        *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
+        * }
+        * </pre>
+        */
+       public interface LookupRetryStrategy {
+
+               /**
+                * Returns the delay to wait before the next retry.
+                *
+                * @return Current retry delay.
+                */
+               FiniteDuration getRetryDelay();
+
+               /**
+                * Tries another retry and returns whether it is allowed or not.
+                *
+                * @return Whether it is allowed to do another retry or not.
+                */
+               boolean tryRetry();
+
+       }
+
+       /**
+        * Factory for retry strategies.
+        */
+       public interface LookupRetryStrategyFactory {
+
+               /**
+                * Creates a retry strategy.
+                *
+                * <p>Whether the returned instance is fresh or shared between callers is
+                * up to the implementation.
+                *
+                * @return The retry strategy.
+                */
+               LookupRetryStrategy createRetryStrategy();
+
+       }
+
+       /**
+        * Factory whose strategies never permit a retry.
+        */
+       public static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+               /** The strategy has no state, so a single instance is handed to every caller. */
+               private static final DisabledLookupRetryStrategy NO_RETRIES = new DisabledLookupRetryStrategy();
+
+               @Override
+               public LookupRetryStrategy createRetryStrategy() {
+                       return NO_RETRIES;
+               }
+
+               /**
+                * Strategy that rejects every retry and reports a zero delay.
+                */
+               private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
+
+                       @Override
+                       public boolean tryRetry() {
+                               return false;
+                       }
+
+                       @Override
+                       public FiniteDuration getRetryDelay() {
+                               return FiniteDuration.Zero();
+                       }
+               }
+
+       }
+
+       /**
+        * Factory for strategies that allow a bounded number of retries, each after
+        * the same delay.
+        */
+       public static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
+
+               /** Retry budget handed to every created strategy. */
+               private final int maxRetries;
+
+               /** Delay handed to every created strategy. */
+               private final FiniteDuration retryDelay;
+
+               /**
+                * Creates the factory. The arguments are only validated when a strategy is
+                * created from them.
+                *
+                * @param maxRetries Maximum number of retries per strategy
+                * @param retryDelay Delay between retries
+                */
+               FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
+                       this.retryDelay = retryDelay;
+                       this.maxRetries = maxRetries;
+               }
+
+               @Override
+               public LookupRetryStrategy createRetryStrategy() {
+                       // Every caller gets its own retry counter.
+                       return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
+               }
+
+               /**
+                * Strategy that counts the granted retries under a lock.
+                */
+               private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
+
+                       private final Object retryLock = new Object();
+                       private final int maxRetries;
+                       private final FiniteDuration retryDelay;
+
+                       /** Number of retries granted so far, guarded by the retry lock. */
+                       private int numRetries;
+
+                       public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
+                               Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
+                               this.maxRetries = maxRetries;
+                               this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
+                       }
+
+                       @Override
+                       public boolean tryRetry() {
+                               synchronized (retryLock) {
+                                       final boolean retryAllowed = numRetries < maxRetries;
+                                       if (retryAllowed) {
+                                               numRetries++;
+                                       }
+                                       return retryAllowed;
+                               }
+                       }
+
+                       @Override
+                       public FiniteDuration getRetryDelay() {
+                               synchronized (retryLock) {
+                                       return retryDelay;
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
new file mode 100644
index 0000000..d456cd7
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+
+import akka.dispatch.Futures;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+/**
+ * Netty-based client querying {@link KvStateServer} instances.
+ *
+ * <p>This client can be used by multiple threads concurrently. Operations are
+ * executed asynchronously and return Futures to their result.
+ *
+ * <p>The incoming pipeline looks as follows:
+ * <pre>
+ * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateClientHandler
+ * </pre>
+ *
+ * <p>Received binary messages are expected to contain a frame length field. 
Netty's
+ * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame 
before
+ * giving it to our {@link KvStateClientHandler}.
+ *
+ * <p>Connections are established and closed by the client. The server only
+ * closes the connection on a fatal failure that cannot be recovered.
+ */
+public class KvStateClient {
+
+       /** Netty's Bootstrap, used to open the connections to the servers. */
+       private final Bootstrap bootstrap;
+
+       /** Statistics tracker for connections and requests. */
+       private final KvStateRequestStats stats;
+
+       /** Established connections by server address, published once a connect succeeded. */
+       private final ConcurrentHashMap<KvStateServerAddress, 
EstablishedConnection> establishedConnections =
+                       new ConcurrentHashMap<>();
+
+       /** Connections that are still connecting, by server address. They queue requests meanwhile. */
+       private final ConcurrentHashMap<KvStateServerAddress, 
PendingConnection> pendingConnections =
+                       new ConcurrentHashMap<>();
+
+       /** Atomic shut down flag. Set once by the first shut down call. */
+       private final AtomicBoolean shutDown = new AtomicBoolean();
+
+       /**
+        * Creates a client with the specified number of event loop threads.
+        *
+        * @param numEventLoopThreads Number of event loop threads (minimum 1).
+        * @param stats Statistics tracker for connections and requests.
+        * @throws IllegalArgumentException If the number of event loop threads is less than 1.
+        * @throws NullPointerException If the statistics tracker is <code>null</code>.
+        */
+       public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) {
+               // Validate every argument before any Netty resource is allocated, so that a
+               // rejected argument cannot leave an event loop group behind.
+               Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
+               this.stats = Preconditions.checkNotNull(stats, "Statistics tracker");
+
+               NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
+
+               ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                               .setDaemon(true)
+                               .setNameFormat("Flink KvStateClient Event Loop Thread %d")
+                               .build();
+
+               NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
+
+               this.bootstrap = new Bootstrap()
+                               .group(nioGroup)
+                               .channel(NioSocketChannel.class)
+                               .option(ChannelOption.ALLOCATOR, bufferPool)
+                               .handler(new ChannelInitializer<SocketChannel>() {
+                                       @Override
+                                       protected void initChannel(SocketChannel ch) throws Exception {
+                                               // The client handler is not added here. It is added per
+                                               // connection when the connection is established.
+                                               ch.pipeline()
+                                                               .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+                                                               // ChunkedWriteHandler respects Channel writability
+                                                               .addLast(new ChunkedWriteHandler());
+                                       }
+                               });
+       }
+
+       /**
+        * Queries the given server and returns a future holding the serialized
+        * request result.
+        *
+        * <p>The Future is failed with a {@link UnknownKvStateID} if the server does
+        * not serve a KvState instance with the given ID, and with a
+        * {@link UnknownKeyOrNamespace} if the KvState instance does not hold any data
+        * for the given key and namespace. All other failures are forwarded to the
+        * Future as well. After shut down, a failed Future is returned right away.
+        *
+        * <p>An established connection to the server is reused. Otherwise the request
+        * is handed to the pending connection for that server, which is created (and
+        * the connect triggered) by the first caller.
+        *
+        * @param serverAddress Address of the server to query
+        * @param kvStateId ID of the KvState instance to query
+        * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with
+        * @return Future holding the serialized result
+        */
+       public Future<byte[]> getKvState(
+                       KvStateServerAddress serverAddress,
+                       KvStateID kvStateId,
+                       byte[] serializedKeyAndNamespace) {
+
+               if (shutDown.get()) {
+                       return Futures.failed(new IllegalStateException("Shut down"));
+               }
+
+               EstablishedConnection established = establishedConnections.get(serverAddress);
+               if (established != null) {
+                       return established.getKvState(kvStateId, serializedKeyAndNamespace);
+               }
+
+               PendingConnection connecting = pendingConnections.get(serverAddress);
+               if (connecting != null) {
+                       // There was a race, use the existing pending connection.
+                       return connecting.getKvState(kvStateId, serializedKeyAndNamespace);
+               }
+
+               // We try to connect to the server.
+               PendingConnection candidate = new PendingConnection(serverAddress);
+               PendingConnection winner = pendingConnections.putIfAbsent(serverAddress, candidate);
+               if (winner != null) {
+                       // There was a race, use the existing pending connection.
+                       return winner.getKvState(kvStateId, serializedKeyAndNamespace);
+               }
+
+               // OK, we are responsible to connect.
+               bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
+                               .addListener(candidate);
+
+               return candidate.getKvState(kvStateId, serializedKeyAndNamespace);
+       }
+
+       /**
+        * Shuts down the client and closes all connections.
+        *
+        * <p>After a call to this method, all returned futures will be failed. Only
+        * the first call has an effect.
+        */
+       public void shutDown() {
+               if (!shutDown.compareAndSet(false, true)) {
+                       // Already shut down by an earlier call.
+                       return;
+               }
+
+               for (Map.Entry<KvStateServerAddress, EstablishedConnection> entry : establishedConnections.entrySet()) {
+                       EstablishedConnection established = entry.getValue();
+                       // Only close what this call actually removed from the map.
+                       if (establishedConnections.remove(entry.getKey(), established)) {
+                               established.close();
+                       }
+               }
+
+               for (Map.Entry<KvStateServerAddress, PendingConnection> entry : pendingConnections.entrySet()) {
+                       PendingConnection removed = pendingConnections.remove(entry.getKey());
+                       if (removed != null) {
+                               entry.getValue().close();
+                       }
+               }
+
+               if (bootstrap != null) {
+                       EventLoopGroup eventLoops = bootstrap.group();
+                       if (eventLoops != null) {
+                               eventLoops.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+                       }
+               }
+       }
+
+       /**
+        * Closes the connection to the given server address if it exists.
+        *
+        * <p>Both a connection that is still connecting and an established one are
+        * removed and closed. If there is a request to the server afterwards, a new
+        * connection will be established.
+        *
+        * @param serverAddress Target address of the connection to close
+        */
+       public void closeConnection(KvStateServerAddress serverAddress) {
+               // Remove the pending connection instead of only looking it up. A closed
+               // pending connection that stays registered would answer every later request
+               // for this address with its failure and no new connect would ever happen.
+               PendingConnection pending = pendingConnections.remove(serverAddress);
+               if (pending != null) {
+                       pending.close();
+               }
+
+               EstablishedConnection established = establishedConnections.remove(serverAddress);
+               if (established != null) {
+                       established.close();
+               }
+       }
+
+       /**
+        * A pending connection that is in the process of connecting.
+        *
+        * <p>Requests issued while the connect is in flight are queued and sent as
+        * soon as the channel is handed in. If the connect fails or the connection is
+        * closed, all queued requests are failed and the connection unregisters itself
+        * from the pending connections, so that a later request to the same server
+        * triggers a new connect instead of being answered with the old failure.
+        */
+       private class PendingConnection implements ChannelFutureListener {
+
+               /** Lock to guard the connect call, channel hand in, etc. */
+               private final Object connectLock = new Object();
+
+               /** Address of the server we are connecting to. */
+               private final KvStateServerAddress serverAddress;
+
+               /** Queue of requests while connecting. */
+               private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
+
+               /** The established connection after the connect succeeds. */
+               private EstablishedConnection established;
+
+               /** Closed flag. */
+               private boolean closed;
+
+               /** Failure cause if something goes wrong. */
+               private Throwable failureCause;
+
+               /**
+                * Creates a pending connection to the given server.
+                *
+                * @param serverAddress Address of the server to connect to.
+                */
+               private PendingConnection(KvStateServerAddress serverAddress) {
+                       this.serverAddress = serverAddress;
+               }
+
+               @Override
+               public void operationComplete(ChannelFuture future) throws Exception {
+                       // Callback from the Bootstrap's connect call.
+                       if (future.isSuccess()) {
+                               handInChannel(future.channel());
+                       } else {
+                               close(future.cause());
+                       }
+               }
+
+               /**
+                * Returns a future holding the serialized request result.
+                *
+                * <p>If the channel has been established, forward the call to the
+                * established channel, otherwise queue it for when the channel is
+                * handed in. After a failure or close, a failed future is returned.
+                *
+                * @param kvStateId                 ID of the KvState instance to query
+                * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
+                *                                  with
+                * @return Future holding the serialized result
+                */
+               public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+                       synchronized (connectLock) {
+                               if (failureCause != null) {
+                                       return Futures.failed(failureCause);
+                               } else if (closed) {
+                                       return Futures.failed(new ClosedChannelException());
+                               } else {
+                                       if (established != null) {
+                                               return established.getKvState(kvStateId, serializedKeyAndNamespace);
+                                       } else {
+                                               // Queue this and handle when connected
+                                               PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace);
+                                               queuedRequests.add(pending);
+                                               return pending.promise.future();
+                                       }
+                               }
+                       }
+               }
+
+               /**
+                * Hands in a channel after a successful connection.
+                *
+                * @param channel Channel to hand in
+                */
+               private void handInChannel(Channel channel) {
+                       synchronized (connectLock) {
+                               if (closed || failureCause != null) {
+                                       // Close the channel and we are done. Any queued requests
+                                       // are removed on the close/failure call and after that no
+                                       // new ones can be enqueued.
+                                       channel.close();
+                               } else {
+                                       established = new EstablishedConnection(serverAddress, channel);
+
+                                       PendingRequest pending;
+                                       while ((pending = queuedRequests.poll()) != null) {
+                                               Future<byte[]> resultFuture = established.getKvState(
+                                                               pending.kvStateId,
+                                                               pending.serializedKeyAndNamespace);
+
+                                               pending.promise.completeWith(resultFuture);
+                                       }
+
+                                       // Publish the channel for the general public
+                                       establishedConnections.put(serverAddress, established);
+                                       pendingConnections.remove(serverAddress);
+
+                                       // Check shut down for possible race with shut down. We
+                                       // don't want any lingering connections after shut down,
+                                       // which can happen if we don't check this here.
+                                       if (shutDown.get()) {
+                                               if (establishedConnections.remove(serverAddress, established)) {
+                                                       established.close();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /**
+                * Close the connecting channel with a ClosedChannelException.
+                */
+               private void close() {
+                       close(new ClosedChannelException());
+               }
+
+               /**
+                * Close the connecting channel with an Exception (can be
+                * <code>null</code>) or forward to the established channel.
+                *
+                * <p>Only the first call has an effect. It also unregisters this instance
+                * from the pending connections.
+                */
+               private void close(Throwable cause) {
+                       synchronized (connectLock) {
+                               if (!closed) {
+                                       if (failureCause == null) {
+                                               failureCause = cause;
+                                       }
+
+                                       if (established != null) {
+                                               established.close();
+                                       } else {
+                                               PendingRequest pending;
+                                               while ((pending = queuedRequests.poll()) != null) {
+                                                       pending.promise.tryFailure(cause);
+                                               }
+                                       }
+
+                                       closed = true;
+
+                                       // Unregister this instance. Otherwise it stays in the map
+                                       // after a failed connect and answers every later request
+                                       // for this address with the stored failure. The two-argument
+                                       // remove leaves a newer connection for the address alone.
+                                       pendingConnections.remove(serverAddress, this);
+                               }
+                       }
+               }
+
+               /**
+                * A pending request queued while the channel is connecting.
+                */
+               private final class PendingRequest {
+
+                       private final KvStateID kvStateId;
+                       private final byte[] serializedKeyAndNamespace;
+                       private final Promise<byte[]> promise;
+
+                       private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+                               this.kvStateId = kvStateId;
+                               this.serializedKeyAndNamespace = serializedKeyAndNamespace;
+                               this.promise = Futures.promise();
+                       }
+               }
+
+               @Override
+               public String toString() {
+                       synchronized (connectLock) {
+                               return "PendingConnection{" +
+                                               "serverAddress=" + serverAddress +
+                                               ", queuedRequests=" + queuedRequests.size() +
+                                               ", established=" + (established != null) +
+                                               ", closed=" + closed +
+                                               '}';
+                       }
+               }
+       }
+
+       /**
+        * An established connection that wraps the actual channel instance and 
is
+        * registered at the {@link KvStateClientHandler} for callbacks.
+        */
+       private class EstablishedConnection implements 
KvStateClientHandlerCallback {
+
+               /** Address of the server we are connected to. */
+               private final KvStateServerAddress serverAddress;
+
+               /** The actual TCP channel. */
+               private final Channel channel;
+
+               /** Pending requests keyed by request ID. Entries are removed when a request is failed. */
+               private final ConcurrentHashMap<Long, PromiseAndTimestamp> 
pendingRequests = new ConcurrentHashMap<>();
+
+               /** Current request number used to assign unique request IDs. */
+               private final AtomicLong requestCount = new AtomicLong();
+
+               /** Cause this connection was closed with. Set once by the first close call. */
+               private final AtomicReference<Throwable> failureCause = new 
AtomicReference<>();
+
+               /**
+                * Creates an established connection with the given channel.
+                *
+                * <p>Registers a {@link KvStateClientHandler} with this connection as its
+                * callback at the end of the channel pipeline and reports the connection
+                * as active to the statistics tracker.
+                *
+                * @param serverAddress Address of the server connected to
+                * @param channel The actual TCP channel
+                */
+               EstablishedConnection(KvStateServerAddress serverAddress, 
Channel channel) {
+                       this.serverAddress = 
Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
+                       this.channel = Preconditions.checkNotNull(channel, 
"Channel");
+
+                       // Add the client handler with the callback
+                       // NOTE(review): this hands out "this" before the constructor has
+                       // returned - confirm the handler does not call back before then.
+                       channel.pipeline().addLast("KvStateClientHandler", new 
KvStateClientHandler(this));
+
+                       stats.reportActiveConnection();
+               }
+
+               /**
+                * Close the channel with a ClosedChannelException as the cause.
+                */
+               void close() {
+                       close(new ClosedChannelException());
+               }
+
+               /**
+                * Close the channel with a cause.
+                *
+                * <p>The first call with a cause closes the channel, reports the connection
+                * as inactive and fails all pending requests with the cause. Calls after
+                * that have no effect.
+                *
+                * @param cause The cause to close the channel with.
+                * @return <code>true</code> if this call closed the connection,
+                * <code>false</code> if a cause had already been set before.
+                */
+               private boolean close(Throwable cause) {
+                       if (!failureCause.compareAndSet(null, cause)) {
+                               return false;
+                       }
+
+                       channel.close();
+                       stats.reportInactiveConnection();
+
+                       for (Long requestId : pendingRequests.keySet()) {
+                               PromiseAndTimestamp unanswered = pendingRequests.remove(requestId);
+                               // Only count requests that this call removed and failed itself.
+                               if (unanswered != null && unanswered.promise.tryFailure(cause)) {
+                                       stats.reportFailedRequest();
+                               }
+                       }
+
+                       return true;
+               }
+
+               /**
+                * Returns a future holding the serialized request result.
+                *
+                * <p>The request is registered as pending before it is written to
+                * the channel. If registering, serializing or writing the request
+                * throws, the pending entry is removed again, the returned future
+                * is failed with that error and the failure is reported to the
+                * stats.
+                *
+                * @param kvStateId                 ID of the KvState instance to query
+                * @param serializedKeyAndNamespace Serialized key and namespace to query
+                *                                  KvState instance with
+                * @return Future holding the serialized result
+                */
+               Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
+                       final PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp(
+                                       Futures.<byte[]>promise(),
+                                       System.nanoTime());
+
+                       // Allocated outside of the try block so that the catch block
+                       // can clean up the pending entry of this request.
+                       final long requestId = requestCount.getAndIncrement();
+
+                       try {
+                               pendingRequests.put(requestId, requestPromiseTs);
+
+                               stats.reportRequest();
+
+                               ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+                                               channel.alloc(),
+                                               requestId,
+                                               kvStateId,
+                                               serializedKeyAndNamespace);
+
+                               channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
+                                       @Override
+                                       public void operationComplete(ChannelFuture future) throws Exception {
+                                               if (!future.isSuccess()) {
+                                                       // Fail promise if the write failed
+                                                       PromiseAndTimestamp pending = pendingRequests.remove(requestId);
+                                                       if (pending != null && pending.promise.tryFailure(future.cause())) {
+                                                               stats.reportFailedRequest();
+                                                       }
+                                               }
+                                       }
+                               });
+
+                               // Check failure for possible race. We don't want any lingering
+                               // promises after a failure, which can happen if we don't check
+                               // this here. Note that close is treated as a failure as well.
+                               Throwable failure = failureCause.get();
+                               if (failure != null) {
+                                       // Remove from pending requests to guard against concurrent
+                                       // removal and to make sure that we only count it once as failed.
+                                       PromiseAndTimestamp p = pendingRequests.remove(requestId);
+                                       if (p != null && p.promise.tryFailure(failure)) {
+                                               stats.reportFailedRequest();
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               // Do not leave the entry behind: without this removal it
+                               // would stay in the map until the connection is closed.
+                               pendingRequests.remove(requestId);
+
+                               // tryFailure returns false if another path already
+                               // completed the promise, so the failure is counted once.
+                               if (requestPromiseTs.promise.tryFailure(t)) {
+                                       stats.reportFailedRequest();
+                               }
+                       }
+
+                       return requestPromiseTs.promise.future();
+               }
+
+               /**
+                * Completes the pending request with the given ID successfully and
+                * reports the request duration in milliseconds to the stats.
+                */
+               @Override
+               public void onRequestResult(long requestId, byte[] serializedValue) {
+                       final PromiseAndTimestamp request = pendingRequests.remove(requestId);
+                       if (request == null) {
+                               // No pending entry for this ID: nothing to complete.
+                               return;
+                       }
+
+                       if (request.promise.trySuccess(serializedValue)) {
+                               final long elapsedNanos = System.nanoTime() - request.timestamp;
+                               stats.reportSuccessfulRequest(elapsedNanos / 1_000_000);
+                       }
+               }
+
+               /**
+                * Fails the pending request with the given ID and reports the
+                * failed request to the stats.
+                */
+               @Override
+               public void onRequestFailure(long requestId, Throwable cause) {
+                       final PromiseAndTimestamp request = pendingRequests.remove(requestId);
+                       if (request == null) {
+                               // No pending entry for this ID: nothing to fail.
+                               return;
+                       }
+
+                       final boolean failedByThisCall = request.promise.tryFailure(cause);
+                       if (failedByThisCall) {
+                               stats.reportFailedRequest();
+                       }
+               }
+
+               /**
+                * Closes this connection with the given cause and, if this call
+                * performed the close, unregisters the connection.
+                */
+               @Override
+               public void onFailure(Throwable cause) {
+                       final boolean closedByThisCall = close(cause);
+                       if (!closedByThisCall) {
+                               return;
+                       }
+
+                       // Remove from established channels, otherwise future
+                       // requests will be handled by this failed channel.
+                       establishedConnections.remove(serverAddress, this);
+               }
+
+               /**
+                * Renders the server address, the channel, the number of pending
+                * requests, the request counter and the failure cause.
+                */
+               @Override
+               public String toString() {
+                       final StringBuilder text = new StringBuilder("EstablishedConnection{");
+                       text.append("serverAddress=").append(serverAddress);
+                       text.append(", channel=").append(channel);
+                       text.append(", pendingRequests=").append(pendingRequests.size());
+                       text.append(", requestCount=").append(requestCount);
+                       text.append(", failureCause=").append(failureCause);
+                       text.append('}');
+                       return text.toString();
+               }
+
+               /**
+                * Holder coupling the promise of a request with a timestamp.
+                */
+               private class PromiseAndTimestamp {
+
+                       /** Promise that is completed with the outcome of the request. */
+                       private final Promise<byte[]> promise;
+
+                       /** Timestamp handed in by the creator of this pair. */
+                       private final long timestamp;
+
+                       public PromiseAndTimestamp(Promise<byte[]> requestPromise, long creationTimestamp) {
+                               promise = requestPromise;
+                               timestamp = creationTimestamp;
+                       }
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
new file mode 100644
index 0000000..36a2b31
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * This handler expects responses from {@link KvStateServerHandler}.
+ *
+ * <p>It deserializes the response and calls the registered callback, which is
+ * responsible for actually handling the result (see {@link 
KvStateClient.EstablishedConnection}).
+ */
+public class KvStateClientHandler extends ChannelInboundHandlerAdapter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientHandler.class);
+
+       private final KvStateClientHandlerCallback callback;
+
+       /**
+        * Creates a {@link KvStateClientHandler} with the callback.
+        *
+        * @param callback Callback for responses.
+        */
+       public KvStateClientHandler(KvStateClientHandlerCallback callback) {
+               this.callback = callback;
+       }
+
+       @Override
+       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+               try {
+                       ByteBuf buf = (ByteBuf) msg;
+                       MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
+
+                       if (msgType == MessageType.REQUEST_RESULT) {
+                               KvStateRequestResult result = 
MessageSerializer.deserializeKvStateRequestResult(buf);
+                               callback.onRequestResult(result.getRequestId(), 
result.getSerializedResult());
+                       } else if (msgType == MessageType.REQUEST_FAILURE) {
+                               KvStateRequestFailure failure = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+                               
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
+                       } else if (msgType == MessageType.SERVER_FAILURE) {
+                               throw 
MessageSerializer.deserializeServerFailure(buf);
+                       } else {
+                               throw new IllegalStateException("Unexpected 
response type '" + msgType + "'");
+                       }
+               } catch (Throwable t1) {
+                       try {
+                               callback.onFailure(t1);
+                       } catch (Throwable t2) {
+                               LOG.error("Failed to notify callback about 
failure", t2);
+                       }
+               } finally {
+                       ReferenceCountUtil.release(msg);
+               }
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+               try {
+                       callback.onFailure(cause);
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+
+       @Override
+       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
+               // Only the client is expected to close the channel. Otherwise 
it
+               // indicates a failure. Note that this will be invoked in both 
cases
+               // though. If the callback closed the channel, the callback 
must be
+               // ignored.
+               try {
+                       callback.onFailure(new ClosedChannelException());
+               } catch (Throwable t) {
+                       LOG.error("Failed to notify callback about failure", t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
new file mode 100644
index 0000000..98718fa
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+
+/**
+ * Callback for {@link KvStateClientHandler}.
+ *
+ * <p>The handler invokes exactly one of these methods per received message,
+ * and additionally reports channel-level problems via
+ * {@link #onFailure(Throwable)}.
+ */
+public interface KvStateClientHandlerCallback {
+
+       /**
+        * Called on a successful {@link KvStateRequest}.
+        *
+        * <p>The handler calls this after deserializing a request result
+        * message.
+        *
+        * @param requestId       ID of the request
+        * @param serializedValue Serialized value for the request
+        */
+       void onRequestResult(long requestId, byte[] serializedValue);
+
+       /**
+        * Called on a failed {@link KvStateRequest}.
+        *
+        * <p>The handler calls this after deserializing a request failure
+        * message.
+        *
+        * @param requestId ID of the request
+        * @param cause     Cause of the request failure
+        */
+       void onRequestFailure(long requestId, Throwable cause);
+
+       /**
+        * Called on any failure, which is not related to a specific request.
+        *
+        * <p>This can be for example a caught Exception in the channel pipeline
+        * or an unexpected channel close. The handler also calls this when the
+        * channel becomes inactive and for anything thrown while it processes
+        * a received message.
+        *
+        * @param cause Cause of the failure
+        */
+       void onFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
new file mode 100644
index 0000000..635cbae
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.query.KvStateLocation;
+
+import scala.concurrent.Future;
+
+/**
+ * {@link KvStateLocation} lookup service.
+ *
+ * <p>{@link QueryableStateClient} starts the service in its constructors
+ * and shuts it down again in its own <code>shutDown()</code>.
+ */
+public interface KvStateLocationLookupService {
+
+       /**
+        * Starts the lookup service.
+        */
+       void start();
+
+       /**
+        * Shuts down the lookup service.
+        */
+       void shutDown();
+
+       /**
+        * Returns a future holding the {@link KvStateLocation} for the given
+        * job and KvState registration name.
+        *
+        * @param jobId            JobID the KvState instance belongs to
+        * @param registrationName Name under which the KvState has been registered
+        * @return Future holding the {@link KvStateLocation}
+        */
+       Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String 
registrationName);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
new file mode 100644
index 0000000..27257d7
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.Recover;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Client for queryable state.
+ *
+ * <p>You can mark state as queryable via {@link 
StateDescriptor#setQueryable(String)}.
+ * The state instance created from this descriptor will be published for 
queries
+ * when it's created on the TaskManagers and the location will be reported to
+ * the JobManager.
+ *
+ * <p>The client resolves the location of the requested KvState via the
+ * JobManager. Resolved locations are cached. When the server address of the
+ * requested KvState instance is determined, the client sends out a request to
+ * the server.
+ */
+public class QueryableStateClient {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateClient.class);
+
+       /**
+        * {@link KvStateLocation} lookup to resolve the address of KvState 
instances.
+        */
+       private final KvStateLocationLookupService lookupService;
+
+       /**
+        * Network client for queries against {@link KvStateServer} instances.
+        */
+       private final KvStateClient kvStateClient;
+
+       /**
+        * Execution context.
+        */
+       private final ExecutionContext executionContext;
+
+       /**
+        * Cache for {@link KvStateLocation} instances keyed by job and name.
+        */
+       private final ConcurrentMap<Tuple2<JobID, String>, 
Future<KvStateLocation>> lookupCache =
+                       new ConcurrentHashMap<>();
+
+       /** This is != null, if we started the actor system. */
+       private final ActorSystem actorSystem;
+
+       private ExecutionConfig executionConfig;
+
+       /**
+        * Creates a client from the given configuration.
+        *
+        * <p>This will create multiple Thread pools: one for the started actor
+        * system and another for the network client.
+        *
+        * <p>The high availability services are created from the configuration
+        * with a direct executor and address resolution enabled, and are then
+        * handed to the two-argument constructor, which does the actual setup.
+        *
+        * @param config Configuration to use.
+        * @throws Exception Failures are forwarded
+        */
+       public QueryableStateClient(Configuration config) throws Exception {
+               this(config, 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+                               config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+       }
+
+       /**
+        * Creates a client from the given configuration.
+        *
+        * <p>This will create multiple Thread pools: one for the started actor
+        * system and another for the network client.
+        *
+        * <p>The constructor reads the ask timeout and the lookup retry
+        * settings, starts an actor system, creates the Akka based location
+        * lookup service and the network client, and finally starts the
+        * lookup service.
+        *
+        * @param config Configuration to use.
+        * @param highAvailabilityServices Service factory for high availability services
+        * @throws Exception Failures are forwarded. In particular an
+        * {@link IllegalConfigurationException} is thrown if the configured
+        * ask timeout is not finite.
+        *
+        * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the
+        * {@link #QueryableStateClient(Configuration)} instead.
+        */
+       @Deprecated
+       public QueryableStateClient(
+                       Configuration config,
+                       HighAvailabilityServices highAvailabilityServices) 
throws Exception {
+               Preconditions.checkNotNull(config, "Configuration");
+
+               // Create a leader retrieval service
+               LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+
+               // Get the ask timeout
+               String askTimeoutString = 
config.getString(AkkaOptions.ASK_TIMEOUT);
+
+               // Reject a non-finite timeout before any resources are created.
+               Duration timeout = FiniteDuration.apply(askTimeoutString);
+               if (!timeout.isFinite()) {
+                       throw new 
IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
+                                       + " is not a finite timeout ('" + 
askTimeoutString + "')");
+               }
+
+               FiniteDuration askTimeout = (FiniteDuration) timeout;
+
+               int lookupRetries = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES);
+               int lookupRetryDelayMillis = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY);
+
+               // Retries if no JobManager is around
+               AkkaKvStateLocationLookupService.LookupRetryStrategyFactory 
retryStrategy =
+                               new 
AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory(
+                                               lookupRetries,
+                                               
FiniteDuration.apply(lookupRetryDelayMillis, "ms"));
+
+               // Create the actor system
+               // NOTE(review): nothing below shuts this actor system down if a
+               // later step of the constructor throws - confirm this is intended.
+               @SuppressWarnings("unchecked")
+               Option<Tuple2<String, Object>> remoting = new Some(new 
Tuple2<>("", 0));
+               this.actorSystem = AkkaUtils.createActorSystem(config, 
remoting);
+
+               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
+                               leaderRetrievalService,
+                               actorSystem,
+                               askTimeout,
+                               retryStrategy);
+
+               int numEventLoopThreads = 
config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
+
+               // A configured value of 0 selects one thread per available
+               // processor.
+               if (numEventLoopThreads == 0) {
+                       numEventLoopThreads = 
Runtime.getRuntime().availableProcessors();
+               }
+
+               // Create the network client
+               KvStateClient networkClient = new KvStateClient(
+                               numEventLoopThreads,
+                               new DisabledKvStateRequestStats());
+
+               this.lookupService = lookupService;
+               this.kvStateClient = networkClient;
+               // Futures of this client run on the actor system's dispatcher.
+               this.executionContext = actorSystem.dispatcher();
+               this.executionConfig = new ExecutionConfig();
+
+               this.lookupService.start();
+       }
+
+       /**
+        * Gets the {@link ExecutionConfig}.
+        *
+        * @return The execution config currently held by this client.
+        */
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
+       /**
+        * Sets the {@link ExecutionConfig}.
+        *
+        * <p>The value is stored as given, without a null check.
+        *
+        * @param config The execution config to use from now on.
+        */
+       public void setExecutionConfig(ExecutionConfig config) {
+               this.executionConfig = config;
+       }
+
+       /**
+        * Creates a client from already existing components.
+        *
+        * <p>No actor system is started by this constructor. The client starts
+        * out with a default {@link ExecutionConfig}, like a client created
+        * from a configuration, and the lookup service is started before the
+        * constructor returns.
+        *
+        * @param lookupService    Location lookup service
+        * @param kvStateClient    Network client for queries
+        * @param executionContext Execution context for futures
+        */
+       public QueryableStateClient(
+                       KvStateLocationLookupService lookupService,
+                       KvStateClient kvStateClient,
+                       ExecutionContext executionContext) {
+
+               this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService");
+               this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient");
+               this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext");
+
+               // We did not start an actor system, so there is none to shut down.
+               this.actorSystem = null;
+
+               // Without this the field stays null and getExecutionConfig()
+               // would return null for clients created through this constructor.
+               this.executionConfig = new ExecutionConfig();
+
+               this.lookupService.start();
+       }
+
+       /**
+        * Returns the execution context of this client.
+        *
+        * <p>This is the context that was passed to the constructor or, for
+        * clients created from a configuration, the dispatcher of the started
+        * actor system.
+        *
+        * @return The execution context used by the client.
+        */
+       public ExecutionContext getExecutionContext() {
+               return executionContext;
+       }
+
+       /**
+        * Shuts down the client and all components.
+        */
+       public void shutDown() {
+               try {
+                       lookupService.shutDown();
+               } catch (Throwable t) {
+                       LOG.error("Failed to shut down KvStateLookupService", 
t);
+               }
+
+               try {
+                       kvStateClient.shutDown();
+               } catch (Throwable t) {
+                       LOG.error("Failed to shut down KvStateClient", t);
+               }
+
+               if (actorSystem != null) {
+                       try {
+                               actorSystem.shutdown();
+                       } catch (Throwable t) {
+                               LOG.error("Failed to shut down ActorSystem", t);
+                       }
+               }
+       }
+
+       /**
+        * Returns a future holding the serialized request result.
+        *
+        * <p>If the server does not serve a KvState instance with the given ID,
+        * the Future will be failed with a {@link UnknownKvStateID}.
+        *
+        * <p>If the KvState instance does not hold any data for the given key
+        * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
+        *
+        * <p>All other failures are forwarded to the Future.
+        *
+        * <p>The query is first issued without forcing a location lookup. For a
+        * failure that hints at an outdated location it is repeated once with
+        * a forced lookup.
+        *
+        * @param jobId                     JobID of the job the queryable state
+        *                                  belongs to
+        * @param queryableStateName        Name under which the state is queryable
+        * @param keyHashCode               Integer hash code of the key (result of
+        *                                  a call to {@link Object#hashCode()})
+        * @param serializedKeyAndNamespace Serialized key and namespace to query
+        *                                  KvState instance with
+        * @return Future holding the serialized result
+        */
+       @SuppressWarnings("unchecked")
+       public Future<byte[]> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final int keyHashCode,
+                       final byte[] serializedKeyAndNamespace) {
+
+               final Recover<Future<byte[]>> retryWithForcedLookup = new Recover<Future<byte[]>>() {
+                       @Override
+                       public Future<byte[]> recover(Throwable failure) throws Throwable {
+                               final boolean locationPossiblyOutdated =
+                                               failure instanceof UnknownKvStateID ||
+                                               failure instanceof UnknownKvStateKeyGroupLocation ||
+                                               failure instanceof UnknownKvStateLocation ||
+                                               failure instanceof ConnectException;
+
+                               if (!locationPossiblyOutdated) {
+                                       return Futures.failed(failure);
+                               }
+
+                               // These failures are likely to be caused by out-of-sync
+                               // KvStateLocation. Therefore we retry this query and
+                               // force look up the location.
+                               return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, true);
+                       }
+               };
+
+               return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false)
+                               .recoverWith(retryWithForcedLookup, executionContext);
+       }
+
+       /**
+        * Queries a queryable state instance for the value stored under the
+        * given key, using a {@link TypeHint} to describe the key type.
+        *
+        * <p>The type information is extracted from the hint and the call is
+        * forwarded to the {@link TypeInformation}-based variant.
+        *
+        * <p>If the server does not serve a KvState instance with the given ID,
+        * the Future will be failed with a {@link UnknownKvStateID}.
+        *
+        * <p>If the KvState instance does not hold any data for the given key
+        * and namespace, the Future will be failed with a
+        * {@link UnknownKeyOrNamespace}.
+        *
+        * <p>All other failures are forwarded to the Future.
+        *
+        * @param jobId              JobID of the job the queryable state belongs to.
+        * @param queryableStateName Name under which the state is queryable.
+        * @param key                The key we are interested in.
+        * @param keyTypeHint        A {@link TypeHint} used to extract the type of
+        *                           the key; must not be null.
+        * @param stateDescriptor    The {@link StateDescriptor} of the state we
+        *                           want to query.
+        * @return Future holding the result.
+        */
+       @PublicEvolving
+       public <K, V> Future<V> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final K key,
+                       final TypeHint<K> keyTypeHint,
+                       final StateDescriptor<?, V> stateDescriptor) {
+
+               Preconditions.checkNotNull(keyTypeHint);
+
+               return getKvState(
+                               jobId,
+                               queryableStateName,
+                               key,
+                               keyTypeHint.getTypeInfo(),
+                               stateDescriptor);
+       }
+
+       /**
+        * Queries a queryable state instance for the value stored under the
+        * given key, without an explicit namespace.
+        *
+        * <p>The call is forwarded to the namespace-aware variant with
+        * {@link VoidNamespace#INSTANCE} as the namespace.
+        *
+        * <p>If the server does not serve a KvState instance with the given ID,
+        * the Future will be failed with a {@link UnknownKvStateID}.
+        *
+        * <p>If the KvState instance does not hold any data for the given key
+        * and namespace, the Future will be failed with a
+        * {@link UnknownKeyOrNamespace}.
+        *
+        * <p>All other failures are forwarded to the Future.
+        *
+        * @param jobId              JobID of the job the queryable state belongs to.
+        * @param queryableStateName Name under which the state is queryable.
+        * @param key                The key we are interested in.
+        * @param keyTypeInfo        The {@link TypeInformation} of the key; must
+        *                           not be null.
+        * @param stateDescriptor    The {@link StateDescriptor} of the state we
+        *                           want to query.
+        * @return Future holding the result.
+        */
+       @PublicEvolving
+       public <K, V> Future<V> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final K key,
+                       final TypeInformation<K> keyTypeInfo,
+                       final StateDescriptor<?, V> stateDescriptor) {
+
+               Preconditions.checkNotNull(keyTypeInfo);
+
+               return getKvState(
+                               jobId,
+                               queryableStateName,
+                               key,
+                               VoidNamespace.INSTANCE,
+                               keyTypeInfo,
+                               VoidNamespaceTypeInfo.INSTANCE,
+                               stateDescriptor);
+       }
+
+       /**
+        * Queries a queryable state instance for the value stored under the
+        * given key and namespace, described by a {@link StateDescriptor}.
+        *
+        * <p>The descriptor's serializer is initialized from this client's
+        * execution config unless it has already been set, and the call is then
+        * forwarded to the {@link TypeSerializer}-based variant.
+        *
+        * <p>If the server does not serve a KvState instance with the given ID,
+        * the Future will be failed with a {@link UnknownKvStateID}.
+        *
+        * <p>If the KvState instance does not hold any data for the given key
+        * and namespace, the Future will be failed with a
+        * {@link UnknownKeyOrNamespace}.
+        *
+        * <p>All other failures are forwarded to the Future.
+        *
+        * @param jobId              JobID of the job the queryable state belongs to.
+        * @param queryableStateName Name under which the state is queryable.
+        * @param key                The key that the state we request is
+        *                           associated with.
+        * @param namespace          The namespace of the state.
+        * @param keyTypeInfo        The {@link TypeInformation} of the keys.
+        * @param namespaceTypeInfo  The {@link TypeInformation} of the namespace.
+        * @param stateDescriptor    The {@link StateDescriptor} of the state we
+        *                           want to query; must not be null.
+        * @return Future holding the result.
+        */
+       @PublicEvolving
+       public <K, V, N> Future<V> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final K key,
+                       final N namespace,
+                       final TypeInformation<K> keyTypeInfo,
+                       final TypeInformation<N> namespaceTypeInfo,
+                       final StateDescriptor<?, V> stateDescriptor) {
+
+               Preconditions.checkNotNull(stateDescriptor);
+
+               // The value serializer has to exist before it can be handed on.
+               stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+               return getKvState(
+                               jobId,
+                               queryableStateName,
+                               key,
+                               namespace,
+                               keyTypeInfo,
+                               namespaceTypeInfo,
+                               stateDescriptor.getSerializer());
+       }
+
+       /**
+        * Queries a queryable state instance for the value stored under the
+        * given key and namespace and deserializes it with the given serializer.
+        *
+        * <p>Key and namespace are serialized with serializers created from the
+        * given type information and this client's execution config. The key's
+        * {@code hashCode()} is passed on together with the serialized bytes.
+        *
+        * <p>If the server does not serve a KvState instance with the given ID,
+        * the Future will be failed with a {@link UnknownKvStateID}.
+        *
+        * <p>If the KvState instance does not hold any data for the given key
+        * and namespace, the Future will be failed with a
+        * {@link UnknownKeyOrNamespace}.
+        *
+        * <p>An {@link IOException} raised while serializing the key and
+        * namespace or while deserializing the result fails the Future. All
+        * other failures are forwarded to the Future.
+        *
+        * @param jobId              JobID of the job the queryable state belongs to.
+        * @param queryableStateName Name under which the state is queryable.
+        * @param key                The key that the state we request is
+        *                           associated with.
+        * @param namespace          The namespace of the state.
+        * @param keyTypeInfo        The {@link TypeInformation} of the keys.
+        * @param namespaceTypeInfo  The {@link TypeInformation} of the namespace.
+        * @param stateSerializer    The {@link TypeSerializer} of the state we
+        *                           want to query.
+        * @return Future holding the result.
+        * @throws IllegalArgumentException if {@code stateSerializer} is a
+        *                                  {@code ListSerializer}.
+        */
+       @PublicEvolving
+       public <K, V, N> Future<V> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final K key,
+                       final N namespace,
+                       final TypeInformation<K> keyTypeInfo,
+                       final TypeInformation<N> namespaceTypeInfo,
+                       final TypeSerializer<V> stateSerializer) {
+
+               Preconditions.checkNotNull(queryableStateName);
+               Preconditions.checkNotNull(key);
+               Preconditions.checkNotNull(namespace);
+               Preconditions.checkNotNull(keyTypeInfo);
+               Preconditions.checkNotNull(namespaceTypeInfo);
+               Preconditions.checkNotNull(stateSerializer);
+
+               if (stateSerializer instanceof ListSerializer) {
+                       throw new IllegalArgumentException("ListState is not supported out-of-the-box yet.");
+               }
+
+               final TypeSerializer<K> keySer = keyTypeInfo.createSerializer(executionConfig);
+               final TypeSerializer<N> namespaceSer = namespaceTypeInfo.createSerializer(executionConfig);
+
+               final byte[] keyAndNamespaceBytes;
+               try {
+                       keyAndNamespaceBytes = KvStateSerializer.serializeKeyAndNamespace(
+                                       key, keySer, namespace, namespaceSer);
+               } catch (IOException serializationFailure) {
+                       // Report the serialization problem through the returned future.
+                       return Futures.failed(serializationFailure);
+               }
+
+               final Future<byte[]> serializedResult = getKvState(
+                               jobId, queryableStateName, key.hashCode(), keyAndNamespaceBytes);
+
+               return serializedResult.flatMap(new Mapper<byte[], Future<V>>() {
+                       @Override
+                       public Future<V> apply(byte[] serializedValue) {
+                               try {
+                                       return Futures.successful(
+                                                       KvStateSerializer.deserializeValue(serializedValue, stateSerializer));
+                               } catch (IOException deserializationFailure) {
+                                       return Futures.failed(deserializationFailure);
+                               }
+                       }
+               }, executionContext);
+       }
+
+       /**
+        * Returns a future holding the serialized request result.
+        *
+        * <p>The {@link KvStateLocation} is looked up first. The key group of the
+        * key is then computed from the key hash and the location's number of
+        * key groups. If the location has no server address for that key group,
+        * the Future is failed with an {@link UnknownKvStateKeyGroupLocation};
+        * otherwise the server at that address is queried.
+        *
+        * @param jobId                     JobID of the job the queryable state
+        *                                  belongs to
+        * @param queryableStateName        Name under which the state is queryable
+        * @param keyHashCode               Integer hash code of the key (result of
+        *                                  a call to {@link Object#hashCode()})
+        * @param serializedKeyAndNamespace Serialized key and namespace to query
+        *                                  KvState instance with
+        * @param forceLookup               Flag to force lookup of the
+        *                                  {@link KvStateLocation}
+        * @return Future holding the serialized result
+        */
+       private Future<byte[]> getKvState(
+                       final JobID jobId,
+                       final String queryableStateName,
+                       final int keyHashCode,
+                       final byte[] serializedKeyAndNamespace,
+                       boolean forceLookup) {
+
+               final Future<KvStateLocation> locationFuture =
+                               getKvStateLookupInfo(jobId, queryableStateName, forceLookup);
+
+               return locationFuture.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
+                       @Override
+                       public Future<byte[]> apply(KvStateLocation location) {
+                               final int keyGroup = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+                                               keyHashCode, location.getNumKeyGroups());
+
+                               final KvStateServerAddress address = location.getKvStateServerAddress(keyGroup);
+                               if (address == null) {
+                                       return Futures.failed(new UnknownKvStateKeyGroupLocation());
+                               }
+
+                               // Query server
+                               final KvStateID stateId = location.getKvStateID(keyGroup);
+                               return kvStateClient.getKvState(address, stateId, serializedKeyAndNamespace);
+                       }
+               }, executionContext);
+       }
+
+       /**
+        * Looks up the {@link KvStateLocation} for the given job and queryable
+        * state name.
+        *
+        * <p>The lookup service is only asked when the update is forced, when no
+        * lookup is cached for the job and name, or when the cached lookup has
+        * completed with a failure. In every other case the cached future is
+        * returned as is, so there are no guarantees that a cached location is
+        * still up to date.
+        *
+        * @param jobId              JobID the state instance belongs to.
+        * @param queryableStateName Name under which the state instance has been
+        *                           published.
+        * @param forceUpdate        Flag to indicate whether to force an update
+        *                           via the lookup service.
+        * @return Future holding the KvStateLocation
+        */
+       private Future<KvStateLocation> getKvStateLookupInfo(
+                       JobID jobId,
+                       final String queryableStateName,
+                       boolean forceUpdate) {
+
+               final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+
+               if (forceUpdate) {
+                       // Unconditionally overwrite whatever is cached.
+                       final Future<KvStateLocation> forced =
+                                       lookupService.getKvStateLookupInfo(jobId, queryableStateName);
+                       lookupCache.put(cacheKey, forced);
+                       return forced;
+               }
+
+               final Future<KvStateLocation> cached = lookupCache.get(cacheKey);
+
+               if (cached == null) {
+                       final Future<KvStateLocation> fresh =
+                                       lookupService.getKvStateLookupInfo(jobId, queryableStateName);
+
+                       // If another caller cached a lookup in the meantime, use theirs.
+                       final Future<KvStateLocation> existing = lookupCache.putIfAbsent(cacheKey, fresh);
+                       return existing == null ? fresh : existing;
+               }
+
+               // do not retain futures which failed as they will remain in
+               // the cache even if the error cause is not present any more
+               // and a new lookup may succeed
+               final boolean cachedLookupFailed =
+                               cached.isCompleted() && cached.value().get().isFailure();
+               if (!cachedLookupFailed) {
+                       return cached;
+               }
+
+               // issue a new lookup
+               final Future<KvStateLocation> retried =
+                               lookupService.getKvStateLookupInfo(jobId, queryableStateName);
+
+               // replace the existing one if it has not been replaced yet
+               // otherwise return the one in the cache
+               return lookupCache.replace(cacheKey, cached, retried)
+                               ? retried
+                               : lookupCache.get(cacheKey);
+       }
+
+}

Reply via email to