http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
deleted file mode 100644
index a2850b3..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.client.KvStateClient;
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link KvStateClient}.
- */
-public class KvStateClientTest {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientTest.class);
-
-       // Thread pool for client bootstrap (shared between tests)
-       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
-
-       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS);
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (NIO_GROUP != null) {
-                       NIO_GROUP.shutdownGracefully();
-               }
-       }
-
-       /**
-        * Tests simple queries, of which half succeed and half fail.
-        */
-       @Test
-       public void testSimpleRequests() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       // Random result
-                       final byte[] expected = new byte[1024];
-                       ThreadLocalRandom.current().nextBytes(expected);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg);
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       List<Future<byte[]>> futures = new ArrayList<>();
-
-                       int numQueries = 1024;
-
-                       for (int i = 0; i < numQueries; i++) {
-                               futures.add(client.getKvState(serverAddress, 
new KvStateID(), new byte[0]));
-                       }
-
-                       // Respond to messages
-                       Exception testException = new 
RuntimeException("Expected test Exception");
-
-                       for (int i = 0; i < numQueries; i++) {
-                               ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               assertNotNull("Receive timed out", buf);
-
-                               Channel ch = channel.get();
-                               assertNotNull("Channel not active", ch);
-
-                               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-                               KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
-
-                               buf.release();
-
-                               if (i % 2 == 0) {
-                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestResult(
-                                                       serverChannel.alloc(),
-                                                       request.getRequestId(),
-                                                       expected);
-
-                                       ch.writeAndFlush(response);
-                               } else {
-                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestFailure(
-                                                       serverChannel.alloc(),
-                                                       request.getRequestId(),
-                                                       testException);
-
-                                       ch.writeAndFlush(response);
-                               }
-                       }
-
-                       for (int i = 0; i < numQueries; i++) {
-                               if (i % 2 == 0) {
-                                       byte[] serializedResult = 
Await.result(futures.get(i), deadline.timeLeft());
-                                       assertArrayEquals(expected, 
serializedResult);
-                               } else {
-                                       try {
-                                               Await.result(futures.get(i), 
deadline.timeLeft());
-                                               fail("Did not throw expected 
Exception");
-                                       } catch (RuntimeException ignored) {
-                                               // Expected
-                                       }
-                               }
-                       }
-
-                       assertEquals(numQueries, stats.getNumRequests());
-                       int expectedRequests = numQueries / 2;
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != expectedRequests ||
-                                       stats.getNumFailed() != 
expectedRequests)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(expectedRequests, 
stats.getNumSuccessful());
-                       assertEquals(expectedRequests, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a request to an unavailable host is failed with ConnectException.
-        */
-       @Test
-       public void testRequestUnavailableHost() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-               KvStateClient client = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       int availablePort = NetUtils.getAvailablePort();
-
-                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(
-                                       InetAddress.getLocalHost(),
-                                       availablePort);
-
-                       Future<byte[]> future = 
client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-                       try {
-                               Await.result(future, deadline.timeLeft());
-                               fail("Did not throw expected ConnectException");
-                       } catch (ConnectException ignored) {
-                               // Expected
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Multiple threads concurrently fire queries.
-        */
-       @Test
-       public void testConcurrentQueries() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               ExecutorService executor = null;
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               final byte[] serializedResult = new byte[1024];
-               ThreadLocalRandom.current().nextBytes(serializedResult);
-
-               try {
-                       int numQueryTasks = 4;
-                       final int numQueriesPerTask = 1024;
-
-                       executor = Executors.newFixedThreadPool(numQueryTasks);
-
-                       client = new KvStateClient(1, stats);
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       ByteBuf buf = (ByteBuf) msg;
-                                       assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-                                       KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
-
-                                       buf.release();
-
-                                       ByteBuf response = 
MessageSerializer.serializeKvStateRequestResult(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       serializedResult);
-
-                                       ctx.channel().writeAndFlush(response);
-                               }
-                       });
-
-                       final KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       final KvStateClient finalClient = client;
-                       Callable<List<Future<byte[]>>> queryTask = new 
Callable<List<Future<byte[]>>>() {
-                               @Override
-                               public List<Future<byte[]>> call() throws 
Exception {
-                                       List<Future<byte[]>> results = new 
ArrayList<>(numQueriesPerTask);
-
-                                       for (int i = 0; i < numQueriesPerTask; 
i++) {
-                                               
results.add(finalClient.getKvState(
-                                                               serverAddress,
-                                                               new KvStateID(),
-                                                               new byte[0]));
-                                       }
-
-                                       return results;
-                               }
-                       };
-
-                       // Submit query tasks
-                       List<java.util.concurrent.Future<List<Future<byte[]>>>> 
futures = new ArrayList<>();
-                       for (int i = 0; i < numQueryTasks; i++) {
-                               futures.add(executor.submit(queryTask));
-                       }
-
-                       // Verify results
-                       for (java.util.concurrent.Future<List<Future<byte[]>>> 
future : futures) {
-                               List<Future<byte[]>> results = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               for (Future<byte[]> result : results) {
-                                       byte[] actual = Await.result(result, 
deadline.timeLeft());
-                                       assertArrayEquals(serializedResult, 
actual);
-                               }
-                       }
-
-                       int totalQueries = numQueryTasks * numQueriesPerTask;
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
stats.getNumSuccessful() != totalQueries) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(totalQueries, stats.getNumRequests());
-                       assertEquals(totalQueries, stats.getNumSuccessful());
-               } finally {
-                       if (executor != null) {
-                               executor.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a server failure closes the connection and removes it from
-        * the established connections.
-        */
-       @Test
-       public void testFailureClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg);
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests
-                       List<Future<byte[]>> futures = new ArrayList<>();
-                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
-                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
-
-                       ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release();
-
-                       buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release();
-
-                       assertEquals(1, stats.getNumConnections());
-
-                       Channel ch = channel.get();
-                       assertNotNull("Channel not active", ch);
-
-                       // Respond with failure
-                       
ch.writeAndFlush(MessageSerializer.serializeServerFailure(
-                                       serverChannel.alloc(),
-                                       new RuntimeException("Expected test 
server failure")));
-
-                       try {
-                               Await.result(futures.remove(0), 
deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (RuntimeException ignored) {
-                               // Expected
-                       }
-
-                       try {
-                               Await.result(futures.remove(0), 
deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (RuntimeException ignored) {
-                               // Expected
-                       }
-
-                       assertEquals(0, stats.getNumConnections());
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0 ||
-                                       stats.getNumFailed() != 2)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(2, stats.getNumRequests());
-                       assertEquals(0, stats.getNumSuccessful());
-                       assertEquals(2, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a server channel close, closes the connection and removes it
-        * from the established connections.
-        */
-       @Test
-       public void testServerClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       final AtomicBoolean received = new AtomicBoolean();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.set(true);
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests
-                       Future<byte[]> future = 
client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-                       while (!received.get() && deadline.hasTimeLeft()) {
-                               Thread.sleep(50);
-                       }
-                       assertTrue("Receive timed out", received.get());
-
-                       assertEquals(1, stats.getNumConnections());
-
-                       
channel.get().close().await(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-
-                       try {
-                               Await.result(future, deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (ClosedChannelException ignored) {
-                               // Expected
-                       }
-
-                       assertEquals(0, stats.getNumConnections());
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0 ||
-                                       stats.getNumFailed() != 1)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(1, stats.getNumRequests());
-                       assertEquals(0, stats.getNumSuccessful());
-                       assertEquals(1, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests multiple clients querying multiple servers until 100k queries have
-        * been processed. At this point, the client is shut down and its verified
-        * that all ongoing requests are failed.
-        */
-       @Test
-       public void testClientServerIntegration() throws Exception {
-               // Config
-               final int numServers = 2;
-               final int numServerEventLoopThreads = 2;
-               final int numServerQueryThreads = 2;
-
-               final int numClientEventLoopThreads = 4;
-               final int numClientsTasks = 8;
-
-               final int batchSize = 16;
-
-               final int numKeyGroups = 1;
-
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               KvStateRegistry dummyRegistry = new KvStateRegistry();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(dummyRegistry);
-
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
-
-               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               ExecutorService clientTaskExecutor = null;
-               final KvStateServer[] server = new KvStateServer[numServers];
-
-               try {
-                       client = new KvStateClient(numClientEventLoopThreads, 
clientStats);
-                       clientTaskExecutor = 
Executors.newFixedThreadPool(numClientsTasks);
-
-                       // Create state
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                       desc.setQueryable("any");
-
-                       // Create servers
-                       KvStateRegistry[] registry = new 
KvStateRegistry[numServers];
-                       AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
-                       final KvStateID[] ids = new KvStateID[numServers];
-
-                       for (int i = 0; i < numServers; i++) {
-                               registry[i] = new KvStateRegistry();
-                               serverStats[i] = new 
AtomicKvStateRequestStats();
-                               server[i] = new KvStateServerImpl(
-                                               InetAddress.getLocalHost(),
-                                               0,
-                                               numServerEventLoopThreads,
-                                               numServerQueryThreads,
-                                               registry[i],
-                                               serverStats[i]);
-
-                               server[i].start();
-
-                               backend.setCurrentKey(1010 + i);
-
-                               // Value per server
-                               ValueState<Integer> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               desc);
-
-                               state.update(201 + i);
-
-                               // we know it must be a KvStat but this is not exposed to the user via State
-                               InternalKvState<?> kvState = 
(InternalKvState<?>) state;
-
-                               // Register KvState (one state instance for all server)
-                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
-                       }
-
-                       final KvStateClient finalClient = client;
-                       Callable<Void> queryTask = new Callable<Void>() {
-                               @Override
-                               public Void call() throws Exception {
-                                       while (true) {
-                                               if (Thread.interrupted()) {
-                                                       throw new 
InterruptedException();
-                                               }
-
-                                               // Random server permutation
-                                               List<Integer> random = new 
ArrayList<>();
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       random.add(j);
-                                               }
-                                               Collections.shuffle(random);
-
-                                               // Dispatch queries
-                                               List<Future<byte[]>> futures = 
new ArrayList<>(batchSize);
-
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       int targetServer = 
random.get(j) % numServers;
-
-                                                       byte[] 
serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
-                                                                       1010 + 
targetServer,
-                                                                       
IntSerializer.INSTANCE,
-                                                                       
VoidNamespace.INSTANCE,
-                                                                       
VoidNamespaceSerializer.INSTANCE);
-
-                                                       
futures.add(finalClient.getKvState(
-                                                                       
server[targetServer].getAddress(),
-                                                                       
ids[targetServer],
-                                                                       
serializedKeyAndNamespace));
-                                               }
-
-                                               // Verify results
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       int targetServer = 
random.get(j) % numServers;
-
-                                                       Future<byte[]> future = 
futures.get(j);
-                                                       byte[] buf = 
Await.result(future, timeout);
-                                                       int value = 
KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
-                                                       assertEquals(201 + 
targetServer, value);
-                                               }
-                                       }
-                               }
-                       };
-
-                       // Submit tasks
-                       List<java.util.concurrent.Future<Void>> taskFutures = 
new ArrayList<>();
-                       for (int i = 0; i < numClientsTasks; i++) {
-                               
taskFutures.add(clientTaskExecutor.submit(queryTask));
-                       }
-
-                       long numRequests;
-                       while ((numRequests = clientStats.getNumRequests()) < 
100_000) {
-                               Thread.sleep(100);
-                               LOG.info("Number of requests {}/100_000", 
numRequests);
-                       }
-
-                       // Shut down
-                       client.shutDown();
-
-                       for (java.util.concurrent.Future<Void> future : 
taskFutures) {
-                               try {
-                                       future.get();
-                                       fail("Did not throw expected Exception 
after shut down");
-                               } catch (ExecutionException t) {
-                                       if (t.getCause() instanceof 
ClosedChannelException ||
-                                                       t.getCause() instanceof 
IllegalStateException) {
-                                               // Expected
-                                       } else {
-                                               t.printStackTrace();
-                                               fail("Failed with unexpected 
Exception type: " + t.getClass().getName());
-                                       }
-                               }
-                       }
-
-                       assertEquals("Connection leak (client)", 0, 
clientStats.getNumConnections());
-                       for (int i = 0; i < numServers; i++) {
-                               boolean success = false;
-                               int numRetries = 0;
-                               while (!success) {
-                                       try {
-                                               assertEquals("Connection leak 
(server)", 0, serverStats[i].getNumConnections());
-                                               success = true;
-                                       } catch (Throwable t) {
-                                               if (numRetries < 10) {
-                                                       LOG.info("Retrying 
connection leak check (server)");
-                                                       
Thread.sleep((numRetries + 1) * 50);
-                                                       numRetries++;
-                                               } else {
-                                                       throw t;
-                                               }
-                                       }
-                               }
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       for (int i = 0; i < numServers; i++) {
-                               if (server[i] != null) {
-                                       server[i].shutDown();
-                               }
-                       }
-
-                       if (clientTaskExecutor != null) {
-                               clientTaskExecutor.shutdown();
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private Channel createServerChannel(final ChannelHandler... handlers) 
throws UnknownHostException, InterruptedException {
-               ServerBootstrap bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(InetAddress.getLocalHost(), 0)
-                               // NIO server channels
-                               .group(NIO_GROUP)
-                               .channel(NioServerSocketChannel.class)
-                               // See initializer for pipeline details
-                               .childHandler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline()
-                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                                               
.addLast(handlers);
-                                       }
-                               });
-
-               return bootstrap.bind().sync().channel();
-       }
-
-       private KvStateServerAddress getKvStateServerAddress(Channel 
serverChannel) {
-               InetSocketAddress localAddress = (InetSocketAddress) 
serverChannel.localAddress();
-
-               return new KvStateServerAddress(localAddress.getAddress(), 
localAddress.getPort());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
deleted file mode 100644
index f28ca68..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link KvStateSerializer}.
- */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
-
-       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-
-       @Parameterized.Parameters
-       public static Collection<Boolean> parameters() {
-               return Arrays.asList(false, true);
-       }
-
-       @Parameterized.Parameter
-       public boolean async;
-
-       /**
-        * Tests KvState request serialization.
-        */
-       @Test
-       public void testKvStateRequestSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1337L;
-               KvStateID kvStateId = new KvStateID();
-               byte[] serializedKeyAndNamespace = randomByteArray(1024);
-
-               ByteBuf buf = MessageSerializer.serializeKvStateRequest(
-                               alloc,
-                               requestId,
-                               kvStateId,
-                               serializedKeyAndNamespace);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-               assertEquals(kvStateId, request.getKvStateId());
-               assertArrayEquals(serializedKeyAndNamespace, 
request.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests KvState request serialization with zero-length serialized key 
and namespace.
-        */
-       @Test
-       public void 
testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception 
{
-               byte[] serializedKeyAndNamespace = new byte[0];
-
-               ByteBuf buf = MessageSerializer.serializeKvStateRequest(
-                               alloc,
-                               1823,
-                               new KvStateID(),
-                               serializedKeyAndNamespace);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequest request = 
MessageSerializer.deserializeKvStateRequest(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedKeyAndNamespace, 
request.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests that we don't try to be smart about <code>null</code> key and 
namespace.
-        * They should be treated explicitly.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() 
throws Exception {
-               new KvStateRequest(0, new KvStateID(), null);
-       }
-
-       /**
-        * Tests KvState request result serialization.
-        */
-       @Test
-       public void testKvStateRequestResultSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 72727278L;
-               byte[] serializedResult = randomByteArray(1024);
-
-               ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-                               alloc,
-                               requestId,
-                               serializedResult);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestResult request = 
MessageSerializer.deserializeKvStateRequestResult(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-
-               assertArrayEquals(serializedResult, 
request.getSerializedResult());
-       }
-
-       /**
-        * Tests KvState request result serialization with zero-length 
serialized result.
-        */
-       @Test
-       public void 
testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws 
Exception {
-               byte[] serializedResult = new byte[0];
-
-               ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
-                               alloc,
-                               72727278,
-                               serializedResult);
-
-               int frameLength = buf.readInt();
-
-               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestResult request = 
MessageSerializer.deserializeKvStateRequestResult(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedResult, 
request.getSerializedResult());
-       }
-
-       /**
-        * Tests that we don't try to be smart about <code>null</code> results.
-        * They should be treated explicitly.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedResult() throws 
Exception {
-               new KvStateRequestResult(0, null);
-       }
-
-       /**
-        * Tests KvState request failure serialization.
-        */
-       @Test
-       public void testKvStateRequestFailureSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1111222L;
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(
-                               alloc,
-                               requestId,
-                               cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure request = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-               assertEquals(cause.getClass(), request.getCause().getClass());
-               assertEquals(cause.getMessage(), 
request.getCause().getMessage());
-       }
-
-       /**
-        * Tests KvState server failure serialization.
-        */
-       @Test
-       public void testServerFailureSerialization() throws Exception {
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, 
cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               Throwable request = 
MessageSerializer.deserializeServerFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(cause.getClass(), request.getClass());
-               assertEquals(cause.getMessage(), request.getMessage());
-       }
-
-       private byte[] randomByteArray(int capacity) {
-               byte[] bytes = new byte[capacity];
-               ThreadLocalRandom.current().nextBytes(bytes);
-               return bytes;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index c37c822..944349ee 100644
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -24,20 +24,22 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
 import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
 import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -57,10 +59,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -76,16 +79,28 @@ import static org.mockito.Mockito.when;
  */
 public class KvStateServerHandlerTest extends TestLogger {
 
-       /** Shared Thread pool for query execution. */
-       private static final ExecutorService TEST_THREAD_POOL = 
Executors.newSingleThreadExecutor();
-
-       private static final int READ_TIMEOUT_MILLIS = 10000;
+       private static KvStateServerImpl testServer;
+
+       private static final long READ_TIMEOUT_MILLIS = 10000L;
+
+       @BeforeClass
+       public static void setup() {
+               try {
+                       testServer = new KvStateServerImpl(
+                                       InetAddress.getLocalHost(),
+                                       0,
+                                       1,
+                                       1,
+                                       new KvStateRegistry(),
+                                       new DisabledKvStateRequestStats());
+               } catch (UnknownHostException e) {
+                       e.printStackTrace();
+               }
+       }
 
        @AfterClass
        public static void tearDown() throws Exception {
-               if (TEST_THREAD_POOL != null) {
-                       TEST_THREAD_POOL.shutdown();
-               }
+               testServer.shutdown();
        }
 
        /**
@@ -96,7 +111,10 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Register state
@@ -141,40 +159,40 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
+               KvStateInternalRequest request = new KvStateInternalRequest(
+                               registryListener.kvStateId, 
serializedKeyAndNamespace);
+
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestResult response = 
MessageSerializer.deserializeKvStateRequestResult(buf);
+               long deserRequestId = MessageSerializer.getRequestId(buf);
+               KvStateResponse response = serializer.deserializeResponse(buf);
 
-               assertEquals(requestId, response.getRequestId());
+               assertEquals(requestId, deserRequestId);
 
-               int actualValue = 
KvStateSerializer.deserializeValue(response.getSerializedResult(), 
IntSerializer.INSTANCE);
+               int actualValue = 
KvStateSerializer.deserializeValue(response.getContent(), 
IntSerializer.INSTANCE);
                assertEquals(expectedValue, actualValue);
 
                assertEquals(stats.toString(), 1, stats.getNumRequests());
 
                // Wait for async successful request report
                long deadline = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-               while (stats.getNumSuccessful() != 1 && System.nanoTime() <= 
deadline) {
-                       Thread.sleep(10);
+               while (stats.getNumSuccessful() != 1L && System.nanoTime() <= 
deadline) {
+                       Thread.sleep(10L);
                }
 
-               assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+               assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
        }
 
        /**
-        * Tests the failure response with {@link UnknownKvStateID} as cause on
+        * Tests the failure response with {@link UnknownKvStateIdException} as 
cause on
         * queries for unregistered KvStateIDs.
         */
        @Test
@@ -182,36 +200,38 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                long requestId = Integer.MAX_VALUE + 182828L;
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               new KvStateID(),
-                               new byte[0]);
+
+               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
+
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
 
                assertEquals(requestId, response.getRequestId());
 
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKvStateID);
+               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKvStateIdException);
 
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
        }
 
        /**
-        * Tests the failure response with {@link UnknownKeyOrNamespace} as 
cause
+        * Tests the failure response with {@link 
UnknownKeyOrNamespaceException} as cause
         * on queries for non-existing keys.
         */
        @Test
@@ -219,7 +239,10 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                int numKeyGroups = 1;
@@ -254,40 +277,39 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
 
                assertEquals(requestId, response.getRequestId());
 
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespace);
+               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespaceException);
 
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
        }
 
        /**
-        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])}
-        * call.
+        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])} call.
         */
        @Test
        public void testFailureOnGetSerializedValue() throws Exception {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Failing KvState
@@ -302,38 +324,37 @@ public class KvStateServerHandlerTest extends TestLogger {
                                "vanilla",
                                kvState);
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               kvStateId,
-                               new byte[0]);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0]);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
 
                assertTrue(response.getCause().getMessage().contains("Expected 
test Exception"));
 
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
        }
 
        /**
-        * Tests that the channel is closed if an Exception reaches the channel
-        * handler.
+        * Tests that the channel is closed if an Exception reaches the channel 
handler.
         */
        @Test
        public void testCloseChannelOnExceptionCaught() throws Exception {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new EmbeddedChannel(handler);
 
                channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
@@ -352,19 +373,28 @@ public class KvStateServerHandlerTest extends TestLogger {
        }
 
        /**
-        * Tests the failure response on a rejected execution, because the query
-        * executor has been closed.
+        * Tests the failure response on a rejected execution, because the 
query executor has been closed.
         */
        @Test
        public void testQueryExecutorShutDown() throws Exception {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               ExecutorService closedExecutor = 
Executors.newSingleThreadExecutor();
-               closedExecutor.shutdown();
-               assertTrue(closedExecutor.isShutdown());
+               KvStateServerImpl localTestServer = new KvStateServerImpl(
+                               InetAddress.getLocalHost(),
+                               0,
+                               1,
+                               1,
+                               new KvStateRegistry(),
+                               new DisabledKvStateRequestStats());
+
+               localTestServer.shutdown();
+               assertTrue(localTestServer.isExecutorShutdown());
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, closedExecutor, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(localTestServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                int numKeyGroups = 1;
@@ -391,26 +421,25 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               registryListener.kvStateId,
-                               new byte[0]);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
 
                
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
 
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
+
+               localTestServer.shutdown();
        }
 
        /**
@@ -421,7 +450,10 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Write the request and wait for the response
@@ -438,13 +470,11 @@ public class KvStateServerHandlerTest extends TestLogger {
                assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
                Throwable response = 
MessageSerializer.deserializeServerFailure(buf);
 
-               assertEquals(0, stats.getNumRequests());
-               assertEquals(0, stats.getNumFailed());
+               assertEquals(0L, stats.getNumRequests());
+               assertEquals(0L, stats.getNumFailed());
 
-               unexpectedMessage = 
MessageSerializer.serializeKvStateRequestResult(
-                               channel.alloc(),
-                               192,
-                               new byte[0]);
+               KvStateResponse stateResponse = new KvStateResponse(new 
byte[0]);
+               unexpectedMessage = 
MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse);
 
                channel.writeInbound(unexpectedMessage);
 
@@ -457,8 +487,8 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                assertTrue("Unexpected failure cause " + 
response.getClass().getName(), response instanceof IllegalArgumentException);
 
-               assertEquals(0, stats.getNumRequests());
-               assertEquals(0, stats.getNumFailed());
+               assertEquals(0L, stats.getNumRequests());
+               assertEquals(0L, stats.getNumFailed());
        }
 
        /**
@@ -469,30 +499,30 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               new KvStateID(),
-                               new byte[0]);
+               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
-               assertEquals(1, request.refCnt());
+               assertEquals(1L, serRequest.refCnt());
 
                // Write regular request
-               channel.writeInbound(request);
-               assertEquals("Buffer not recycled", 0, request.refCnt());
+               channel.writeInbound(serRequest);
+               assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
 
                // Write unexpected msg
                ByteBuf unexpected = channel.alloc().buffer(8);
                unexpected.writeInt(4);
                unexpected.writeInt(4);
 
-               assertEquals(1, unexpected.refCnt());
+               assertEquals(1L, unexpected.refCnt());
 
                channel.writeInbound(unexpected);
-               assertEquals("Buffer not recycled", 0, unexpected.refCnt());
+               assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
        }
 
        /**
@@ -503,7 +533,10 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                int numKeyGroups = 1;
@@ -550,45 +583,40 @@ public class KvStateServerHandlerTest extends TestLogger {
                                StringSerializer.INSTANCE);
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               182828,
-                               registryListener.kvStateId,
-                               wrongKeyAndNamespace);
+
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(182828, response.getRequestId());
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
+               assertEquals(182828L, response.getRequestId());
                
assertTrue(response.getCause().getMessage().contains("IOException"));
 
                // Repeat with wrong namespace only
-               request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               182829,
-                               registryListener.kvStateId,
-                               wrongNamespace);
+               request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+               serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                buf = (ByteBuf) readInboundBlocking(channel);
                buf.skipBytes(4); // skip frame length
 
                // Verify the response
                assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(182829, response.getRequestId());
+               response = MessageSerializer.deserializeRequestFailure(buf);
+               assertEquals(182829L, response.getRequestId());
                
assertTrue(response.getCause().getMessage().contains("IOException"));
 
-               assertEquals(2, stats.getNumRequests());
-               assertEquals(2, stats.getNumFailed());
+               assertEquals(2L, stats.getNumRequests());
+               assertEquals(2L, stats.getNumFailed());
        }
 
        /**
@@ -599,7 +627,10 @@ public class KvStateServerHandlerTest extends TestLogger {
                KvStateRegistry registry = new KvStateRegistry();
                KvStateRequestStats stats = new AtomicKvStateRequestStats();
 
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                int numKeyGroups = 1;
@@ -650,14 +681,11 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               ByteBuf request = MessageSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
-               channel.writeInbound(request);
+               channel.writeInbound(serRequest);
 
                Object msg = readInboundBlocking(channel);
                assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
@@ -669,9 +697,9 @@ public class KvStateServerHandlerTest extends TestLogger {
         * Queries the embedded channel for data.
         */
        private Object readInboundBlocking(EmbeddedChannel channel) throws 
InterruptedException, TimeoutException {
-               final int sleepMillis = 50;
+               final long sleepMillis = 50L;
 
-               int sleptMillis = 0;
+               long sleptMillis = 0L;
 
                Object msg = null;
                while (sleptMillis < READ_TIMEOUT_MILLIS &&

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 9332e68..b7f489a 100644
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -22,14 +22,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
 import org.apache.flink.queryablestate.network.messages.MessageType;
 import org.apache.flink.queryablestate.server.KvStateServerImpl;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateRequestStats;
@@ -66,7 +66,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests for {@link KvStateServer}.
+ * Tests for {@link KvStateServerImpl}.
  */
 public class KvStateServerTest {
 
@@ -87,7 +87,7 @@ public class KvStateServerTest {
         */
        @Test
        public void testSimpleRequest() throws Exception {
-               KvStateServer server = null;
+               KvStateServerImpl server = null;
                Bootstrap bootstrap = null;
                try {
                        KvStateRegistry registry = new KvStateRegistry();
@@ -96,7 +96,7 @@ public class KvStateServerTest {
                        server = new 
KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
                        server.start();
 
-                       KvStateServerAddress serverAddress = 
server.getAddress();
+                       KvStateServerAddress serverAddress = 
server.getServerAddress();
                        int numKeyGroups = 1;
                        AbstractStateBackend abstractBackend = new 
MemoryStateBackend();
                        DummyEnvironment dummyEnv = new 
DummyEnvironment("test", 1, 0);
@@ -155,25 +155,29 @@ public class KvStateServerTest {
                        long requestId = Integer.MAX_VALUE + 182828L;
 
                        
assertTrue(registryListener.registrationName.equals("vanilla"));
-                       ByteBuf request = 
MessageSerializer.serializeKvStateRequest(
-                                       channel.alloc(),
-                                       requestId,
+
+                       final KvStateInternalRequest request = new 
KvStateInternalRequest(
                                        registryListener.kvStateId,
                                        serializedKeyAndNamespace);
 
-                       channel.writeAndFlush(request);
+                       ByteBuf serializeRequest = 
MessageSerializer.serializeRequest(
+                                       channel.alloc(),
+                                       requestId,
+                                       request);
+
+                       channel.writeAndFlush(serializeRequest);
 
                        ByteBuf buf = responses.poll(TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
 
                        assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-                       KvStateRequestResult response = 
MessageSerializer.deserializeKvStateRequestResult(buf);
+                       assertEquals(requestId, 
MessageSerializer.getRequestId(buf));
+                       KvStateResponse response = 
server.getSerializer().deserializeResponse(buf);
 
-                       assertEquals(requestId, response.getRequestId());
-                       int actualValue = 
KvStateSerializer.deserializeValue(response.getSerializedResult(), 
IntSerializer.INSTANCE);
+                       int actualValue = 
KvStateSerializer.deserializeValue(response.getContent(), 
IntSerializer.INSTANCE);
                        assertEquals(expectedValue, actualValue);
                } finally {
                        if (server != null) {
-                               server.shutDown();
+                               server.shutdown();
                        }
 
                        if (bootstrap != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
new file mode 100644
index 0000000..32a0c9b
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+import org.apache.flink.runtime.query.KvStateID;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MessageSerializer}.
+ *
+ * <p>The round-trip tests serialize a message into a {@link ByteBuf}, read the leading
+ * frame-length field, deserialize the message again and then verify that the reader index
+ * sits exactly at the end of the frame and that the payload is unchanged.
+ */
+@RunWith(Parameterized.class)
+public class MessageSerializerTest {
+
+       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+       /**
+        * Supplies the values injected into {@link #async}.
+        */
+       @Parameterized.Parameters
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(false, true);
+       }
+
+       // NOTE(review): no test in this class reads this flag, so every test is simply
+       // executed once per parameter value -- confirm whether the parameterization is still needed.
+       @Parameterized.Parameter
+       public boolean async;
+
+       /**
+        * Tests request serialization.
+        */
+       @Test
+       public void testRequestSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1337L;
+               KvStateID kvStateId = new KvStateID();
+               byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+               final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+               final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = newSerializer();
+
+               ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(kvStateId, requestDeser.getKvStateId());
+               assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Tests request serialization with zero-length serialized key and namespace.
+        */
+       @Test
+       public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1337L;
+               KvStateID kvStateId = new KvStateID();
+               byte[] serializedKeyAndNamespace = new byte[0];
+
+               final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+               final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = newSerializer();
+
+               ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(kvStateId, requestDeser.getKvStateId());
+               assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Tests that we don't try to be smart about <code>null</code> key and namespace.
+        * They should be treated explicitly.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+               new KvStateInternalRequest(new KvStateID(), null);
+       }
+
+       /**
+        * Tests response serialization.
+        */
+       @Test
+       public void testResponseSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 72727278L;
+               byte[] serializedResult = randomByteArray(1024);
+
+               final KvStateResponse response = new KvStateResponse(serializedResult);
+               final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = newSerializer();
+
+               ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertArrayEquals(serializedResult, responseDeser.getContent());
+       }
+
+       /**
+        * Tests response serialization with zero-length serialized result.
+        */
+       @Test
+       public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception {
+               byte[] serializedResult = new byte[0];
+
+               final KvStateResponse response = new KvStateResponse(serializedResult);
+               final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = newSerializer();
+
+               ByteBuf buf = MessageSerializer.serializeResponse(alloc, 72727278L, response);
+
+               int frameLength = buf.readInt();
+
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               assertEquals(72727278L, MessageSerializer.getRequestId(buf));
+               KvStateResponse responseDeser = serializer.deserializeResponse(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertArrayEquals(serializedResult, responseDeser.getContent());
+       }
+
+       /**
+        * Tests that we don't try to be smart about <code>null</code> results.
+        * They should be treated explicitly.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+               new KvStateResponse((byte[]) null);
+       }
+
+       /**
+        * Tests request failure serialization.
+        */
+       @Test
+       public void testKvStateRequestFailureSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1111222L;
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+               RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(requestId, requestFailure.getRequestId());
+               assertEquals(cause.getClass(), requestFailure.getCause().getClass());
+               assertEquals(cause.getMessage(), requestFailure.getCause().getMessage());
+       }
+
+       /**
+        * Tests server failure serialization.
+        */
+       @Test
+       public void testServerFailureSerialization() throws Exception {
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+               Throwable request = MessageSerializer.deserializeServerFailure(buf);
+
+               // the frame length does not include the 4 bytes of the length field itself
+               assertEquals(frameLength + 4, buf.readerIndex());
+
+               assertEquals(cause.getClass(), request.getClass());
+               assertEquals(cause.getMessage(), request.getMessage());
+       }
+
+       /**
+        * Creates the serializer for {@link KvStateInternalRequest} / {@link KvStateResponse}
+        * messages that the round-trip tests use for deserialization.
+        */
+       private static MessageSerializer<KvStateInternalRequest, KvStateResponse> newSerializer() {
+               return new MessageSerializer<>(
+                               new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                               new KvStateResponse.KvStateResponseDeserializer());
+       }
+
+       /**
+        * Returns a new array of the given length filled with random bytes.
+        */
+       private byte[] randomByteArray(int capacity) {
+               byte[] bytes = new byte[capacity];
+               ThreadLocalRandom.current().nextBytes(bytes);
+               return bytes;
+       }
+}

Reply via email to