http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
deleted file mode 100644
index 4023925..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link Client}.
- */
-public class ClientTest {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
-
-       // Thread pool for client bootstrap (shared between tests); shut down once in tearDown()
-       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
-
-       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10L, TimeUnit.SECONDS); // tests derive their deadline from this via fromNow()
-
-       @AfterClass
-       public static void tearDown() throws Exception { // class-level cleanup for the shared NIO_GROUP
-               if (NIO_GROUP != null) {
-                       NIO_GROUP.shutdownGracefully(); // the returned future is not awaited here
-               }
-       }
-
-       /**
-        * Tests simple queries, of which half succeed and half fail.
-        *
-        * <p>The test server channel only records the connection and queues
-        * every incoming message. After sending 1024 requests, the test thread
-        * answers them itself in order of arrival: every even iteration gets a
-        * response carrying a random 1024-byte payload, every odd iteration
-        * gets a request failure. The client futures are then checked against
-        * that pattern by index, followed by the request statistics (half
-        * successful, half failed).
-        *
-        * <p>On exit, the client is shut down, the server channel is closed
-        * and the stats must report zero open connections.
-        */
-       @Test
-       public void testSimpleRequests() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new Client<>("Test Client", 1, serializer, 
stats);
-
-                       // Random result
-                       final byte[] expected = new byte[1024];
-                       ThreadLocalRandom.current().nextBytes(expected);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg);
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       long numQueries = 1024L;
-
-                       List<CompletableFuture<KvStateResponse>> futures = new 
ArrayList<>();
-                       for (long i = 0L; i < numQueries; i++) {
-                               KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
-                               futures.add(client.sendRequest(serverAddress, 
request));
-                       }
-
-                       // Respond to messages in order of arrival: even iterations succeed, odd iterations fail
-                       Exception testException = new 
RuntimeException("Expected test Exception");
-
-                       for (long i = 0L; i < numQueries; i++) {
-                               ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               assertNotNull("Receive timed out", buf);
-
-                               Channel ch = channel.get();
-                               assertNotNull("Channel not active", ch);
-
-                               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-                               long requestId = 
MessageSerializer.getRequestId(buf);
-                               KvStateInternalRequest deserRequest = 
serializer.deserializeRequest(buf); // result is not used afterwards
-
-                               buf.release();
-
-                               if (i % 2L == 0L) {
-                                       ByteBuf response = 
MessageSerializer.serializeResponse(
-                                                       serverChannel.alloc(),
-                                                       requestId,
-                                                       new 
KvStateResponse(expected));
-
-                                       ch.writeAndFlush(response);
-                               } else {
-                                       ByteBuf response = 
MessageSerializer.serializeRequestFailure(
-                                                       serverChannel.alloc(),
-                                                       requestId,
-                                                       testException);
-
-                                       ch.writeAndFlush(response);
-                               }
-                       }
-
-                       for (long i = 0L; i < numQueries; i++) {
-
-                               if (i % 2L == 0L) {
-                                       KvStateResponse serializedResult = 
futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                                       assertArrayEquals(expected, 
serializedResult.getContent());
-                               } else {
-                                       try {
-                                               futures.get((int) 
i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                                               fail("Did not throw expected 
Exception");
-                                       } catch (ExecutionException e) {
-
-                                               if (!(e.getCause() instanceof 
RuntimeException)) {
-                                                       fail("Did not throw 
expected Exception");
-                                               }
-                                               // else expected
-                                       }
-                               }
-                       }
-
-                       assertEquals(numQueries, stats.getNumRequests());
-                       long expectedRequests = numQueries / 2L;
-
-                       // Counts can take some time to propagate, so poll until they match or the deadline expires
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != expectedRequests ||
-                                       stats.getNumFailed() != 
expectedRequests)) {
-                               Thread.sleep(100L);
-                       }
-
-                       assertEquals(expectedRequests, 
stats.getNumSuccessful());
-                       assertEquals(expectedRequests, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a request to an unavailable host is failed with 
ConnectException.
-        *
-        * <p>The target address is the local host combined with a port obtained
-        * from {@code NetUtils.getAvailablePort()}; this test starts no server
-        * on that port. The returned future must complete exceptionally with a
-        * {@code ConnectException} as cause before the test deadline, and the
-        * stats must report zero open connections after the client shut down.
-        */
-       @Test
-       public void testRequestUnavailableHost() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-
-               try {
-                       client = new Client<>("Test Client", 1, serializer, 
stats);
-
-                       int availablePort = NetUtils.getAvailablePort();
-
-                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(
-                                       InetAddress.getLocalHost(),
-                                       availablePort);
-
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
-                       CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
-
-                       try {
-                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                               fail("Did not throw expected ConnectException");
-                       } catch (ExecutionException e) {
-                               if (!(e.getCause() instanceof 
ConnectException)) {
-                                       fail("Did not throw expected 
ConnectException");
-                               }
-                               // else expected
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Multiple threads concurrently fire queries.
-        *
-        * <p>Four tasks on a fixed thread pool each send 1024 requests through
-        * one shared client to a single test server channel. The server handler
-        * answers every request directly with the same random 1024-byte
-        * result. The test verifies the content of every response and that the
-        * request and success counters both reach the total number of queries.
-        *
-        * <p>On exit, the executor, server channel and client are shut down
-        * and the stats must report zero open connections.
-        */
-       @Test
-       public void testConcurrentQueries() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               ExecutorService executor = null;
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-               Channel serverChannel = null;
-
-               final byte[] serializedResult = new byte[1024];
-               ThreadLocalRandom.current().nextBytes(serializedResult);
-
-               try {
-                       int numQueryTasks = 4;
-                       final int numQueriesPerTask = 1024;
-
-                       executor = Executors.newFixedThreadPool(numQueryTasks);
-
-                       client = new Client<>("Test Client", 1, serializer, 
stats);
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       ByteBuf buf = (ByteBuf) msg;
-                                       assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-                                       long requestId = 
MessageSerializer.getRequestId(buf);
-                                       KvStateInternalRequest request = 
serializer.deserializeRequest(buf); // result is not used afterwards
-
-                                       buf.release();
-
-                                       KvStateResponse response = new 
KvStateResponse(serializedResult);
-                                       ByteBuf serResponse = 
MessageSerializer.serializeResponse(
-                                                       ctx.alloc(),
-                                                       requestId,
-                                                       response);
-
-                                       
ctx.channel().writeAndFlush(serResponse);
-                               }
-                       });
-
-                       final KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       final Client<KvStateInternalRequest, KvStateResponse> 
finalClient = client;
-                       Callable<List<CompletableFuture<KvStateResponse>>> 
queryTask = () -> {
-                               List<CompletableFuture<KvStateResponse>> 
results = new ArrayList<>(numQueriesPerTask);
-
-                               for (int i = 0; i < numQueriesPerTask; i++) {
-                                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
-                                       
results.add(finalClient.sendRequest(serverAddress, request));
-                               }
-
-                               return results;
-                       };
-
-                       // Submit query tasks (the same callable is submitted once per task)
-                       List<Future<List<CompletableFuture<KvStateResponse>>>> 
futures = new ArrayList<>();
-                       for (int i = 0; i < numQueryTasks; i++) {
-                               futures.add(executor.submit(queryTask));
-                       }
-
-                       // Verify results: first wait for each task, then for each of its request futures
-                       for (Future<List<CompletableFuture<KvStateResponse>>> 
future : futures) {
-                               List<CompletableFuture<KvStateResponse>> 
results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               for (CompletableFuture<KvStateResponse> result 
: results) {
-                                       KvStateResponse actual = 
result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                                       assertArrayEquals(serializedResult, 
actual.getContent());
-                               }
-                       }
-
-                       int totalQueries = numQueryTasks * numQueriesPerTask;
-
-                       // Counts can take some time to propagate, so poll until they match or the deadline expires
-                       while (deadline.hasTimeLeft() && 
stats.getNumSuccessful() != totalQueries) {
-                               Thread.sleep(100L);
-                       }
-
-                       assertEquals(totalQueries, stats.getNumRequests());
-                       assertEquals(totalQueries, stats.getNumSuccessful());
-               } finally {
-                       if (executor != null) {
-                               executor.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a server failure closes the connection and removes it from
-        * the established connections.
-        *
-        * <p>Two requests are sent to the same server address. Once the server
-        * side has received both messages, a single server failure message is
-        * written back. Both client futures must then fail with a
-        * {@code RuntimeException} as cause, the connection count in the stats
-        * must drop from 1 to 0, and the stats must report 2 requests, 0
-        * successful and 2 failed.
-        */
-       @Test
-       public void testFailureClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new Client<>("Test Client", 1, serializer, 
stats);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg);
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests (the same request object is sent twice)
-                       List<Future<KvStateResponse>> futures = new 
ArrayList<>();
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
-
-                       futures.add(client.sendRequest(serverAddress, request));
-                       futures.add(client.sendRequest(serverAddress, request));
-
-                       ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release();
-
-                       buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release();
-
-                       assertEquals(1L, stats.getNumConnections());
-
-                       Channel ch = channel.get();
-                       assertNotNull("Channel not active", ch);
-
-                       // Respond with failure (one server failure message, serialized without a request id)
-                       
ch.writeAndFlush(MessageSerializer.serializeServerFailure(
-                                       serverChannel.alloc(),
-                                       new RuntimeException("Expected test 
server failure")));
-
-                       try {
-                               
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               fail("Did not throw expected server failure");
-                       } catch (ExecutionException e) {
-
-                               if (!(e.getCause() instanceof 
RuntimeException)) {
-                                       fail("Did not throw expected 
Exception");
-                               }
-                               // Expected
-                       }
-
-                       try {
-                               
futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               fail("Did not throw expected server failure");
-                       } catch (ExecutionException e) {
-
-                               if (!(e.getCause() instanceof 
RuntimeException)) {
-                                       fail("Did not throw expected 
Exception");
-                               }
-                               // Expected
-                       }
-
-                       assertEquals(0L, stats.getNumConnections());
-
-                       // Counts can take some time to propagate, so poll until they match or the deadline expires
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
-                               Thread.sleep(100L);
-                       }
-
-                       assertEquals(2L, stats.getNumRequests());
-                       assertEquals(0L, stats.getNumSuccessful());
-                       assertEquals(2L, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a server channel close, closes the connection and removes 
it
-        * from the established connections.
-        *
-        * <p>A single request is sent. Once the server side has seen a message,
-        * the test closes the channel captured in {@code channelActive} and
-        * expects the client future to fail with a
-        * {@code ClosedChannelException} as cause. Afterwards the connection
-        * count must be 0 and the stats must report 1 request, 0 successful
-        * and 1 failed.
-        */
-       @Test
-       public void testServerClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new Client<>("Test Client", 1, serializer, 
stats);
-
-                       final AtomicBoolean received = new AtomicBoolean();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel());
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.set(true); // NOTE(review): msg is not released in this handler; confirm this is intended
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
-                       Future<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
-
-                       while (!received.get() && deadline.hasTimeLeft()) {
-                               Thread.sleep(50L);
-                       }
-                       assertTrue("Receive timed out", received.get());
-
-                       assertEquals(1, stats.getNumConnections());
-
-                       
channel.get().close().await(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-
-                       try {
-                               future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                               fail("Did not throw expected server failure");
-                       } catch (ExecutionException e) {
-                               if (!(e.getCause() instanceof 
ClosedChannelException)) {
-                                       fail("Did not throw expected 
Exception");
-                               }
-                               // Expected
-                       }
-
-                       assertEquals(0L, stats.getNumConnections());
-
-                       // Counts can take some time to propagate, so poll until they match or the deadline expires
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
-                               Thread.sleep(100L);
-                       }
-
-                       assertEquals(1L, stats.getNumRequests());
-                       assertEquals(0L, stats.getNumSuccessful());
-                       assertEquals(1L, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0L, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests multiple clients querying multiple servers until 100k queries 
have
-        * been processed. At this point, the client is shut down and it is 
verified
-        * that all ongoing requests are failed.
-        */
-       @Test
-       public void testClientServerIntegration() throws Throwable {
-               // Config
-               final int numServers = 2;
-               final int numServerEventLoopThreads = 2;
-               final int numServerQueryThreads = 2;
-
-               final int numClientEventLoopThreads = 4;
-               final int numClientsTasks = 8;
-
-               final int batchSize = 16;
-
-               final int numKeyGroups = 1;
-
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               KvStateRegistry dummyRegistry = new KvStateRegistry();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(dummyRegistry);
-
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
-
-               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
-
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               Client<KvStateInternalRequest, KvStateResponse> client = null;
-               ExecutorService clientTaskExecutor = null;
-               final KvStateServerImpl[] server = new 
KvStateServerImpl[numServers];
-
-               try {
-                       client = new Client<>("Test Client", 
numClientEventLoopThreads, serializer, clientStats);
-                       clientTaskExecutor = 
Executors.newFixedThreadPool(numClientsTasks);
-
-                       // Create state
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                       desc.setQueryable("any");
-
-                       // Create servers
-                       KvStateRegistry[] registry = new 
KvStateRegistry[numServers];
-                       AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
-                       final KvStateID[] ids = new KvStateID[numServers];
-
-                       for (int i = 0; i < numServers; i++) {
-                               registry[i] = new KvStateRegistry();
-                               serverStats[i] = new 
AtomicKvStateRequestStats();
-                               server[i] = new KvStateServerImpl(
-                                               InetAddress.getLocalHost(),
-                                               
Collections.singletonList(0).iterator(),
-                                               numServerEventLoopThreads,
-                                               numServerQueryThreads,
-                                               registry[i],
-                                               serverStats[i]);
-
-                               server[i].start();
-
-                               backend.setCurrentKey(1010 + i);
-
-                               // Value per server
-                               ValueState<Integer> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               desc);
-
-                               state.update(201 + i);
-
-                               // we know it must be a KvStat but this is not 
exposed to the user via State
-                               InternalKvState<?> kvState = 
(InternalKvState<?>) state;
-
-                               // Register KvState (one state instance for all 
server)
-                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
-                       }
-
-                       final Client<KvStateInternalRequest, KvStateResponse> 
finalClient = client;
-                       Callable<Void> queryTask = () -> {
-                               while (true) {
-                                       if (Thread.interrupted()) {
-                                               throw new 
InterruptedException();
-                                       }
-
-                                       // Random server permutation
-                                       List<Integer> random = new 
ArrayList<>();
-                                       for (int j = 0; j < batchSize; j++) {
-                                               random.add(j);
-                                       }
-                                       Collections.shuffle(random);
-
-                                       // Dispatch queries
-                                       List<Future<KvStateResponse>> futures = 
new ArrayList<>(batchSize);
-
-                                       for (int j = 0; j < batchSize; j++) {
-                                               int targetServer = 
random.get(j) % numServers;
-
-                                               byte[] 
serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
-                                                               1010 + 
targetServer,
-                                                               
IntSerializer.INSTANCE,
-                                                               
VoidNamespace.INSTANCE,
-                                                               
VoidNamespaceSerializer.INSTANCE);
-
-                                               KvStateInternalRequest request 
= new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
-                                               
futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), 
request));
-                                       }
-
-                                       // Verify results
-                                       for (int j = 0; j < batchSize; j++) {
-                                               int targetServer = 
random.get(j) % numServers;
-
-                                               Future<KvStateResponse> future 
= futures.get(j);
-                                               byte[] buf = 
future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent();
-                                               int value = 
KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
-                                               assertEquals(201L + 
targetServer, value);
-                                       }
-                               }
-                       };
-
-                       // Submit tasks
-                       List<Future<Void>> taskFutures = new ArrayList<>();
-                       for (int i = 0; i < numClientsTasks; i++) {
-                               
taskFutures.add(clientTaskExecutor.submit(queryTask));
-                       }
-
-                       long numRequests;
-                       while ((numRequests = clientStats.getNumRequests()) < 
100_000L) {
-                               Thread.sleep(100L);
-                               LOG.info("Number of requests {}/100_000", 
numRequests);
-                       }
-
-                       // Shut down
-                       client.shutdown();
-
-                       for (Future<Void> future : taskFutures) {
-                               try {
-                                       future.get();
-                                       fail("Did not throw expected Exception 
after shut down");
-                               } catch (ExecutionException t) {
-                                       if (t.getCause().getCause() instanceof 
ClosedChannelException ||
-                                                       t.getCause().getCause() 
instanceof IllegalStateException) {
-                                               // Expected
-                                       } else {
-                                               t.printStackTrace();
-                                               fail("Failed with unexpected 
Exception type: " + t.getClass().getName());
-                                       }
-                               }
-                       }
-
-                       assertEquals("Connection leak (client)", 0L, 
clientStats.getNumConnections());
-                       for (int i = 0; i < numServers; i++) {
-                               boolean success = false;
-                               int numRetries = 0;
-                               while (!success) {
-                                       try {
-                                               assertEquals("Connection leak 
(server)", 0L, serverStats[i].getNumConnections());
-                                               success = true;
-                                       } catch (Throwable t) {
-                                               if (numRetries < 10) {
-                                                       LOG.info("Retrying 
connection leak check (server)");
-                                                       
Thread.sleep((numRetries + 1) * 50L);
-                                                       numRetries++;
-                                               } else {
-                                                       throw t;
-                                               }
-                                       }
-                               }
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutdown();
-                       }
-
-                       for (int i = 0; i < numServers; i++) {
-                               if (server[i] != null) {
-                                       server[i].shutdown();
-                               }
-                       }
-
-                       if (clientTaskExecutor != null) {
-                               clientTaskExecutor.shutdown();
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private Channel createServerChannel(final ChannelHandler... handlers) 
throws UnknownHostException, InterruptedException {
-               ServerBootstrap bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(InetAddress.getLocalHost(), 0)
-                               // NIO server channels
-                               .group(NIO_GROUP)
-                               .channel(NioServerSocketChannel.class)
-                               // See initializer for pipeline details
-                               .childHandler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline()
-                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                                               
.addLast(handlers);
-                                       }
-                               });
-
-               return bootstrap.bind().sync().channel();
-       }
-
-       private KvStateServerAddress getKvStateServerAddress(Channel 
serverChannel) {
-               InetSocketAddress localAddress = (InetSocketAddress) 
serverChannel.localAddress();
-
-               return new KvStateServerAddress(localAddress.getAddress(), 
localAddress.getPort());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
deleted file mode 100644
index cb490aa..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
-
-import org.junit.Test;
-
-import java.nio.channels.ClosedChannelException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for {@link ClientHandler}.
- */
-public class KvStateClientHandlerTest {
-
-       /**
-        * Tests that on reads the expected callback methods are called and read
-        * buffers are recycled.
-        */
-       @Test
-       public void testReadCallbacksAndBufferRecycling() throws Exception {
-               final ClientHandlerCallback<KvStateResponse> callback = 
mock(ClientHandlerCallback.class);
-
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-               final EmbeddedChannel channel = new EmbeddedChannel(new 
ClientHandler<>("Test Client", serializer, callback));
-
-               final byte[] content = new byte[0];
-               final KvStateResponse response = new KvStateResponse(content);
-
-               //
-               // Request success
-               //
-               ByteBuf buf = 
MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, times(1)).onRequestResult(eq(1222112277L), 
any(KvStateResponse.class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Request failure
-               //
-               buf = MessageSerializer.serializeRequestFailure(
-                               channel.alloc(),
-                               1222112278,
-                               new RuntimeException("Expected test 
Exception"));
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, times(1)).onRequestFailure(eq(1222112278L), 
any(RuntimeException.class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Server failure
-               //
-               buf = MessageSerializer.serializeServerFailure(
-                               channel.alloc(),
-                               new RuntimeException("Expected test 
Exception"));
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, 
times(1)).onFailure(any(RuntimeException.class));
-
-               //
-               // Unexpected messages
-               //
-               buf = channel.alloc().buffer(4).writeInt(1223823);
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, 
times(2)).onFailure(any(IllegalStateException.class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Exception caught
-               //
-               channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
-               verify(callback, 
times(3)).onFailure(any(RuntimeException.class));
-
-               //
-               // Channel inactive
-               //
-               channel.pipeline().fireChannelInactive();
-               verify(callback, 
times(4)).onFailure(any(ClosedChannelException.class));
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
deleted file mode 100644
index 217d0b5..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ /dev/null
@@ -1,758 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException;
-import org.apache.flink.queryablestate.UnknownKvStateIdException;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.network.messages.RequestFailure;
-import org.apache.flink.queryablestate.server.KvStateServerHandler;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link KvStateServerHandler}.
- */
-public class KvStateServerHandlerTest extends TestLogger {
-
-       private static KvStateServerImpl testServer;
-
-       private static final long READ_TIMEOUT_MILLIS = 10000L;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       testServer = new KvStateServerImpl(
-                                       InetAddress.getLocalHost(),
-                                       Collections.singletonList(0).iterator(),
-                                       1,
-                                       1,
-                                       new KvStateRegistry(),
-                                       new DisabledKvStateRequestStats());
-                       testServer.start();
-               } catch (Throwable e) {
-                       e.printStackTrace();
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               testServer.shutdown();
-       }
-
-       /**
-        * Tests a simple successful query via an EmbeddedChannel.
-        */
-       @Test
-       public void testSimpleQuery() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Update the KvState and request it
-               int expectedValue = 712828289;
-
-               int key = 99812822;
-               backend.setCurrentKey(key);
-               ValueState<Integer> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               state.update(expectedValue);
-
-               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               KvStateInternalRequest request = new KvStateInternalRequest(
-                               registryListener.kvStateId, 
serializedKeyAndNamespace);
-
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               long deserRequestId = MessageSerializer.getRequestId(buf);
-               KvStateResponse response = serializer.deserializeResponse(buf);
-
-               assertEquals(requestId, deserRequestId);
-
-               int actualValue = 
KvStateSerializer.deserializeValue(response.getContent(), 
IntSerializer.INSTANCE);
-               assertEquals(expectedValue, actualValue);
-
-               assertEquals(stats.toString(), 1, stats.getNumRequests());
-
-               // Wait for async successful request report
-               long deadline = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-               while (stats.getNumSuccessful() != 1L && System.nanoTime() <= 
deadline) {
-                       Thread.sleep(10L);
-               }
-
-               assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
-       }
-
-       /**
-        * Tests the failure response with {@link UnknownKvStateIdException} as 
cause on
-        * queries for unregistered KvStateIDs.
-        */
-       @Test
-       public void testQueryUnknownKvStateID() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-
-               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
-
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
-
-               assertEquals(requestId, response.getRequestId());
-
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKvStateIdException);
-
-               assertEquals(1L, stats.getNumRequests());
-               assertEquals(1L, stats.getNumFailed());
-       }
-
-       /**
-        * Tests the failure response with {@link 
UnknownKeyOrNamespaceException} as cause
-        * on queries for non-existing keys.
-        */
-       @Test
-       public void testQueryUnknownKey() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
-
-               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               1238283,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 22982L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
-
-               assertEquals(requestId, response.getRequestId());
-
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespaceException);
-
-               assertEquals(1L, stats.getNumRequests());
-               assertEquals(1L, stats.getNumFailed());
-       }
-
-       /**
-        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])} call.
-        */
-       @Test
-       public void testFailureOnGetSerializedValue() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Failing KvState
-               InternalKvState<?> kvState = mock(InternalKvState.class);
-               when(kvState.getSerializedValue(any(byte[].class)))
-                               .thenThrow(new RuntimeException("Expected test 
Exception"));
-
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               new KeyGroupRange(0, 0),
-                               "vanilla",
-                               kvState);
-
-               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0]);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
-
-               assertTrue(response.getCause().getMessage().contains("Expected 
test Exception"));
-
-               assertEquals(1L, stats.getNumRequests());
-               assertEquals(1L, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that the channel is closed if an Exception reaches the channel handler.
-        */
-       @Test
-       public void testCloseChannelOnExceptionCaught() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new EmbeddedChannel(handler);
-
-               channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               Throwable response = 
MessageSerializer.deserializeServerFailure(buf);
-
-               assertTrue(response.getMessage().contains("Expected test 
Exception"));
-
-               channel.closeFuture().await(READ_TIMEOUT_MILLIS);
-               assertFalse(channel.isActive());
-       }
-
-       /**
-        * Tests the failure response on a rejected execution, because the query executor has been closed.
-        */
-       @Test
-       public void testQueryExecutorShutDown() throws Throwable {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerImpl localTestServer = new KvStateServerImpl(
-                               InetAddress.getLocalHost(),
-                               Collections.singletonList(0).iterator(),
-                               1,
-                               1,
-                               new KvStateRegistry(),
-                               new DisabledKvStateRequestStats());
-
-               localTestServer.start();
-               localTestServer.shutdown();
-               assertTrue(localTestServer.isExecutorShutdown());
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(localTestServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
-
-               
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
-
-               assertEquals(1L, stats.getNumRequests());
-               assertEquals(1L, stats.getNumFailed());
-
-               localTestServer.shutdown();
-       }
-
-       /**
-        * Tests response on unexpected messages.
-        */
-       @Test
-       public void testUnexpectedMessage() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Write the request and wait for the response
-               ByteBuf unexpectedMessage = Unpooled.buffer(8);
-               unexpectedMessage.writeInt(4);
-               unexpectedMessage.writeInt(123238213);
-
-               channel.writeInbound(unexpectedMessage);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               Throwable response = 
MessageSerializer.deserializeServerFailure(buf);
-
-               assertEquals(0L, stats.getNumRequests());
-               assertEquals(0L, stats.getNumFailed());
-
-               KvStateResponse stateResponse = new KvStateResponse(new 
byte[0]);
-               unexpectedMessage = 
MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse);
-
-               channel.writeInbound(unexpectedMessage);
-
-               buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               response = MessageSerializer.deserializeServerFailure(buf);
-
-               assertTrue("Unexpected failure cause " + 
response.getClass().getName(), response instanceof IllegalArgumentException);
-
-               assertEquals(0L, stats.getNumRequests());
-               assertEquals(0L, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that incoming buffer instances are recycled.
-        */
-       @Test
-       public void testIncomingBufferIsRecycled() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
-
-               assertEquals(1L, serRequest.refCnt());
-
-               // Write regular request
-               channel.writeInbound(serRequest);
-               assertEquals("Buffer not recycled", 0L, serRequest.refCnt());
-
-               // Write unexpected msg
-               ByteBuf unexpected = channel.alloc().buffer(8);
-               unexpected.writeInt(4);
-               unexpected.writeInt(4);
-
-               assertEquals(1L, unexpected.refCnt());
-
-               channel.writeInbound(unexpected);
-               assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
-       }
-
-       /**
-        * Tests the failure response if the serializers don't match.
-        */
-       @Test
-       public void testSerializerMismatch() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               ValueState<Integer> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               int key = 99812822;
-
-               // Update the KvState
-               backend.setCurrentKey(key);
-               state.update(712828289);
-
-               byte[] wrongKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               "wrong-key-type",
-                               StringSerializer.INSTANCE,
-                               "wrong-namespace-type",
-                               StringSerializer.INSTANCE);
-
-               byte[] wrongNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               "wrong-namespace-type",
-                               StringSerializer.INSTANCE);
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
-               assertEquals(182828L, response.getRequestId());
-               
assertTrue(response.getCause().getMessage().contains("IOException"));
-
-               // Repeat with wrong namespace only
-               request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
-               serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               response = MessageSerializer.deserializeRequestFailure(buf);
-               assertEquals(182829L, response.getRequestId());
-               
assertTrue(response.getCause().getMessage().contains("IOException"));
-
-               assertEquals(2L, stats.getNumRequests());
-               assertEquals(2L, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that large responses are chunked.
-        */
-       @Test
-       public void testChunkedResponse() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               KvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<byte[]> desc = new 
ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               ValueState<byte[]> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               // Update KvState
-               byte[] bytes = new byte[2 * 
channel.config().getWriteBufferHighWaterMark()];
-
-               byte current = 0;
-               for (int i = 0; i < bytes.length; i++) {
-                       bytes[i] = current++;
-               }
-
-               int key = 99812822;
-               backend.setCurrentKey(key);
-               state.update(bytes);
-
-               // Request
-               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
-               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
-
-               // Write the request and wait for the response
-               channel.writeInbound(serRequest);
-
-               Object msg = readInboundBlocking(channel);
-               assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Queries the embedded channel for data.
-        */
-       private Object readInboundBlocking(EmbeddedChannel channel) throws 
InterruptedException, TimeoutException {
-               final long sleepMillis = 50L;
-
-               long sleptMillis = 0L;
-
-               Object msg = null;
-               while (sleptMillis < READ_TIMEOUT_MILLIS &&
-                               (msg = channel.readOutbound()) == null) {
-
-                       Thread.sleep(sleepMillis);
-                       sleptMillis += sleepMillis;
-               }
-
-               if (msg == null) {
-                       throw new TimeoutException();
-               } else {
-                       return msg;
-               }
-       }
-
-       /**
-        * Frame length decoder (expected by the serialized messages).
-        */
-       private ChannelHandler getFrameDecoder() {
-               return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 
4, 0, 4);
-       }
-
-       /**
-        * A listener that keeps the last updated KvState information so that a test
-        * can retrieve it.
-        */
-       static class TestRegistryListener implements KvStateRegistryListener {
-               volatile JobVertexID jobVertexID;
-               volatile KeyGroupRange keyGroupIndex;
-               volatile String registrationName;
-               volatile KvStateID kvStateId;
-
-               @Override
-               public void notifyKvStateRegistered(JobID jobId,
-                               JobVertexID jobVertexId,
-                               KeyGroupRange keyGroupRange,
-                               String registrationName,
-                               KvStateID kvStateId) {
-                       this.jobVertexID = jobVertexId;
-                       this.keyGroupIndex = keyGroupRange;
-                       this.registrationName = registrationName;
-                       this.kvStateId = kvStateId;
-               }
-
-               @Override
-               public void notifyKvStateUnregistered(JobID jobId,
-                               JobVertexID jobVertexId,
-                               KeyGroupRange keyGroupRange,
-                               String registrationName) {
-
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
deleted file mode 100644
index 7abc84e..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link KvStateServerImpl}.
- */
-public class KvStateServerTest {
-
-       // Thread pool for client bootstrap (shared between tests)
-       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
-
-       private static final int TIMEOUT_MILLIS = 10000;
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (NIO_GROUP != null) {
-                       NIO_GROUP.shutdownGracefully();
-               }
-       }
-
-       /**
-        * Tests a simple successful query via a SocketChannel.
-        */
-       @Test
-       public void testSimpleRequest() throws Throwable {
-               KvStateServerImpl server = null;
-               Bootstrap bootstrap = null;
-               try {
-                       KvStateRegistry registry = new KvStateRegistry();
-                       KvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-                       server = new 
KvStateServerImpl(InetAddress.getLocalHost(),
-                                       
Collections.singletonList(0).iterator(), 1, 1, registry, stats);
-                       server.start();
-
-                       KvStateServerAddress serverAddress = 
server.getServerAddress();
-                       int numKeyGroups = 1;
-                       AbstractStateBackend abstractBackend = new 
MemoryStateBackend();
-                       DummyEnvironment dummyEnv = new 
DummyEnvironment("test", 1, 0);
-                       dummyEnv.setKvStateRegistry(registry);
-                       AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                                       dummyEnv,
-                                       new JobID(),
-                                       "test_op",
-                                       IntSerializer.INSTANCE,
-                                       numKeyGroups,
-                                       new KeyGroupRange(0, 0),
-                                       registry.createTaskRegistry(new 
JobID(), new JobVertexID()));
-
-                       final KvStateServerHandlerTest.TestRegistryListener 
registryListener =
-                                       new 
KvStateServerHandlerTest.TestRegistryListener();
-
-                       registry.registerListener(registryListener);
-
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                       desc.setQueryable("vanilla");
-
-                       ValueState<Integer> state = backend.getPartitionedState(
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE,
-                                       desc);
-
-                       // Update KvState
-                       int expectedValue = 712828289;
-
-                       int key = 99812822;
-                       backend.setCurrentKey(key);
-                       state.update(expectedValue);
-
-                       // Request
-                       byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                                       key,
-                                       IntSerializer.INSTANCE,
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE);
-
-                       // Connect to the server
-                       final BlockingQueue<ByteBuf> responses = new 
LinkedBlockingQueue<>();
-                       bootstrap = createBootstrap(
-                                       new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
-                                       new ChannelInboundHandlerAdapter() {
-                                               @Override
-                                               public void 
channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-                                                       responses.add((ByteBuf) 
msg);
-                                               }
-                                       });
-
-                       Channel channel = bootstrap
-                                       .connect(serverAddress.getHost(), 
serverAddress.getPort())
-                                       .sync().channel();
-
-                       long requestId = Integer.MAX_VALUE + 182828L;
-
-                       
assertTrue(registryListener.registrationName.equals("vanilla"));
-
-                       final KvStateInternalRequest request = new 
KvStateInternalRequest(
-                                       registryListener.kvStateId,
-                                       serializedKeyAndNamespace);
-
-                       ByteBuf serializeRequest = 
MessageSerializer.serializeRequest(
-                                       channel.alloc(),
-                                       requestId,
-                                       request);
-
-                       channel.writeAndFlush(serializeRequest);
-
-                       ByteBuf buf = responses.poll(TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
-
-                       assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-                       assertEquals(requestId, 
MessageSerializer.getRequestId(buf));
-                       KvStateResponse response = 
server.getSerializer().deserializeResponse(buf);
-
-                       int actualValue = 
KvStateSerializer.deserializeValue(response.getContent(), 
IntSerializer.INSTANCE);
-                       assertEquals(expectedValue, actualValue);
-               } finally {
-                       if (server != null) {
-                               server.shutdown();
-                       }
-
-                       if (bootstrap != null) {
-                               EventLoopGroup group = bootstrap.group();
-                               if (group != null) {
-                                       group.shutdownGracefully();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Creates a client bootstrap.
-        */
-       private Bootstrap createBootstrap(final ChannelHandler... handlers) {
-               return new 
Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
-                               .handler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline().addLast(handlers);
-                                       }
-                               });
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
deleted file mode 100644
index 32a0c9b..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
-import org.apache.flink.queryablestate.messages.KvStateResponse;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.network.messages.RequestFailure;
-import org.apache.flink.runtime.query.KvStateID;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link MessageSerializer}.
- */
-@RunWith(Parameterized.class)
-public class MessageSerializerTest {
-
-       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-
-       @Parameterized.Parameters
-       public static Collection<Boolean> parameters() {
-               return Arrays.asList(false, true);
-       }
-
-       @Parameterized.Parameter
-       public boolean async;
-
-       /**
-        * Tests request serialization.
-        */
-       @Test
-       public void testRequestSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1337L;
-               KvStateID kvStateId = new KvStateID();
-               byte[] serializedKeyAndNamespace = randomByteArray(1024);
-
-               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               ByteBuf buf = MessageSerializer.serializeRequest(alloc, 
requestId, request);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-               assertEquals(requestId, MessageSerializer.getRequestId(buf));
-               KvStateInternalRequest requestDeser = 
serializer.deserializeRequest(buf);
-
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(kvStateId, requestDeser.getKvStateId());
-               assertArrayEquals(serializedKeyAndNamespace, 
requestDeser.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests request serialization with zero-length serialized key and 
namespace.
-        */
-       @Test
-       public void testRequestSerializationWithZeroLengthKeyAndNamespace() 
throws Exception {
-
-               long requestId = Integer.MAX_VALUE + 1337L;
-               KvStateID kvStateId = new KvStateID();
-               byte[] serializedKeyAndNamespace = new byte[0];
-
-               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               ByteBuf buf = MessageSerializer.serializeRequest(alloc, 
requestId, request);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
-               assertEquals(requestId, MessageSerializer.getRequestId(buf));
-               KvStateInternalRequest requestDeser = 
serializer.deserializeRequest(buf);
-
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(kvStateId, requestDeser.getKvStateId());
-               assertArrayEquals(serializedKeyAndNamespace, 
requestDeser.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests that a <code>null</code> serialized key and namespace is not 
silently accepted:
-        * constructing the request must fail with a NullPointerException.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() 
throws Exception {
-               new KvStateInternalRequest(new KvStateID(), null);
-       }
-
-       /**
-        * Tests response serialization.
-        */
-       @Test
-       public void testResponseSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 72727278L;
-               byte[] serializedResult = randomByteArray(1024);
-
-               final KvStateResponse response = new 
KvStateResponse(serializedResult);
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               ByteBuf buf = MessageSerializer.serializeResponse(alloc, 
requestId, response);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               assertEquals(requestId, MessageSerializer.getRequestId(buf));
-               KvStateResponse responseDeser = 
serializer.deserializeResponse(buf);
-
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedResult, responseDeser.getContent());
-       }
-
-       /**
-        * Tests response serialization with zero-length serialized result.
-        */
-       @Test
-       public void testResponseSerializationWithZeroLengthSerializedResult() 
throws Exception {
-               byte[] serializedResult = new byte[0];
-
-               final KvStateResponse response = new 
KvStateResponse(serializedResult);
-               final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
-                               new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
-
-               ByteBuf buf = MessageSerializer.serializeResponse(alloc, 
72727278L, response);
-
-               int frameLength = buf.readInt();
-
-               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
-               assertEquals(72727278L, MessageSerializer.getRequestId(buf));
-               KvStateResponse responseDeser = 
serializer.deserializeResponse(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedResult, responseDeser.getContent());
-       }
-
-       /**
-        * Tests that a <code>null</code> serialized result is not silently accepted:
-        * constructing the response must fail with a NullPointerException.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedResult() throws 
Exception {
-               new KvStateResponse((byte[]) null);
-       }
-
-       /**
-        * Tests request failure serialization.
-        */
-       @Test
-       public void testKvStateRequestFailureSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1111222L;
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, 
requestId, cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               RequestFailure requestFailure = 
MessageSerializer.deserializeRequestFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, requestFailure.getRequestId());
-               assertEquals(cause.getClass(), 
requestFailure.getCause().getClass());
-               assertEquals(cause.getMessage(), 
requestFailure.getCause().getMessage());
-       }
-
-       /**
-        * Tests server failure serialization.
-        */
-       @Test
-       public void testServerFailureSerialization() throws Exception {
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, 
cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
-               Throwable request = 
MessageSerializer.deserializeServerFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(cause.getClass(), request.getClass());
-               assertEquals(cause.getMessage(), request.getMessage());
-       }
-
-       private byte[] randomByteArray(int capacity) {
-               byte[] bytes = new byte[capacity];
-               ThreadLocalRandom.current().nextBytes(bytes);
-               return bytes;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
deleted file mode 100644
index 2e05f61..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableAggregatingState}.
- */
-public class ImmutableAggregatingStateTest {
-
-       private final AggregatingStateDescriptor<Long, String, String> 
aggrStateDesc =
-                       new AggregatingStateDescriptor<>(
-                                       "test",
-                                       new SumAggr(),
-                                       String.class);
-
-       private ImmutableAggregatingState<Long, String> aggrState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!aggrStateDesc.isSerializerInitialized()) {
-                       aggrStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               final String initValue = "42";
-
-               ByteArrayOutputStream out = new ByteArrayOutputStream();
-               aggrStateDesc.getSerializer().serialize(initValue, new 
DataOutputViewStreamWrapper(out));
-
-               aggrState = ImmutableAggregatingState.createState(
-                               aggrStateDesc,
-                               out.toByteArray()
-               );
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               String value = aggrState.get();
-               assertEquals("42", value);
-
-               aggrState.add(54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               String value = aggrState.get();
-               assertEquals("42", value);
-
-               aggrState.clear();
-       }
-
-       /**
-        * Test {@link AggregateFunction} concatenating the already stored 
string with the long passed as argument.
-        */
-       private static class SumAggr implements AggregateFunction<Long, String, 
String> {
-
-               private static final long serialVersionUID = 
-6249227626701264599L;
-
-               @Override
-               public String createAccumulator() {
-                       return "";
-               }
-
-               @Override
-               public String add(Long value, String accumulator) {
-                       accumulator += ", " + value;
-                       return accumulator;
-               }
-
-               @Override
-               public String getResult(String accumulator) {
-                       return accumulator;
-               }
-
-               @Override
-               public String merge(String a, String b) {
-                       return a + ", " + b;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
deleted file mode 100644
index d2c9535..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableFoldingState}.
- */
-public class ImmutableFoldingStateTest {
-
-       private final FoldingStateDescriptor<Long, String> foldingStateDesc =
-                       new FoldingStateDescriptor<>(
-                                       "test",
-                                       "0",
-                                       new SumFold(),
-                                       StringSerializer.INSTANCE);
-
-       private ImmutableFoldingState<Long, String> foldingState;
-
-       @Before
-       public void setUp() throws Exception {
-               if (!foldingStateDesc.isSerializerInitialized()) {
-                       foldingStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               ByteArrayOutputStream out = new ByteArrayOutputStream();
-               StringSerializer.INSTANCE.serialize("42", new 
DataOutputViewStreamWrapper(out));
-
-               foldingState = ImmutableFoldingState.createState(
-                               foldingStateDesc,
-                               out.toByteArray()
-               );
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testUpdate() {
-               String value = foldingState.get();
-               assertEquals("42", value);
-
-               foldingState.add(54L);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testClear() {
-               String value = foldingState.get();
-               assertEquals("42", value);
-
-               foldingState.clear();
-       }
-
-       /**
-        * Test {@link FoldFunction} that parses the stored string as a long, 
adds the long passed as argument, and returns the sum as a string.
-        */
-       private static class SumFold implements FoldFunction<Long, String> {
-
-               private static final long serialVersionUID = 
-6249227626701264599L;
-
-               @Override
-               public String fold(String accumulator, Long value) throws 
Exception {
-                       long acc = Long.valueOf(accumulator);
-                       acc += value;
-                       return Long.toString(acc);
-               }
-       }
-}

Reply via email to