http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
deleted file mode 100644
index 4b73fbb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A {@link InternalKvState} instance request for a specific key and namespace.
- */
-public final class KvStateRequest {
-
-       /** ID for this request. */
-       private final long requestId;
-
-       /** ID of the requested KvState instance. */
-       private final KvStateID kvStateId;
-
-       /** Serialized key and namespace to request from the KvState instance. */
-       private final byte[] serializedKeyAndNamespace;
-
-       /**
-        * Creates a KvState instance request.
-        *
-        * @param requestId                 ID for this request
-        * @param kvStateId                 ID of the requested KvState instance
-        * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState
-        *                                  instance
-        */
-       KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-               this.requestId = requestId;
-               this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
-               this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-       }
-
-       /**
-        * Returns the request ID.
-        *
-        * @return Request ID
-        */
-       public long getRequestId() {
-               return requestId;
-       }
-
-       /**
-        * Returns the ID of the requested KvState instance.
-        *
-        * @return ID of the requested KvState instance
-        */
-       public KvStateID getKvStateId() {
-               return kvStateId;
-       }
-
-       /**
-        * Returns the serialized key and namespace to request from the KvState
-        * instance.
-        *
-        * @return Serialized key and namespace to request from the KvState instance
-        */
-       public byte[] getSerializedKeyAndNamespace() {
-               return serializedKeyAndNamespace;
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequest{" +
-                               "requestId=" + requestId +
-                               ", kvStateId=" + kvStateId +
-                               ", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
deleted file mode 100644
index 06a3ce8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-/**
- * A failure response to a {@link KvStateRequest}.
- */
-public final class KvStateRequestFailure {
-
-       /** ID of the request responding to. */
-       private final long requestId;
-
-       /** Failure cause. Not allowed to be a user type. */
-       private final Throwable cause;
-
-       /**
-        * Creates a failure response to a {@link KvStateRequest}.
-        *
-        * @param requestId ID for the request responding to
-        * @param cause     Failure cause (not allowed to be a user type)
-        */
-       KvStateRequestFailure(long requestId, Throwable cause) {
-               this.requestId = requestId;
-               this.cause = cause;
-       }
-
-       /**
-        * Returns the request ID responding to.
-        *
-        * @return Request ID responding to
-        */
-       public long getRequestId() {
-               return requestId;
-       }
-
-       /**
-        * Returns the failure cause.
-        *
-        * @return Failure cause
-        */
-       public Throwable getCause() {
-               return cause;
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequestFailure{" +
-                               "requestId=" + requestId +
-                               ", cause=" + cause +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
deleted file mode 100644
index 2bd8a36..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * A successful response to a {@link KvStateRequest} containing the serialized
- * result for the requested key and namespace.
- */
-public final class KvStateRequestResult {
-
-       /** ID of the request responding to. */
-       private final long requestId;
-
-       /**
-        * Serialized result for the requested key and namespace. If no result was
-        * available for the specified key and namespace, this is <code>null</code>.
-        */
-       private final byte[] serializedResult;
-
-       /**
-        * Creates a successful {@link KvStateRequestResult} response.
-        *
-        * @param requestId        ID of the request responding to
-        * @param serializedResult Serialized result or <code>null</code> if none
-        */
-       KvStateRequestResult(long requestId, byte[] serializedResult) {
-               this.requestId = requestId;
-               this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result");
-       }
-
-       /**
-        * Returns the request ID responding to.
-        *
-        * @return Request ID responding to
-        */
-       public long getRequestId() {
-               return requestId;
-       }
-
-       /**
-        * Returns the serialized result or <code>null</code> if none available.
-        *
-        * @return Serialized result or <code>null</code> if none available.
-        */
-       public byte[] getSerializedResult() {
-               return serializedResult;
-       }
-
-       @Override
-       public String toString() {
-               return "KvStateRequestResult{" +
-                               "requestId=" + requestId +
-                               ", serializedResult.length=" + serializedResult.length +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
deleted file mode 100644
index 68f06e3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.netty.KvStateClient;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Serialization and deserialization of messages exchanged between
- * {@link KvStateClient} and {@link KvStateServer}.
- *
- * <p>The binary messages have the following format:
- *
- * <pre>
- *                     <------ Frame ------------------------->
- *                    +----------------------------------------+
- *                    |        HEADER (8)      | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>The concrete content of a message depends on the {@link KvStateRequestType}.
- */
-public final class KvStateRequestSerializer {
-
-       /** The serialization version ID. */
-       private static final int VERSION = 0x79a1b710;
-
-       /** Byte length of the header. */
-       private static final int HEADER_LENGTH = 8;
-
-       // ------------------------------------------------------------------------
-       // Serialization
-       // ------------------------------------------------------------------------
-
-       /**
-        * Allocates a buffer and serializes the KvState request into it.
-        *
-        * @param alloc                     ByteBuf allocator for the buffer to
-        *                                  serialize message into
-        * @param requestId                 ID for this request
-        * @param kvStateId                 ID of the requested KvState instance
-        * @param serializedKeyAndNamespace Serialized key and namespace to request
-        *                                  from the KvState instance.
-        * @return Serialized KvState request message
-        */
-       public static ByteBuf serializeKvStateRequest(
-                       ByteBufAllocator alloc,
-                       long requestId,
-                       KvStateID kvStateId,
-                       byte[] serializedKeyAndNamespace) {
-
-               // Header + request ID + KvState ID + Serialized namespace
-               int frameLength = HEADER_LENGTH + 8 + (8 + 8) + (4 + serializedKeyAndNamespace.length);
-               ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length
-
-               buf.writeInt(frameLength);
-
-               writeHeader(buf, KvStateRequestType.REQUEST);
-
-               buf.writeLong(requestId);
-               buf.writeLong(kvStateId.getLowerPart());
-               buf.writeLong(kvStateId.getUpperPart());
-               buf.writeInt(serializedKeyAndNamespace.length);
-               buf.writeBytes(serializedKeyAndNamespace);
-
-               return buf;
-       }
-
-       /**
-        * Allocates a buffer and serializes the KvState request result into it.
-        *
-        * @param alloc             ByteBuf allocator for the buffer to serialize message into
-        * @param requestId         ID for this request
-        * @param serializedResult  Serialized Result
-        * @return Serialized KvState request result message
-        */
-       public static ByteBuf serializeKvStateRequestResult(
-                       ByteBufAllocator alloc,
-                       long requestId,
-                       byte[] serializedResult) {
-
-               Preconditions.checkNotNull(serializedResult, "Serialized result");
-
-               // Header + request ID + serialized result
-               int frameLength = HEADER_LENGTH + 8 + 4 + serializedResult.length;
-
-               ByteBuf buf = alloc.ioBuffer(frameLength);
-
-               buf.writeInt(frameLength);
-               writeHeader(buf, KvStateRequestType.REQUEST_RESULT);
-               buf.writeLong(requestId);
-
-               buf.writeInt(serializedResult.length);
-               buf.writeBytes(serializedResult);
-
-               return buf;
-       }
-
-       /**
-        * Allocates a buffer and serializes the KvState request failure into it.
-        *
-        * @param alloc ByteBuf allocator for the buffer to serialize message into
-        * @param requestId ID of the request responding to
-        * @param cause Failure cause
-        * @return Serialized KvState request failure message
-        * @throws IOException Serialization failures are forwarded
-        */
-       public static ByteBuf serializeKvStateRequestFailure(
-                       ByteBufAllocator alloc,
-                       long requestId,
-                       Throwable cause) throws IOException {
-
-               ByteBuf buf = alloc.ioBuffer();
-
-               // Frame length is set at the end
-               buf.writeInt(0);
-
-               writeHeader(buf, KvStateRequestType.REQUEST_FAILURE);
-
-               // Message
-               buf.writeLong(requestId);
-
-               try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-                               ObjectOutputStream out = new ObjectOutputStream(bbos)) {
-
-                       out.writeObject(cause);
-               }
-
-               // Set frame length
-               int frameLength = buf.readableBytes() - 4;
-               buf.setInt(0, frameLength);
-
-               return buf;
-       }
-
-       /**
-        * Allocates a buffer and serializes the server failure into it.
-        *
-        * <p>The cause must not be or contain any user types as causes.
-        *
-        * @param alloc ByteBuf allocator for the buffer to serialize message into
-        * @param cause Failure cause
-        * @return Serialized server failure message
-        * @throws IOException Serialization failures are forwarded
-        */
-       public static ByteBuf serializeServerFailure(ByteBufAllocator alloc, Throwable cause) throws IOException {
-               ByteBuf buf = alloc.ioBuffer();
-
-               // Frame length is set at end
-               buf.writeInt(0);
-
-               writeHeader(buf, KvStateRequestType.SERVER_FAILURE);
-
-               try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
-                               ObjectOutputStream out = new ObjectOutputStream(bbos)) {
-
-                       out.writeObject(cause);
-               }
-
-               // Set frame length
-               int frameLength = buf.readableBytes() - 4;
-               buf.setInt(0, frameLength);
-
-               return buf;
-       }
-
-       // ------------------------------------------------------------------------
-       // Deserialization
-       // ------------------------------------------------------------------------
-
-       /**
-        * Deserializes the header and returns the request type.
-        *
-        * @param buf Buffer to deserialize (expected to be at header position)
-        * @return Deserialzied request type
-        * @throws IllegalArgumentException If unexpected message version or message type
-        */
-       public static KvStateRequestType deserializeHeader(ByteBuf buf) {
-               // Check the version
-               int version = buf.readInt();
-               if (version != VERSION) {
-                       throw new IllegalArgumentException("Illegal message version " + version +
-                                       ". Expected: " + VERSION + ".");
-               }
-
-               // Get the message type
-               int msgType = buf.readInt();
-               KvStateRequestType[] values = KvStateRequestType.values();
-               if (msgType >= 0 && msgType < values.length) {
-                       return values[msgType];
-               } else {
-                       throw new IllegalArgumentException("Illegal message type with index " + msgType);
-               }
-       }
-
-       /**
-        * Deserializes the KvState request message.
-        *
-        * <p><strong>Important</strong>: the returned buffer is sliced from the
-        * incoming ByteBuf stream and retained. Therefore, it needs to be recycled
-        * by the consumer.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after header)
-        * @return Deserialized KvStateRequest
-        */
-       public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) {
-               long requestId = buf.readLong();
-               KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
-
-               // Serialized key and namespace
-               int length = buf.readInt();
-
-               if (length < 0) {
-                       throw new IllegalArgumentException("Negative length for serialized key and namespace. " +
-                                       "This indicates a serialization error.");
-               }
-
-               // Copy the buffer in order to be able to safely recycle the ByteBuf
-               byte[] serializedKeyAndNamespace = new byte[length];
-               if (length > 0) {
-                       buf.readBytes(serializedKeyAndNamespace);
-               }
-
-               return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace);
-       }
-
-       /**
-        * Deserializes the KvState request result.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after header)
-        * @return Deserialized KvStateRequestResult
-        */
-       public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) {
-               long requestId = buf.readLong();
-
-               // Serialized KvState
-               int length = buf.readInt();
-
-               if (length < 0) {
-                       throw new IllegalArgumentException("Negative length for serialized result. " +
-                                       "This indicates a serialization error.");
-               }
-
-               byte[] serializedValue = new byte[length];
-
-               if (length > 0) {
-                       buf.readBytes(serializedValue);
-               }
-
-               return new KvStateRequestResult(requestId, serializedValue);
-       }
-
-       /**
-        * Deserializes the KvState request failure.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after header)
-        * @return Deserialized KvStateRequestFailure
-        */
-       public static KvStateRequestFailure deserializeKvStateRequestFailure(ByteBuf buf) throws IOException, ClassNotFoundException {
-               long requestId = buf.readLong();
-
-               Throwable cause;
-               try (ByteBufInputStream bbis = new ByteBufInputStream(buf);
-                               ObjectInputStream in = new ObjectInputStream(bbis)) {
-
-                       cause = (Throwable) in.readObject();
-               }
-
-               return new KvStateRequestFailure(requestId, cause);
-       }
-
-       /**
-        * Deserializes the KvState request failure.
-        *
-        * @param buf Buffer to deserialize (expected to be positioned after header)
-        * @return Deserialized KvStateRequestFailure
-        * @throws IOException            Serialization failure are forwarded
-        * @throws ClassNotFoundException If Exception type can not be loaded
-        */
-       public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException, ClassNotFoundException {
-               try (ByteBufInputStream bbis = new ByteBufInputStream(buf);
-                               ObjectInputStream in = new ObjectInputStream(bbis)) {
-
-                       return (Throwable) in.readObject();
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       // Generic serialization utils
-       // ------------------------------------------------------------------------
-
-       /**
-        * Serializes the key and namespace into a {@link ByteBuffer}.
-        *
-        * <p>The serialized format matches the RocksDB state backend key format, i.e.
-        * the key and namespace don't have to be deserialized for RocksDB lookups.
-        *
-        * @param key                 Key to serialize
-        * @param keySerializer       Serializer for the key
-        * @param namespace           Namespace to serialize
-        * @param namespaceSerializer Serializer for the namespace
-        * @param <K>                 Key type
-        * @param <N>                 Namespace type
-        * @return Buffer holding the serialized key and namespace
-        * @throws IOException Serialization errors are forwarded
-        */
-       public static <K, N> byte[] serializeKeyAndNamespace(
-                       K key,
-                       TypeSerializer<K> keySerializer,
-                       N namespace,
-                       TypeSerializer<N> namespaceSerializer) throws IOException {
-
-               DataOutputSerializer dos = new DataOutputSerializer(32);
-
-               keySerializer.serialize(key, dos);
-               dos.writeByte(42);
-               namespaceSerializer.serialize(namespace, dos);
-
-               return dos.getCopyOfBuffer();
-       }
-
-       /**
-        * Deserializes the key and namespace into a {@link Tuple2}.
-        *
-        * @param serializedKeyAndNamespace Serialized key and namespace
-        * @param keySerializer             Serializer for the key
-        * @param namespaceSerializer       Serializer for the namespace
-        * @param <K>                       Key type
-        * @param <N>                       Namespace
-        * @return Tuple2 holding deserialized key and namespace
-        * @throws IOException              if the deserialization fails for any reason
-        */
-       public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
-                       byte[] serializedKeyAndNamespace,
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer) throws IOException {
-
-               DataInputDeserializer dis = new DataInputDeserializer(
-                               serializedKeyAndNamespace,
-                               0,
-                               serializedKeyAndNamespace.length);
-
-               try {
-                       K key = keySerializer.deserialize(dis);
-                       byte magicNumber = dis.readByte();
-                       if (magicNumber != 42) {
-                               throw new IOException("Unexpected magic number " + magicNumber + ".");
-                       }
-                       N namespace = namespaceSerializer.deserialize(dis);
-
-                       if (dis.available() > 0) {
-                               throw new IOException("Unconsumed bytes in the serialized key and namespace.");
-                       }
-
-                       return new Tuple2<>(key, namespace);
-               } catch (IOException e) {
-                       throw new IOException("Unable to deserialize key " +
-                               "and namespace. This indicates a mismatch in the key/namespace " +
-                               "serializers used by the KvState instance and this access.", e);
-               }
-       }
-
-       /**
-        * Serializes the value with the given serializer.
-        *
-        * @param value      Value of type T to serialize
-        * @param serializer Serializer for T
-        * @param <T>        Type of the value
-        * @return Serialized value or <code>null</code> if value <code>null</code>
-        * @throws IOException On failure during serialization
-        */
-       public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
-               if (value != null) {
-                       // Serialize
-                       DataOutputSerializer dos = new DataOutputSerializer(32);
-                       serializer.serialize(value, dos);
-                       return dos.getCopyOfBuffer();
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Deserializes the value with the given serializer.
-        *
-        * @param serializedValue Serialized value of type T
-        * @param serializer      Serializer for T
-        * @param <T>             Type of the value
-        * @return Deserialized value or <code>null</code> if the serialized value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
-               if (serializedValue == null) {
-                       return null;
-               } else {
-                       final DataInputDeserializer deser = new DataInputDeserializer(
-                               serializedValue, 0, serializedValue.length);
-                       final T value = serializer.deserialize(deser);
-                       if (deser.available() > 0) {
-                               throw new IOException(
-                                       "Unconsumed bytes in the deserialized value. " +
-                                               "This indicates a mismatch in the value serializers " +
-                                               "used by the KvState instance and this access.");
-                       }
-                       return value;
-               }
-       }
-
-       /**
-        * Deserializes all values with the given serializer.
-        *
-        * @param serializedValue Serialized value of type List&lt;T&gt;
-        * @param serializer      Serializer for T
-        * @param <T>             Type of the value
-        * @return Deserialized list or <code>null</code> if the serialized value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
-               if (serializedValue != null) {
-                       final DataInputDeserializer in = new DataInputDeserializer(
-                               serializedValue, 0, serializedValue.length);
-
-                       try {
-                               final List<T> result = new ArrayList<>();
-                               while (in.available() > 0) {
-                                       result.add(serializer.deserialize(in));
-
-                                       // The expected binary format has a single byte separator. We
-                                       // want a consistent binary format in order to not need any
-                                       // special casing during deserialization. A "cleaner" format
-                                       // would skip this extra byte, but would require a memory copy
-                                       // for RocksDB, which stores the data serialized in this way
-                                       // for lists.
-                                       if (in.available() > 0) {
-                                               in.readByte();
-                                       }
-                               }
-
-                               return result;
-                       } catch (IOException e) {
-                               throw new IOException(
-                                               "Unable to deserialize value. " 
+
-                                                       "This indicates a 
mismatch in the value serializers " +
-                                                       "used by the KvState 
instance and this access.", e);
-                       }
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Serializes all values of the Iterable with the given serializer.
-        *
-        * @param entries         Key-value pairs to serialize
-        * @param keySerializer   Serializer for UK
-        * @param valueSerializer Serializer for UV
-        * @param <UK>            Type of the keys
-        * @param <UV>            Type of the values
-        * @return Serialized values or <code>null</code> if values 
<code>null</code> or empty
-        * @throws IOException On failure during serialization
-        */
-       public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> 
entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) 
throws IOException {
-               if (entries != null) {
-                       // Serialize
-                       DataOutputSerializer dos = new DataOutputSerializer(32);
-
-                       for (Map.Entry<UK, UV> entry : entries) {
-                               keySerializer.serialize(entry.getKey(), dos);
-
-                               if (entry.getValue() == null) {
-                                       dos.writeBoolean(true);
-                               } else {
-                                       dos.writeBoolean(false);
-                                       
valueSerializer.serialize(entry.getValue(), dos);
-                               }
-                       }
-
-                       return dos.getCopyOfBuffer();
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Deserializes all kv pairs with the given serializer.
-        *
-        * @param serializedValue Serialized value of type Map&lt;UK, UV&gt;
-        * @param keySerializer   Serializer for UK
-        * @param valueSerializer Serializer for UV
-        * @param <UK>            Type of the key
-        * @param <UV>            Type of the value.
-        * @return Deserialized map or <code>null</code> if the serialized value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <UK, UV> Map<UK, UV> deserializeMap(byte[] 
serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> 
valueSerializer) throws IOException {
-               if (serializedValue != null) {
-                       DataInputDeserializer in = new 
DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
-                       Map<UK, UV> result = new HashMap<>();
-                       while (in.available() > 0) {
-                               UK key = keySerializer.deserialize(in);
-
-                               boolean isNull = in.readBoolean();
-                               UV value = isNull ? null : 
valueSerializer.deserialize(in);
-
-                               result.put(key, value);
-                       }
-
-                       return result;
-               } else {
-                       return null;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Helper for writing the header.
-        *
-        * @param buf         Buffer to serialize header into
-        * @param requestType Result type to serialize
-        */
-       private static void writeHeader(ByteBuf buf, KvStateRequestType 
requestType) {
-               buf.writeInt(VERSION);
-               buf.writeInt(requestType.ordinal());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
deleted file mode 100644
index de7270a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.runtime.query.netty.KvStateServer;
-
-/**
- * Expected message types when communicating with the {@link KvStateServer}.
- */
-public enum KvStateRequestType {
-
-       /** Request a KvState instance. */
-       REQUEST,
-
-       /** Successful response to a KvStateRequest. */
-       REQUEST_RESULT,
-
-       /** Failure response to a KvStateRequest. */
-       REQUEST_FAILURE,
-
-       /** Generic server failure. */
-       SERVER_FAILURE
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
new file mode 100644
index 0000000..44ee571
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query.netty.message;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization and deserialization of the different state types and namespaces.
+ */
+public final class KvStateSerializer {
+
+       // 
------------------------------------------------------------------------
+       // Generic serialization utils
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Serializes the key and namespace into a {@link ByteBuffer}.
+        *
+        * <p>The serialized format matches the RocksDB state backend key 
format, i.e.
+        * the key and namespace don't have to be deserialized for RocksDB 
lookups.
+        *
+        * @param key                 Key to serialize
+        * @param keySerializer       Serializer for the key
+        * @param namespace           Namespace to serialize
+        * @param namespaceSerializer Serializer for the namespace
+        * @param <K>                 Key type
+        * @param <N>                 Namespace type
+        * @return Buffer holding the serialized key and namespace
+        * @throws IOException Serialization errors are forwarded
+        */
+       public static <K, N> byte[] serializeKeyAndNamespace(
+                       K key,
+                       TypeSerializer<K> keySerializer,
+                       N namespace,
+                       TypeSerializer<N> namespaceSerializer) throws 
IOException {
+
+               DataOutputSerializer dos = new DataOutputSerializer(32);
+
+               keySerializer.serialize(key, dos);
+               dos.writeByte(42);
+               namespaceSerializer.serialize(namespace, dos);
+
+               return dos.getCopyOfBuffer();
+       }
+
+       /**
+        * Deserializes the key and namespace into a {@link Tuple2}.
+        *
+        * @param serializedKeyAndNamespace Serialized key and namespace
+        * @param keySerializer             Serializer for the key
+        * @param namespaceSerializer       Serializer for the namespace
+        * @param <K>                       Key type
+        * @param <N>                       Namespace type
+        * @return Tuple2 holding deserialized key and namespace
+        * @throws IOException              if the deserialization fails for 
any reason
+        */
+       public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
+                       byte[] serializedKeyAndNamespace,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer) throws 
IOException {
+
+               DataInputDeserializer dis = new DataInputDeserializer(
+                               serializedKeyAndNamespace,
+                               0,
+                               serializedKeyAndNamespace.length);
+
+               try {
+                       K key = keySerializer.deserialize(dis);
+                       byte magicNumber = dis.readByte();
+                       if (magicNumber != 42) {
+                               throw new IOException("Unexpected magic number 
" + magicNumber + ".");
+                       }
+                       N namespace = namespaceSerializer.deserialize(dis);
+
+                       if (dis.available() > 0) {
+                               throw new IOException("Unconsumed bytes in the 
serialized key and namespace.");
+                       }
+
+                       return new Tuple2<>(key, namespace);
+               } catch (IOException e) {
+                       throw new IOException("Unable to deserialize key " +
+                               "and namespace. This indicates a mismatch in 
the key/namespace " +
+                               "serializers used by the KvState instance and 
this access.", e);
+               }
+       }
+
+       /**
+        * Serializes the value with the given serializer.
+        *
+        * @param value      Value of type T to serialize
+        * @param serializer Serializer for T
+        * @param <T>        Type of the value
+        * @return Serialized value or <code>null</code> if value is 
<code>null</code>
+        * @throws IOException On failure during serialization
+        */
+       public static <T> byte[] serializeValue(T value, TypeSerializer<T> 
serializer) throws IOException {
+               if (value != null) {
+                       // Serialize
+                       DataOutputSerializer dos = new DataOutputSerializer(32);
+                       serializer.serialize(value, dos);
+                       return dos.getCopyOfBuffer();
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Deserializes the value with the given serializer.
+        *
+        * @param serializedValue Serialized value of type T
+        * @param serializer      Serializer for T
+        * @param <T>             Type of the value
+        * @return Deserialized value or <code>null</code> if the serialized 
value
+        * is <code>null</code>
+        * @throws IOException On failure during deserialization
+        */
+       public static <T> T deserializeValue(byte[] serializedValue, 
TypeSerializer<T> serializer) throws IOException {
+               if (serializedValue == null) {
+                       return null;
+               } else {
+                       final DataInputDeserializer deser = new 
DataInputDeserializer(
+                               serializedValue, 0, serializedValue.length);
+                       final T value = serializer.deserialize(deser);
+                       if (deser.available() > 0) {
+                               throw new IOException(
+                                       "Unconsumed bytes in the deserialized 
value. " +
+                                               "This indicates a mismatch in 
the value serializers " +
+                                               "used by the KvState instance 
and this access.");
+                       }
+                       return value;
+               }
+       }
+
+       /**
+        * Deserializes all values with the given serializer.
+        *
+        * @param serializedValue Serialized value of type List&lt;T&gt;
+        * @param serializer      Serializer for T
+        * @param <T>             Type of the value
+        * @return Deserialized list or <code>null</code> if the serialized 
value
+        * is <code>null</code>
+        * @throws IOException On failure during deserialization
+        */
+       public static <T> List<T> deserializeList(byte[] serializedValue, 
TypeSerializer<T> serializer) throws IOException {
+               if (serializedValue != null) {
+                       final DataInputDeserializer in = new 
DataInputDeserializer(
+                               serializedValue, 0, serializedValue.length);
+
+                       try {
+                               final List<T> result = new ArrayList<>();
+                               while (in.available() > 0) {
+                                       result.add(serializer.deserialize(in));
+
+                                       // The expected binary format has a 
single byte separator. We
+                                       // want a consistent binary format in 
order to not need any
+                                       // special casing during 
deserialization. A "cleaner" format
+                                       // would skip this extra byte, but 
would require a memory copy
+                                       // for RocksDB, which stores the data 
serialized in this way
+                                       // for lists.
+                                       if (in.available() > 0) {
+                                               in.readByte();
+                                       }
+                               }
+
+                               return result;
+                       } catch (IOException e) {
+                               throw new IOException(
+                                               "Unable to deserialize value. " 
+
+                                                       "This indicates a 
mismatch in the value serializers " +
+                                                       "used by the KvState 
instance and this access.", e);
+                       }
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Serializes all entries of the Iterable with the given key and value serializers.
+        *
+        * @param entries         Key-value pairs to serialize
+        * @param keySerializer   Serializer for UK
+        * @param valueSerializer Serializer for UV
+        * @param <UK>            Type of the keys
+        * @param <UV>            Type of the values
+        * @return Serialized entries or <code>null</code> if entries is 
<code>null</code>
+        * @throws IOException On failure during serialization
+        */
+       public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> 
entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) 
throws IOException {
+               if (entries != null) {
+                       // Serialize
+                       DataOutputSerializer dos = new DataOutputSerializer(32);
+
+                       for (Map.Entry<UK, UV> entry : entries) {
+                               keySerializer.serialize(entry.getKey(), dos);
+
+                               if (entry.getValue() == null) {
+                                       dos.writeBoolean(true);
+                               } else {
+                                       dos.writeBoolean(false);
+                                       
valueSerializer.serialize(entry.getValue(), dos);
+                               }
+                       }
+
+                       return dos.getCopyOfBuffer();
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Deserializes all key-value pairs with the given serializers.
+        *
+        * @param serializedValue Serialized value of type Map&lt;UK, UV&gt;
+        * @param keySerializer   Serializer for UK
+        * @param valueSerializer Serializer for UV
+        * @param <UK>            Type of the key
+        * @param <UV>            Type of the value.
+        * @return Deserialized map or <code>null</code> if the serialized value
+        * is <code>null</code>
+        * @throws IOException On failure during deserialization
+        */
+       public static <UK, UV> Map<UK, UV> deserializeMap(byte[] 
serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> 
valueSerializer) throws IOException {
+               if (serializedValue != null) {
+                       DataInputDeserializer in = new 
DataInputDeserializer(serializedValue, 0, serializedValue.length);
+
+                       Map<UK, UV> result = new HashMap<>();
+                       while (in.available() > 0) {
+                               UK key = keySerializer.deserialize(in);
+
+                               boolean isNull = in.readBoolean();
+                               UV value = isNull ? null : 
valueSerializer.deserialize(in);
+
+                               result.put(key, value);
+                       }
+
+                       return result;
+               } else {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
deleted file mode 100644
index 7e8de40..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains all Netty-based client/server classes used to query
- * KvState instances.
- *
- * <h2>Server and Client</h2>
- *
- * <p>Both server and client expect received binary messages to contain a frame
- * length field. Netty's {@link 
io.netty.handler.codec.LengthFieldBasedFrameDecoder}
- * is used to fully receive the frame before giving it to the respective client
- * or server handler.
- *
- * <p>Connection establishment and release happens by the client. The server
- * only closes a connection if a fatal failure happens that cannot be resolved
- * otherwise.
- *
- * <p>The is a single server per task manager and a single client can be shared
- * by multiple Threads.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateServer}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateServerHandler}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateClient}</li>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateClientHandler}</li>
- * </ul>
- *
- * <h2>Serialization</h2>
- *
- * <p>The exchanged binary messages have the following format:
- *
- * <pre>
- *                     <------ Frame ------------------------->
- *                    +----------------------------------------+
- *                    |        HEADER (8)      | PAYLOAD (VAR) |
- * +------------------+----------------------------------------+
- * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) |
- * +------------------+----------------------------------------+
- * </pre>
- *
- * <p>For frame decoding, both server and client use Netty's {@link
- * io.netty.handler.codec.LengthFieldBasedFrameDecoder}. Message serialization
- * is done via static helpers in {@link 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}.
- * The serialization helpers return {@link io.netty.buffer.ByteBuf} instances,
- * which are ready to be sent to the client or server respectively as they
- * contain the frame length.
- *
- * <p>See also:
- * <ul>
- * <li>{@link 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}</li>
- * </ul>
- *
- * <h2>Statistics</h2>
- *
- * <p>Both server and client keep track of request statistics via {@link
- * org.apache.flink.runtime.query.netty.KvStateRequestStats}.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.netty.KvStateRequestStats}</li>
- * </ul>
- */
-package org.apache.flink.runtime.query.netty;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
deleted file mode 100644
index 07a4396..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains all KvState query related classes.
- *
- * <h2>TaskManager and JobManager</h2>
- *
- * <p>State backends register queryable state instances at the {@link
- * org.apache.flink.runtime.query.KvStateRegistry}.
- * There is one registry per TaskManager. Registered KvState instances are
- * reported to the JobManager, where they are aggregated at the {@link
- * org.apache.flink.runtime.query.KvStateLocationRegistry}.
- *
- * <p>Instances of {@link org.apache.flink.runtime.query.KvStateLocation} 
contain
- * all information needed for a client to query a KvState instance.
- *
- * <p>See also:
- * <ul>
- * <li>{@link org.apache.flink.runtime.query.KvStateRegistry}</li>
- * <li>{@link org.apache.flink.runtime.query.TaskKvStateRegistry}</li>
- * <li>{@link org.apache.flink.runtime.query.KvStateLocation}</li>
- * <li>{@link org.apache.flink.runtime.query.KvStateLocationRegistry}</li>
- * </ul>
- *
- * <h2>Client</h2>
- *
- * The {@link org.apache.flink.runtime.query.QueryableStateClient} is used
- * to query KvState instances. The client takes care of {@link
- * org.apache.flink.runtime.query.KvStateLocation} lookup and caching. Queries
- * are then dispatched via the network client.
- *
- * <h3>JobManager Communication</h3>
- *
- * <p>The JobManager is queried for {@link 
org.apache.flink.runtime.query.KvStateLocation}
- * instances via the {@link 
org.apache.flink.runtime.query.KvStateLocationLookupService}.
- * The client caches resolved locations and dispatches queries directly to the
- * TaskManager.
- *
- * <h3>TaskManager Communication</h3>
- *
- * <p>After the location has been resolved, the TaskManager is queried via the
- * {@link org.apache.flink.runtime.query.netty.KvStateClient}.
- */
-package org.apache.flink.runtime.query;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 7e1123d..97b6bcd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
@@ -90,7 +90,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends 
State, SD extends St
        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception {
                Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
 
-               Tuple2<K, N> keyAndNamespace = 
KvStateRequestSerializer.deserializeKeyAndNamespace(
+               Tuple2<K, N> keyAndNamespace = 
KvStateSerializer.deserializeKeyAndNamespace(
                                serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
 
                return getSerializedValue(keyAndNamespace.f0, 
keyAndNamespace.f1);
@@ -108,7 +108,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends 
State, SD extends St
 
                @SuppressWarnings("unchecked,rawtypes")
                TypeSerializer serializer = stateDesc.getSerializer();
-               return KvStateRequestSerializer.serializeValue(result, 
serializer);
+               return KvStateSerializer.serializeValue(result, serializer);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index f393237..f981b9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
@@ -153,6 +153,6 @@ public class HeapMapState<K, N, UK, UV>
                TypeSerializer<UK> userKeySerializer = 
stateDesc.getKeySerializer();
                TypeSerializer<UV> userValueSerializer = 
stateDesc.getValueSerializer();
 
-               return KvStateRequestSerializer.serializeMap(result.entrySet(), 
userKeySerializer, userValueSerializer);
+               return KvStateSerializer.serializeMap(result.entrySet(), 
userKeySerializer, userValueSerializer);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index fa1ae54..37d28de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -47,7 +47,7 @@ public class QueryableStateConfiguration {
        // 
------------------------------------------------------------------------
 
        /**
-        * Returns whether queryable state is enabled. 
+        * Returns whether queryable state is enabled.
         */
        public boolean enabled() {
                return enabled;
@@ -70,7 +70,7 @@ public class QueryableStateConfiguration {
 
        /**
         * Returns the number of threads for the thread pool that performs the 
actual state lookup.
-        * These threads perform the actual state lookup. 
+        * These threads perform the actual state lookup.
         */
        public int numQueryThreads() {
                return numQueryThreads;
@@ -90,7 +90,7 @@ public class QueryableStateConfiguration {
        // 
------------------------------------------------------------------------
 
        /**
-        * Gets the configuration describing the queryable state as 
deactivated. 
+        * Gets the configuration describing the queryable state as deactivated.
         */
        public static QueryableStateConfiguration disabled() {
                return new QueryableStateConfiguration(false, 0, 0, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 1c30ff6..7c5c830 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -39,8 +39,9 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
@@ -48,6 +49,7 @@ import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,7 +140,7 @@ public class TaskManagerServices {
        public FileCache getFileCache() {
                return fileCache;
        }
-       
+
        public TaskSlotTable getTaskSlotTable() {
                return taskSlotTable;
        }
@@ -214,7 +216,7 @@ public class TaskManagerServices {
                final JobManagerTable jobManagerTable = new JobManagerTable();
 
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               
+
                return new TaskManagerServices(
                        taskManagerLocation,
                        memoryManager,
@@ -354,7 +356,7 @@ public class TaskManagerServices {
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
                KvStateRegistry kvStateRegistry = new KvStateRegistry();
-               KvStateServer kvStateServer;
+               KvStateServer kvStateServer = null;
 
                if 
(taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) {
                        QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
@@ -365,15 +367,13 @@ public class TaskManagerServices {
                        int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
                                        
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numQueryThreads();
 
-                       kvStateServer = new KvStateServer(
-                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                               qsConfig.port(),
-                               numNetworkThreads,
-                               numQueryThreads,
-                               kvStateRegistry,
-                               new DisabledKvStateRequestStats());
-               } else {
-                       kvStateServer = null;
+                       kvStateServer = QueryableStateUtils.createKvStateServer(
+                                       
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                                       qsConfig.port(),
+                                       numNetworkThreads,
+                                       numQueryThreads,
+                                       kvStateRegistry,
+                                       new DisabledKvStateRequestStats());
                }
 
                // we start the network first, to make sure it can allocate its 
buffers first
@@ -395,7 +395,7 @@ public class TaskManagerServices {
         * Calculates the amount of memory used for network buffers based on 
the total memory to use and
         * the according configuration parameters.
         *
-        * The following configuration parameters are involved:
+        * <p>The following configuration parameters are involved:
         * <ul>
         *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
         *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
@@ -458,11 +458,11 @@ public class TaskManagerServices {
         * Calculates the amount of memory used for network buffers inside the 
current JVM instance
         * based on the available heap or the max heap size and the according 
configuration parameters.
         *
-        * For containers or when started via scripts, if started with a memory 
limit and set to use
+        * <p>For containers or when started via scripts, if started with a 
memory limit and set to use
         * off-heap memory, the maximum heap size for the JVM is adjusted 
accordingly and we are able
         * to extract the intended values from this.
         *
-        * The following configuration parameters are involved:
+        * <p>The following configuration parameters are involved:
         * <ul>
         *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
         *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
@@ -629,7 +629,7 @@ public class TaskManagerServices {
                                if (LOG.isInfoEnabled()) {
                                        long totalSpaceGb = 
file.getTotalSpace() >> 30;
                                        long usableSpaceGb = 
file.getUsableSpace() >> 30;
-                                       double usablePercentage = 
(double)usableSpaceGb / totalSpaceGb * 100;
+                                       double usablePercentage = (double) 
usableSpaceGb / totalSpaceGb * 100;
                                        String path = file.getAbsolutePath();
                                        LOG.info(String.format("Temporary file 
directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)",
                                                path, totalSpaceGb, 
usableSpaceGb, usablePercentage));

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 6cc7569..f1f7d39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Configuration for the task manager services such as the network 
environment, the memory manager,
- * the io manager and the metric registry
+ * the io manager and the metric registry.
  */
 public class TaskManagerServicesConfiguration {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServicesConfiguration.class);
@@ -106,7 +107,6 @@ public class TaskManagerServicesConfiguration {
        //  Getter/Setter
        // 
--------------------------------------------------------------------------------------------
 
-
        public InetAddress getTaskManagerAddress() {
                return taskManagerAddress;
        }
@@ -291,7 +291,7 @@ public class TaskManagerServicesConfiguration {
 
                if (!hasNewNetworkBufConf(configuration)) {
                        // map old config to new one:
-                       networkBufMin = networkBufMax = 
((long)numNetworkBuffers) * pageSize;
+                       networkBufMin = networkBufMax = ((long) 
numNetworkBuffers) * pageSize;
                } else {
                        if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                                LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
@@ -439,9 +439,8 @@ public class TaskManagerServicesConfiguration {
        static void checkConfigParameter(boolean condition, Object parameter, 
String name, String errorMessage)
                        throws IllegalConfigurationException {
                if (!condition) {
-                       throw new IllegalConfigurationException("Invalid 
configuration value for " + 
+                       throw new IllegalConfigurationException("Invalid 
configuration value for " +
                                        name + " : " + parameter + " - " + 
errorMessage);
                }
        }
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
deleted file mode 100644
index edefcf8..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
-import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AkkaKvStateLocationLookupService}.
- */
-public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
-
-       /** The default timeout. */
-       private static final FiniteDuration TIMEOUT = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-       /** Test actor system shared between the tests. */
-       private static ActorSystem testActorSystem;
-
-       @BeforeClass
-       public static void setUp() throws Exception {
-               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-       }
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
-
-       /**
-        * Tests responses if no leader notification has been reported or 
leadership
-        * has been lost (leaderAddress = <code>null</code>).
-        */
-       @Test
-       public void testNoJobManagerRegistered() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       null,
-                       null);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               //
-               // No leader registered initially => fail with UnknownJobManager
-               //
-               try {
-                       JobID jobId = new JobID();
-                       String name = "coffee";
-
-                       Future<KvStateLocation> locationFuture = 
lookupService.getKvStateLookupInfo(jobId, name);
-
-                       Await.result(locationFuture, TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (UnknownJobManager ignored) {
-                       // Expected
-               }
-
-               assertEquals("Received unexpected lookup", 0, received.size());
-
-               //
-               // Leader registration => communicate with new leader
-               //
-               UUID leaderSessionId = 
HighAvailabilityServices.DEFAULT_LEADER_ID;
-               KvStateLocation expected = new KvStateLocation(new JobID(), new 
JobVertexID(), 8282, "tea");
-
-               ActorRef testActor = LookupResponseActor.create(received, 
leaderSessionId, expected);
-
-               String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, 
testActor);
-
-               // Notify the service about a leader
-               leaderRetrievalService.notifyListener(testActorAddress, 
leaderSessionId);
-
-               JobID jobId = new JobID();
-               String name = "tea";
-
-               // Verify that the leader response is handled
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
-               assertEquals(expected, location);
-
-               // Verify that the correct message was sent to the leader
-               assertEquals(1, received.size());
-
-               verifyLookupMsg(received.poll(), jobId, name);
-
-               //
-               // Leader loss => fail with UnknownJobManager
-               //
-               leaderRetrievalService.notifyListener(null, null);
-
-               try {
-                       Future<KvStateLocation> locationFuture = lookupService
-                                       .getKvStateLookupInfo(new JobID(), 
"coffee");
-
-                       Await.result(locationFuture, TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (UnknownJobManager ignored) {
-                       // Expected
-               }
-
-               // No new messages received
-               assertEquals(0, received.size());
-       }
-
-       /**
-        * Tests that messages are properly decorated with the leader session 
ID.
-        */
-       @Test
-       public void testLeaderSessionIdChange() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       "localhost",
-                       HighAvailabilityServices.DEFAULT_LEADER_ID);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               // Create test actors with random leader session IDs
-               KvStateLocation expected1 = new KvStateLocation(new JobID(), 
new JobVertexID(), 8282, "salt");
-               UUID leaderSessionId1 = UUID.randomUUID();
-               ActorRef testActor1 = LookupResponseActor.create(received, 
leaderSessionId1, expected1);
-               String testActorAddress1 = 
AkkaUtils.getAkkaURL(testActorSystem, testActor1);
-
-               KvStateLocation expected2 = new KvStateLocation(new JobID(), 
new JobVertexID(), 22321, "pepper");
-               UUID leaderSessionId2 = UUID.randomUUID();
-               ActorRef testActor2 = LookupResponseActor.create(received, 
leaderSessionId1, expected2);
-               String testActorAddress2 = 
AkkaUtils.getAkkaURL(testActorSystem, testActor2);
-
-               JobID jobId = new JobID();
-
-               //
-               // Notify about first leader
-               //
-               leaderRetrievalService.notifyListener(testActorAddress1, 
leaderSessionId1);
-
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
-               assertEquals(expected1, location);
-
-               assertEquals(1, received.size());
-               verifyLookupMsg(received.poll(), jobId, "rock");
-
-               //
-               // Notify about second leader
-               //
-               leaderRetrievalService.notifyListener(testActorAddress2, 
leaderSessionId2);
-
-               location = 
Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
-               assertEquals(expected2, location);
-
-               assertEquals(1, received.size());
-               verifyLookupMsg(received.poll(), jobId, "roll");
-       }
-
-       /**
-        * Tests that lookups are retried when no leader notification is 
available.
-        */
-       @Test
-       public void testRetryOnUnknownJobManager() throws Exception {
-               final Queue<LookupRetryStrategy> retryStrategies = new 
LinkedBlockingQueue<>();
-
-               LookupRetryStrategyFactory retryStrategy =
-                               new LookupRetryStrategyFactory() {
-                                       @Override
-                                       public LookupRetryStrategy 
createRetryStrategy() {
-                                               return retryStrategies.poll();
-                                       }
-                               };
-
-               final TestingLeaderRetrievalService leaderRetrievalService = 
new TestingLeaderRetrievalService(
-                       null,
-                       null);
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               retryStrategy);
-
-               lookupService.start();
-
-               //
-               // Test call to retry
-               //
-               final AtomicBoolean hasRetried = new AtomicBoolean();
-               retryStrategies.add(
-                               new LookupRetryStrategy() {
-                                       @Override
-                                       public FiniteDuration getRetryDelay() {
-                                               return FiniteDuration.Zero();
-                                       }
-
-                                       @Override
-                                       public boolean tryRetry() {
-                                               if 
(hasRetried.compareAndSet(false, true)) {
-                                                       return true;
-                                               }
-                                               return false;
-                                       }
-                               });
-
-               Future<KvStateLocation> locationFuture = 
lookupService.getKvStateLookupInfo(new JobID(), "yessir");
-
-               Await.ready(locationFuture, TIMEOUT);
-               assertTrue("Did not retry ", hasRetried.get());
-
-               //
-               // Test leader notification after retry
-               //
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               KvStateLocation expected = new KvStateLocation(new JobID(), new 
JobVertexID(), 12122, "garlic");
-               ActorRef testActor = LookupResponseActor.create(received, null, 
expected);
-               final String testActorAddress = 
AkkaUtils.getAkkaURL(testActorSystem, testActor);
-
-               retryStrategies.add(new LookupRetryStrategy() {
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               return FiniteDuration.apply(100, 
TimeUnit.MILLISECONDS);
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               
leaderRetrievalService.notifyListener(testActorAddress, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-                               return true;
-                       }
-               });
-
-               KvStateLocation location = 
Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), 
TIMEOUT);
-               assertEquals(expected, location);
-       }
-
-       @Test
-       public void testUnexpectedResponseType() throws Exception {
-               TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
-                       "localhost",
-                       HighAvailabilityServices.DEFAULT_LEADER_ID);
-               Queue<LookupKvStateLocation> received = new 
LinkedBlockingQueue<>();
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               testActorSystem,
-                               TIMEOUT,
-                               new 
AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
-
-               lookupService.start();
-
-               // Create test actors with random leader session IDs
-               String expected = "unexpected-response-type";
-               ActorRef testActor = LookupResponseActor.create(received, null, 
expected);
-               String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, 
testActor);
-
-               leaderRetrievalService.notifyListener(testActorAddress, null);
-
-               try {
-                       Await.result(lookupService.getKvStateLookupInfo(new 
JobID(), "spicy"), TIMEOUT);
-                       fail("Did not throw expected Exception");
-               } catch (Throwable ignored) {
-                       // Expected
-               }
-       }
-
-       private static final class LookupResponseActor extends 
FlinkUntypedActor {
-
-               /** Received lookup messages. */
-               private final Queue<LookupKvStateLocation> receivedLookups;
-
-               /** Responses on KvStateMessage.LookupKvStateLocation messages. 
*/
-               private final Queue<Object> lookupResponses;
-
-               /** The leader session ID. */
-               private UUID leaderSessionId;
-
-               public LookupResponseActor(
-                               Queue<LookupKvStateLocation> receivedLookups,
-                               UUID leaderSessionId, Object... 
lookupResponses) {
-
-                       this.receivedLookups = 
Preconditions.checkNotNull(receivedLookups, "Received lookups");
-                       this.leaderSessionId = leaderSessionId;
-                       this.lookupResponses = new ArrayDeque<>();
-
-                       if (lookupResponses != null) {
-                               for (Object resp : lookupResponses) {
-                                       this.lookupResponses.add(resp);
-                               }
-                       }
-               }
-
-               @Override
-               public void handleMessage(Object message) throws Exception {
-                       if (message instanceof LookupKvStateLocation) {
-                               // Add to received lookups queue
-                               receivedLookups.add((LookupKvStateLocation) 
message);
-
-                               Object msg = lookupResponses.poll();
-                               if (msg != null) {
-                                       if (msg instanceof Throwable) {
-                                               sender().tell(new 
Status.Failure((Throwable) msg), self());
-                                       } else {
-                                               sender().tell(new 
Status.Success(msg), self());
-                                       }
-                               }
-                       } else if (message instanceof UUID) {
-                               this.leaderSessionId = (UUID) message;
-                       } else {
-                               LOG.debug("Received unhandled message: {}", 
message);
-                       }
-               }
-
-               @Override
-               protected UUID getLeaderSessionID() {
-                       return leaderSessionId;
-               }
-
-               private static ActorRef create(
-                               Queue<LookupKvStateLocation> receivedLookups,
-                               UUID leaderSessionId,
-                               Object... lookupResponses) {
-
-                       return testActorSystem.actorOf(Props.create(
-                                       LookupResponseActor.class,
-                                       receivedLookups,
-                                       leaderSessionId,
-                                       lookupResponses));
-               }
-       }
-
-       private static void verifyLookupMsg(
-                       LookupKvStateLocation lookUpMsg,
-                       JobID expectedJobId,
-                       String expectedName) {
-
-               assertNotNull(lookUpMsg);
-               assertEquals(expectedJobId, lookUpMsg.getJobId());
-               assertEquals(expectedName, lookUpMsg.getRegistrationName());
-       }
-
-}

Reply via email to