http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
deleted file mode 100644
index afd9e46..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateClient;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.query.netty.UnknownKvStateID;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.heap.HeapValueState;
-import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.MathUtils;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link QueryableStateClient}.
- */
-public class QueryableStateClientTest {
-
-       private static final ActorSystem testActorSystem = 
AkkaUtils.createLocalActorSystem(new Configuration());
-
-       private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
-
-       /**
-        * All failures should lead to a retry with a forced location lookup.
-        *
-        * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, 
UnknownKvStateLocation,
-        * ConnectException are checked explicitly as these indicate out-of-sync
-        * KvStateLocation.
-        */
-       @Test
-       public void testForceLookupOnOutdatedLocation() throws Exception {
-               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-               KvStateClient networkClient = mock(KvStateClient.class);
-
-               QueryableStateClient client = new QueryableStateClient(
-                               lookupService,
-                               networkClient,
-                               testActorSystem.dispatcher());
-
-               try {
-                       JobID jobId = new JobID();
-                       int numKeyGroups = 4;
-
-                       //
-                       // UnknownKvStateLocation
-                       //
-                       String query1 = "lucky";
-
-                       Future<KvStateLocation> unknownKvStateLocation = 
Futures.failed(
-                                       new UnknownKvStateLocation(query1));
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query1)))
-                                       .thenReturn(unknownKvStateLocation);
-
-                       Future<Integer> result = client.getKvState(
-                                       jobId,
-                                       query1,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected 
UnknownKvStateLocation exception");
-                       } catch (UnknownKvStateLocation ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
-
-                       //
-                       // UnknownKvStateKeyGroupLocation
-                       //
-                       String query2 = "unlucky";
-
-                       Future<KvStateLocation> unknownKeyGroupLocation = 
Futures.successful(
-                                       new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query2));
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query2)))
-                                       .thenReturn(unknownKeyGroupLocation);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query2,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected 
UnknownKvStateKeyGroupLocation exception");
-                       } catch (UnknownKvStateKeyGroupLocation ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
-
-                       //
-                       // UnknownKvStateID
-                       //
-                       String query3 = "water";
-                       KvStateID kvStateId = new KvStateID();
-                       Future<byte[]> unknownKvStateId = Futures.failed(new 
UnknownKvStateID(kvStateId));
-
-                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 12323);
-                       KvStateLocation location = new KvStateLocation(jobId, 
new JobVertexID(), numKeyGroups, query3);
-                       for (int i = 0; i < numKeyGroups; i++) {
-                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
-                       }
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query3)))
-                                       
.thenReturn(Futures.successful(location));
-
-                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
-                                       .thenReturn(unknownKvStateId);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query3,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected UnknownKvStateID 
exception");
-                       } catch (UnknownKvStateID ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
-
-                       //
-                       // ConnectException
-                       //
-                       String query4 = "space";
-                       Future<byte[]> connectException = Futures.failed(new 
ConnectException());
-                       kvStateId = new KvStateID();
-
-                       serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 11123);
-                       location = new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query4);
-                       for (int i = 0; i < numKeyGroups; i++) {
-                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
-                       }
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query4)))
-                                       
.thenReturn(Futures.successful(location));
-
-                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
-                                       .thenReturn(connectException);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query4,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected ConnectException 
exception");
-                       } catch (ConnectException ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
-
-                       //
-                       // Other exceptions don't lead to a retry
-                       //
-                       String query5 = "universe";
-                       Future<KvStateLocation> exception = Futures.failed(new 
RuntimeException("Test exception"));
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query5)))
-                                       .thenReturn(exception);
-
-                       client.getKvState(
-                                       jobId,
-                                       query5,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
-               } finally {
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests queries against multiple servers.
-        *
-        * <p>The servers are populated with different keys and the client 
queries
-        * all available keys from all servers.
-        */
-       @Test
-       public void testIntegrationWithKvStateServer() throws Exception {
-               // Config
-               int numServers = 2;
-               int numKeys = 1024;
-               int numKeyGroups = 1;
-
-               JobID jobId = new JobID();
-               JobVertexID jobVertexId = new JobVertexID();
-
-               KvStateServer[] servers = new KvStateServer[numServers];
-               AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
-
-               QueryableStateClient client = null;
-               KvStateClient networkClient = null;
-               AtomicKvStateRequestStats networkClientStats = new 
AtomicKvStateRequestStats();
-
-               MemoryStateBackend backend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-
-               AbstractKeyedStateBackend<Integer> keyedStateBackend = 
backend.createKeyedStateBackend(dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
-
-               try {
-                       KvStateRegistry[] registries = new 
KvStateRegistry[numServers];
-                       KvStateID[] kvStateIds = new KvStateID[numServers];
-                       List<HeapValueState<Integer, VoidNamespace, Integer>> 
kvStates = new ArrayList<>();
-
-                       // Start the servers
-                       for (int i = 0; i < numServers; i++) {
-                               registries[i] = new KvStateRegistry();
-                               serverStats[i] = new 
AtomicKvStateRequestStats();
-                               servers[i] = new 
KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], 
serverStats[i]);
-                               servers[i].start();
-                               ValueStateDescriptor<Integer> descriptor =
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-
-                               
RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> 
registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
-                                               descriptor.getType(),
-                                               descriptor.getName(),
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               IntSerializer.INSTANCE);
-
-                               // Register state
-                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = new HeapValueState<>(
-                                               descriptor,
-                                               new 
NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
-                                               IntSerializer.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE);
-
-                               kvStates.add(kvState);
-
-                               kvStateIds[i] = registries[i].registerKvState(
-                                               jobId,
-                                               new JobVertexID(),
-                                               new KeyGroupRange(i, i),
-                                               "choco",
-                                               kvState);
-                       }
-
-                       int[] expectedRequests = new int[numServers];
-
-                       for (int key = 0; key < numKeys; key++) {
-                               int targetKeyGroupIndex = 
MathUtils.murmurHash(key) % numServers;
-                               expectedRequests[targetKeyGroupIndex]++;
-
-                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = kvStates.get(targetKeyGroupIndex);
-
-                               keyedStateBackend.setCurrentKey(key);
-                               
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-                               kvState.update(1337 + key);
-                       }
-
-                       // Location lookup service
-                       KvStateLocation location = new KvStateLocation(jobId, 
jobVertexId, numServers, "choco");
-                       for (int keyGroupIndex = 0; keyGroupIndex < numServers; 
keyGroupIndex++) {
-                               location.registerKvState(new 
KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], 
servers[keyGroupIndex].getAddress());
-                       }
-
-                       KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq("choco")))
-                                       
.thenReturn(Futures.successful(location));
-
-                       // The client
-                       networkClient = new KvStateClient(1, 
networkClientStats);
-
-                       client = new QueryableStateClient(lookupService, 
networkClient, testActorSystem.dispatcher());
-
-                       // Send all queries
-                       List<Future<Integer>> futures = new 
ArrayList<>(numKeys);
-                       for (int key = 0; key < numKeys; key++) {
-                               ValueStateDescriptor<Integer> descriptor =
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                               futures.add(client.getKvState(
-                                               jobId,
-                                               "choco",
-                                               key,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               descriptor));
-                       }
-
-                       // Verify results
-                       Future<Iterable<Integer>> future = 
Futures.sequence(futures, testActorSystem.dispatcher());
-                       Iterable<Integer> results = Await.result(future, 
timeout);
-
-                       int index = 0;
-                       for (int buffer : results) {
-                               assertEquals(1337 + index, buffer);
-                               index++;
-                       }
-
-                       // Verify requests
-                       for (int i = 0; i < numServers; i++) {
-                               int numRetries = 10;
-                               for (int retry = 0; retry < numRetries; 
retry++) {
-                                       try {
-                                               assertEquals("Unexpected number 
of requests", expectedRequests[i], serverStats[i].getNumRequests());
-                                               assertEquals("Unexpected 
success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
-                                               assertEquals("Unexpected failed 
requests", 0, serverStats[i].getNumFailed());
-                                               break;
-                                       } catch (Throwable t) {
-                                               // Retry
-                                               if (retry == numRetries - 1) {
-                                                       throw t;
-                                               } else {
-                                                       Thread.sleep(100);
-                                               }
-                                       }
-                               }
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (networkClient != null) {
-                               networkClient.shutDown();
-                       }
-
-                       for (KvStateServer server : servers) {
-                               if (server != null) {
-                                       server.shutDown();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Tests that the QueryableState client correctly caches location 
lookups
-        * keyed by both job and name. This test is mainly due to a previous 
bug due
-        * to which cache entries were by name only. This is a problem, because 
the
-        * same client can be used to query multiple jobs.
-        */
-       @Test
-       public void testLookupMultipleJobIds() throws Exception {
-               String name = "unique-per-job";
-
-               // Exact contents don't matter here
-               KvStateLocation location = new KvStateLocation(new JobID(), new 
JobVertexID(), 1, name);
-               location.registerKvState(new KeyGroupRange(0, 0), new 
KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
-
-               JobID jobId1 = new JobID();
-               JobID jobId2 = new JobID();
-
-               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-
-               when(lookupService.getKvStateLookupInfo(any(JobID.class), 
anyString()))
-                               .thenReturn(Futures.successful(location));
-
-               KvStateClient networkClient = mock(KvStateClient.class);
-               when(networkClient.getKvState(any(KvStateServerAddress.class), 
any(KvStateID.class), any(byte[].class)))
-                               .thenReturn(Futures.successful(new byte[0]));
-
-               QueryableStateClient client = new QueryableStateClient(
-                               lookupService,
-                               networkClient,
-                               testActorSystem.dispatcher());
-
-               ValueStateDescriptor<Integer> stateDesc = new 
ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
-
-               // Queries with the same name, but different job IDs should lead to a
-               // single lookup per query and job ID.
-               client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
-               client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
-
-               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
-               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java
deleted file mode 100644
index 1e41236..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
-
-import org.junit.Test;
-
-import java.nio.channels.ClosedChannelException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for {@link KvStateClientHandler}.
- */
-public class KvStateClientHandlerTest {
-
-       /**
-        * Tests that on reads the expected callback methods are called and read
-        * buffers are recycled.
-        */
-       @Test
-       public void testReadCallbacksAndBufferRecycling() throws Exception {
-               KvStateClientHandlerCallback callback = 
mock(KvStateClientHandlerCallback.class);
-
-               EmbeddedChannel channel = new EmbeddedChannel(new 
KvStateClientHandler(callback));
-
-               //
-               // Request success
-               //
-               ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                               channel.alloc(),
-                               1222112277,
-                               new byte[0]);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, times(1)).onRequestResult(eq(1222112277L), 
any(byte[].class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Request failure
-               //
-               buf = KvStateRequestSerializer.serializeKvStateRequestFailure(
-                               channel.alloc(),
-                               1222112278,
-                               new RuntimeException("Expected test 
Exception"));
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, times(1)).onRequestFailure(eq(1222112278L), 
any(RuntimeException.class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Server failure
-               //
-               buf = KvStateRequestSerializer.serializeServerFailure(
-                               channel.alloc(),
-                               new RuntimeException("Expected test 
Exception"));
-               buf.skipBytes(4); // skip frame length
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, 
times(1)).onFailure(any(RuntimeException.class));
-
-               //
-               // Unexpected messages
-               //
-               buf = channel.alloc().buffer(4).writeInt(1223823);
-
-               // Verify callback
-               channel.writeInbound(buf);
-               verify(callback, 
times(2)).onFailure(any(IllegalStateException.class));
-               assertEquals("Buffer not recycled", 0, buf.refCnt());
-
-               //
-               // Exception caught
-               //
-               channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
-               verify(callback, 
times(3)).onFailure(any(RuntimeException.class));
-
-               //
-               // Channel inactive
-               //
-               channel.pipeline().fireChannelInactive();
-               verify(callback, 
times(4)).onFailure(any(ClosedChannelException.class));
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
deleted file mode 100644
index 6b21487..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.message.KvStateRequest;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link KvStateClient}.
- */
-public class KvStateClientTest {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientTest.class); // used for progress output in testClientServerIntegration
-
-       // Thread pool for client bootstrap (shared between tests); shut down in tearDown()
-       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
-
-       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS); // tests derive their Deadline from this via fromNow()
-
-       @AfterClass
-       public static void tearDown() throws Exception { // class-level cleanup for the shared NIO_GROUP
-               if (NIO_GROUP != null) { // NIO_GROUP is static final and assigned inline, so this guard always passes
-                       NIO_GROUP.shutdownGracefully(); // return value is ignored; termination is not awaited here
-               }
-       }
-
-       /**
-        * Tests simple queries, of which half succeed and half fail.
-        *
-        * <p>The server handler only queues incoming request buffers. The test
-        * thread polls them one by one and answers the i-th polled request with
-        * the {@code expected} bytes when {@code i} is even and with a request
-        * failure when {@code i} is odd. The futures are then checked by their
-        * submission index using the same even/odd split, so the test relies on
-        * requests reaching the server in the order they were issued. Finally the
-        * request statistics are compared against {@code numQueries / 2}
-        * successes and {@code numQueries / 2} failures.
-        */
-       @Test
-       public void testSimpleRequests() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       // Random result
-                       final byte[] expected = new byte[1024];
-                       ThreadLocalRandom.current().nextBytes(expected);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel()); // remembered so the test thread can write responses
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg); // handed to the test thread, which releases it
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       List<Future<byte[]>> futures = new ArrayList<>();
-
-                       int numQueries = 1024;
-
-                       for (int i = 0; i < numQueries; i++) {
-                               futures.add(client.getKvState(serverAddress, 
new KvStateID(), new byte[0]));
-                       }
-
-                       // Respond to messages
-                       Exception testException = new 
RuntimeException("Expected test Exception");
-
-                       for (int i = 0; i < numQueries; i++) {
-                               ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               assertNotNull("Receive timed out", buf);
-
-                               Channel ch = channel.get();
-                               assertNotNull("Channel not active", ch);
-
-                               assertEquals(KvStateRequestType.REQUEST, 
KvStateRequestSerializer.deserializeHeader(buf));
-                               KvStateRequest request = 
KvStateRequestSerializer.deserializeKvStateRequest(buf);
-
-                               buf.release(); // request has been parsed; the buffer is no longer needed
-
-                               if (i % 2 == 0) { // even poll index: answer with the expected bytes
-                                       ByteBuf response = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                                                       serverChannel.alloc(),
-                                                       request.getRequestId(),
-                                                       expected);
-
-                                       ch.writeAndFlush(response);
-                               } else { // odd poll index: answer with a request failure
-                                       ByteBuf response = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                                                       serverChannel.alloc(),
-                                                       request.getRequestId(),
-                                                       testException);
-
-                                       ch.writeAndFlush(response);
-                               }
-                       }
-
-                       for (int i = 0; i < numQueries; i++) {
-                               if (i % 2 == 0) { // assumes request i was also the i-th one polled on the server side
-                                       byte[] serializedResult = 
Await.result(futures.get(i), deadline.timeLeft());
-                                       assertArrayEquals(expected, 
serializedResult);
-                               } else {
-                                       try {
-                                               Await.result(futures.get(i), 
deadline.timeLeft());
-                                               fail("Did not throw expected 
Exception");
-                                       } catch (RuntimeException ignored) {
-                                               // Expected
-                                       }
-                               }
-                       }
-
-                       assertEquals(numQueries, stats.getNumRequests());
-                       int expectedRequests = numQueries / 2;
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != expectedRequests ||
-                                       stats.getNumFailed() != 
expectedRequests)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(expectedRequests, 
stats.getNumSuccessful());
-                       assertEquals(expectedRequests, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections()); // checked after the client has been shut down
-               }
-       }
-
-       /**
-        * Tests that a request to an unavailable host is failed with
-        * ConnectException.
-        *
-        * <p>The target address is the local host combined with a port obtained
-        * from {@link NetUtils#getAvailablePort()}; nothing in this test binds a
-        * server to that port before the query is sent.
-        */
-       @Test
-       public void testRequestUnavailableHost() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-               KvStateClient client = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       int availablePort = NetUtils.getAvailablePort();
-
-                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(
-                                       InetAddress.getLocalHost(),
-                                       availablePort);
-
-                       Future<byte[]> future = 
client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-                       try {
-                               Await.result(future, deadline.timeLeft());
-                               fail("Did not throw expected ConnectException");
-                       } catch (ConnectException ignored) {
-                               // Expected
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections()); // checked after the client has been shut down
-               }
-       }
-
-       /**
-        * Multiple threads concurrently fire queries.
-        *
-        * <p>{@code numQueryTasks} tasks each issue {@code numQueriesPerTask}
-        * requests through one shared client. The server handler answers every
-        * request from within {@code channelRead} with the same
-        * {@code serializedResult} bytes. The test checks each returned future
-        * against those bytes and then compares the request statistics with the
-        * total query count.
-        *
-        * <p>NOTE(review): the {@code assertEquals} on the request type runs
-        * inside the server handler callback rather than in the test method body;
-        * a failure there presumably surfaces only indirectly (for example as a
-        * timeout) - confirm.
-        */
-       @Test
-       public void testConcurrentQueries() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               ExecutorService executor = null;
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               final byte[] serializedResult = new byte[1024];
-               ThreadLocalRandom.current().nextBytes(serializedResult);
-
-               try {
-                       int numQueryTasks = 4;
-                       final int numQueriesPerTask = 1024;
-
-                       executor = Executors.newFixedThreadPool(numQueryTasks);
-
-                       client = new KvStateClient(1, stats);
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       ByteBuf buf = (ByteBuf) msg;
-                                       
assertEquals(KvStateRequestType.REQUEST, 
KvStateRequestSerializer.deserializeHeader(buf));
-                                       KvStateRequest request = 
KvStateRequestSerializer.deserializeKvStateRequest(buf);
-
-                                       buf.release(); // request has been parsed; the buffer is no longer needed
-
-                                       ByteBuf response = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       serializedResult);
-
-                                       ctx.channel().writeAndFlush(response);
-                               }
-                       });
-
-                       final KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       final KvStateClient finalClient = client; // final copy so the anonymous Callable can capture it
-                       Callable<List<Future<byte[]>>> queryTask = new 
Callable<List<Future<byte[]>>>() {
-                               @Override
-                               public List<Future<byte[]>> call() throws 
Exception {
-                                       List<Future<byte[]>> results = new 
ArrayList<>(numQueriesPerTask);
-
-                                       for (int i = 0; i < numQueriesPerTask; 
i++) {
-                                               
results.add(finalClient.getKvState(
-                                                               serverAddress,
-                                                               new KvStateID(),
-                                                               new byte[0]));
-                                       }
-
-                                       return results;
-                               }
-                       };
-
-                       // Submit query tasks (the same Callable instance is submitted numQueryTasks times)
-                       List<java.util.concurrent.Future<List<Future<byte[]>>>> 
futures = new ArrayList<>();
-                       for (int i = 0; i < numQueryTasks; i++) {
-                               futures.add(executor.submit(queryTask));
-                       }
-
-                       // Verify results
-                       for (java.util.concurrent.Future<List<Future<byte[]>>> 
future : futures) {
-                               List<Future<byte[]>> results = 
future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                               for (Future<byte[]> result : results) {
-                                       byte[] actual = Await.result(result, 
deadline.timeLeft());
-                                       assertArrayEquals(serializedResult, 
actual);
-                               }
-                       }
-
-                       int totalQueries = numQueryTasks * numQueriesPerTask;
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
stats.getNumSuccessful() != totalQueries) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(totalQueries, stats.getNumRequests());
-                       assertEquals(totalQueries, stats.getNumSuccessful());
-               } finally {
-                       if (executor != null) {
-                               executor.shutdown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections()); // checked after the client has been shut down
-               }
-       }
-
-       /**
-        * Tests that a server failure closes the connection and removes it from
-        * the established connections.
-        *
-        * <p>Two requests are sent, and the test receives and releases both
-        * request buffers before writing a single server failure message on the
-        * channel captured in {@code channelActive}. Both pending futures are
-        * then expected to fail, the connection count is expected to drop from 1
-        * to 0, and the statistics are expected to show 2 requests, 0 successes
-        * and 2 failures.
-        */
-       @Test
-       public void testFailureClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel()); // remembered so the test thread can write the failure
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.add((ByteBuf) msg); // handed to the test thread, which releases it
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests
-                       List<Future<byte[]>> futures = new ArrayList<>();
-                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
-                       futures.add(client.getKvState(serverAddress, new 
KvStateID(), new byte[0]));
-
-                       ByteBuf buf = 
received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release(); // content is not inspected in this test
-
-                       buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-                       assertNotNull("Receive timed out", buf);
-                       buf.release();
-
-                       assertEquals(1, stats.getNumConnections()); // both requests went to the same server address
-
-                       Channel ch = channel.get();
-                       assertNotNull("Channel not active", ch);
-
-                       // Respond with failure (one message, not tied to a particular request ID)
-                       
ch.writeAndFlush(KvStateRequestSerializer.serializeServerFailure(
-                                       serverChannel.alloc(),
-                                       new RuntimeException("Expected test 
server failure")));
-
-                       try {
-                               Await.result(futures.remove(0), 
deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (RuntimeException ignored) {
-                               // Expected
-                       }
-
-                       try {
-                               Await.result(futures.remove(0), 
deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (RuntimeException ignored) {
-                               // Expected
-                       }
-
-                       assertEquals(0, stats.getNumConnections()); // asserted before the client is shut down
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0 ||
-                                       stats.getNumFailed() != 2)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(2, stats.getNumRequests());
-                       assertEquals(0, stats.getNumSuccessful());
-                       assertEquals(2, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests that a server channel close, closes the connection and removes it
-        * from the established connections.
-        *
-        * <p>The server handler only records that something was read. Once that
-        * flag is set, the test closes the channel captured in
-        * {@code channelActive} and expects the pending future to fail with
-        * {@link ClosedChannelException}, the connection count to drop from 1 to
-        * 0, and the statistics to show 1 request, 0 successes and 1 failure.
-        *
-        * <p>NOTE(review): unlike the handlers in the other tests of this class,
-        * {@code channelRead} here neither releases nor forwards the received
-        * message - confirm whether that is intentional.
-        */
-       @Test
-       public void testServerClosesChannel() throws Exception {
-               Deadline deadline = TEST_TIMEOUT.fromNow();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               Channel serverChannel = null;
-
-               try {
-                       client = new KvStateClient(1, stats);
-
-                       final AtomicBoolean received = new AtomicBoolean();
-                       final AtomicReference<Channel> channel = new 
AtomicReference<>();
-
-                       serverChannel = createServerChannel(new 
ChannelInboundHandlerAdapter() {
-                               @Override
-                               public void channelActive(ChannelHandlerContext 
ctx) throws Exception {
-                                       channel.set(ctx.channel()); // remembered so the test thread can close it
-                               }
-
-                               @Override
-                               public void channelRead(ChannelHandlerContext 
ctx, Object msg) throws Exception {
-                                       received.set(true); // only signals arrival; msg itself is not touched
-                               }
-                       });
-
-                       KvStateServerAddress serverAddress = 
getKvStateServerAddress(serverChannel);
-
-                       // Requests
-                       Future<byte[]> future = 
client.getKvState(serverAddress, new KvStateID(), new byte[0]);
-
-                       while (!received.get() && deadline.hasTimeLeft()) { // poll until the handler has seen the request
-                               Thread.sleep(50);
-                       }
-                       assertTrue("Receive timed out", received.get());
-
-                       assertEquals(1, stats.getNumConnections());
-
-                       
channel.get().close().await(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS); // result of await(...) is not checked
-
-                       try {
-                               Await.result(future, deadline.timeLeft());
-                               fail("Did not throw expected server failure");
-                       } catch (ClosedChannelException ignored) {
-                               // Expected
-                       }
-
-                       assertEquals(0, stats.getNumConnections()); // asserted before the client is shut down
-
-                       // Counts can take some time to propagate
-                       while (deadline.hasTimeLeft() && 
(stats.getNumSuccessful() != 0 ||
-                                       stats.getNumFailed() != 1)) {
-                               Thread.sleep(100);
-                       }
-
-                       assertEquals(1, stats.getNumRequests());
-                       assertEquals(0, stats.getNumSuccessful());
-                       assertEquals(1, stats.getNumFailed());
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (serverChannel != null) {
-                               serverChannel.close();
-                       }
-
-                       assertEquals("Channel leak", 0, 
stats.getNumConnections());
-               }
-       }
-
-       /**
-        * Tests multiple clients querying multiple servers until 100k queries 
have
-        * been processed. At this point, the client is shut down and its 
verified
-        * that all ongoing requests are failed.
-        */
-       @Test
-       public void testClientServerIntegration() throws Exception {
-               // Config
-               final int numServers = 2;
-               final int numServerEventLoopThreads = 2;
-               final int numServerQueryThreads = 2;
-
-               final int numClientEventLoopThreads = 4;
-               final int numClientsTasks = 8;
-
-               final int batchSize = 16;
-
-               final int numKeyGroups = 1;
-
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               KvStateRegistry dummyRegistry = new KvStateRegistry();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(dummyRegistry);
-
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
-
-               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
-
-               KvStateClient client = null;
-               ExecutorService clientTaskExecutor = null;
-               final KvStateServer[] server = new KvStateServer[numServers];
-
-               try {
-                       client = new KvStateClient(numClientEventLoopThreads, 
clientStats);
-                       clientTaskExecutor = 
Executors.newFixedThreadPool(numClientsTasks);
-
-                       // Create state
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                       desc.setQueryable("any");
-
-                       // Create servers
-                       KvStateRegistry[] registry = new 
KvStateRegistry[numServers];
-                       AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
-                       final KvStateID[] ids = new KvStateID[numServers];
-
-                       for (int i = 0; i < numServers; i++) {
-                               registry[i] = new KvStateRegistry();
-                               serverStats[i] = new 
AtomicKvStateRequestStats();
-                               server[i] = new KvStateServer(
-                                               InetAddress.getLocalHost(),
-                                               0,
-                                               numServerEventLoopThreads,
-                                               numServerQueryThreads,
-                                               registry[i],
-                                               serverStats[i]);
-
-                               server[i].start();
-
-                               backend.setCurrentKey(1010 + i);
-
-                               // Value per server
-                               ValueState<Integer> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               desc);
-
-                               state.update(201 + i);
-
-                               // we know it must be a KvStat but this is not 
exposed to the user via State
-                               InternalKvState<?> kvState = 
(InternalKvState<?>) state;
-
-                               // Register KvState (one state instance for all 
server)
-                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
-                       }
-
-                       final KvStateClient finalClient = client;
-                       Callable<Void> queryTask = new Callable<Void>() {
-                               @Override
-                               public Void call() throws Exception {
-                                       while (true) {
-                                               if (Thread.interrupted()) {
-                                                       throw new 
InterruptedException();
-                                               }
-
-                                               // Random server permutation
-                                               List<Integer> random = new 
ArrayList<>();
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       random.add(j);
-                                               }
-                                               Collections.shuffle(random);
-
-                                               // Dispatch queries
-                                               List<Future<byte[]>> futures = 
new ArrayList<>(batchSize);
-
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       int targetServer = 
random.get(j) % numServers;
-
-                                                       byte[] 
serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
-                                                                       1010 + 
targetServer,
-                                                                       
IntSerializer.INSTANCE,
-                                                                       
VoidNamespace.INSTANCE,
-                                                                       
VoidNamespaceSerializer.INSTANCE);
-
-                                                       
futures.add(finalClient.getKvState(
-                                                                       
server[targetServer].getAddress(),
-                                                                       
ids[targetServer],
-                                                                       
serializedKeyAndNamespace));
-                                               }
-
-                                               // Verify results
-                                               for (int j = 0; j < batchSize; 
j++) {
-                                                       int targetServer = 
random.get(j) % numServers;
-
-                                                       Future<byte[]> future = 
futures.get(j);
-                                                       byte[] buf = 
Await.result(future, timeout);
-                                                       int value = 
KvStateRequestSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
-                                                       assertEquals(201 + 
targetServer, value);
-                                               }
-                                       }
-                               }
-                       };
-
-                       // Submit tasks
-                       List<java.util.concurrent.Future<Void>> taskFutures = 
new ArrayList<>();
-                       for (int i = 0; i < numClientsTasks; i++) {
-                               
taskFutures.add(clientTaskExecutor.submit(queryTask));
-                       }
-
-                       long numRequests;
-                       while ((numRequests = clientStats.getNumRequests()) < 
100_000) {
-                               Thread.sleep(100);
-                               LOG.info("Number of requests {}/100_000", 
numRequests);
-                       }
-
-                       // Shut down
-                       client.shutDown();
-
-                       for (java.util.concurrent.Future<Void> future : 
taskFutures) {
-                               try {
-                                       future.get();
-                                       fail("Did not throw expected Exception 
after shut down");
-                               } catch (ExecutionException t) {
-                                       if (t.getCause() instanceof 
ClosedChannelException ||
-                                                       t.getCause() instanceof 
IllegalStateException) {
-                                               // Expected
-                                       } else {
-                                               t.printStackTrace();
-                                               fail("Failed with unexpected 
Exception type: " + t.getClass().getName());
-                                       }
-                               }
-                       }
-
-                       assertEquals("Connection leak (client)", 0, 
clientStats.getNumConnections());
-                       for (int i = 0; i < numServers; i++) {
-                               boolean success = false;
-                               int numRetries = 0;
-                               while (!success) {
-                                       try {
-                                               assertEquals("Connection leak 
(server)", 0, serverStats[i].getNumConnections());
-                                               success = true;
-                                       } catch (Throwable t) {
-                                               if (numRetries < 10) {
-                                                       LOG.info("Retrying 
connection leak check (server)");
-                                                       
Thread.sleep((numRetries + 1) * 50);
-                                                       numRetries++;
-                                               } else {
-                                                       throw t;
-                                               }
-                                       }
-                               }
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       for (int i = 0; i < numServers; i++) {
-                               if (server[i] != null) {
-                                       server[i].shutDown();
-                               }
-                       }
-
-                       if (clientTaskExecutor != null) {
-                               clientTaskExecutor.shutdown();
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private Channel createServerChannel(final ChannelHandler... handlers) 
throws UnknownHostException, InterruptedException {
-               ServerBootstrap bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(InetAddress.getLocalHost(), 0)
-                               // NIO server channels
-                               .group(NIO_GROUP)
-                               .channel(NioServerSocketChannel.class)
-                               // See initializer for pipeline details
-                               .childHandler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline()
-                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                                               
.addLast(handlers);
-                                       }
-                               });
-
-               return bootstrap.bind().sync().channel();
-       }
-
-       private KvStateServerAddress getKvStateServerAddress(Channel 
serverChannel) {
-               InetSocketAddress localAddress = (InetSocketAddress) 
serverChannel.localAddress();
-
-               return new KvStateServerAddress(localAddress.getAddress(), 
localAddress.getPort());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
deleted file mode 100644
index 4914ff7..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ /dev/null
@@ -1,721 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link KvStateServerHandler}.
- */
-public class KvStateServerHandlerTest extends TestLogger {
-
-       /** Shared Thread pool for query execution. */
-       private static final ExecutorService TEST_THREAD_POOL = 
Executors.newSingleThreadExecutor();
-
-       private static final int READ_TIMEOUT_MILLIS = 10000;
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (TEST_THREAD_POOL != null) {
-                       TEST_THREAD_POOL.shutdown();
-               }
-       }
-
-       /**
-        * Tests a simple successful query via an EmbeddedChannel.
-        */
-       @Test
-       public void testSimpleQuery() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Update the KvState and request it
-               int expectedValue = 712828289;
-
-               int key = 99812822;
-               backend.setCurrentKey(key);
-               ValueState<Integer> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               state.update(expectedValue);
-
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_RESULT, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestResult response = 
KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
-
-               assertEquals(requestId, response.getRequestId());
-
-               int actualValue = 
KvStateRequestSerializer.deserializeValue(response.getSerializedResult(), 
IntSerializer.INSTANCE);
-               assertEquals(expectedValue, actualValue);
-
-               assertEquals(stats.toString(), 1, stats.getNumRequests());
-
-               // Wait for async successful request report
-               long deadline = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-               while (stats.getNumSuccessful() != 1 && System.nanoTime() <= 
deadline) {
-                       Thread.sleep(10);
-               }
-
-               assertEquals(stats.toString(), 1, stats.getNumSuccessful());
-       }
-
-       /**
-        * Tests the failure response with {@link UnknownKvStateID} as cause on
-        * queries for unregistered KvStateIDs.
-        */
-       @Test
-       public void testQueryUnknownKvStateID() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               new KvStateID(),
-                               new byte[0]);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-
-               assertEquals(requestId, response.getRequestId());
-
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKvStateID);
-
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
-       }
-
-       /**
-        * Tests the failure response with {@link UnknownKeyOrNamespace} as 
cause
-        * on queries for non-existing keys.
-        */
-       @Test
-       public void testQueryUnknownKey() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
-
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-                               1238283,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 22982L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-
-               assertEquals(requestId, response.getRequestId());
-
-               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespace);
-
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
-       }
-
-       /**
-        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])}
-        * call.
-        */
-       @Test
-       public void testFailureOnGetSerializedValue() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Failing KvState
-               InternalKvState<?> kvState = mock(InternalKvState.class);
-               when(kvState.getSerializedValue(any(byte[].class)))
-                               .thenThrow(new RuntimeException("Expected test 
Exception"));
-
-               KvStateID kvStateId = registry.registerKvState(
-                               new JobID(),
-                               new JobVertexID(),
-                               new KeyGroupRange(0, 0),
-                               "vanilla",
-                               kvState);
-
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               kvStateId,
-                               new byte[0]);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-
-               assertTrue(response.getCause().getMessage().contains("Expected 
test Exception"));
-
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that the channel is closed if an Exception reaches the channel
-        * handler.
-        */
-       @Test
-       public void testCloseChannelOnExceptionCaught() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new EmbeddedChannel(handler);
-
-               channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.SERVER_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               Throwable response = 
KvStateRequestSerializer.deserializeServerFailure(buf);
-
-               assertTrue(response.getMessage().contains("Expected test 
Exception"));
-
-               channel.closeFuture().await(READ_TIMEOUT_MILLIS);
-               assertFalse(channel.isActive());
-       }
-
-       /**
-        * Tests the failure response on a rejected execution, because the query
-        * executor has been closed.
-        */
-       @Test
-       public void testQueryExecutorShutDown() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               ExecutorService closedExecutor = 
Executors.newSingleThreadExecutor();
-               closedExecutor.shutdown();
-               assertTrue(closedExecutor.isShutdown());
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, closedExecutor, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               registryListener.kvStateId,
-                               new byte[0]);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-
-               
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
-
-               assertEquals(1, stats.getNumRequests());
-               assertEquals(1, stats.getNumFailed());
-       }
-
-       /**
-        * Tests response on unexpected messages.
-        */
-       @Test
-       public void testUnexpectedMessage() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               // Write the request and wait for the response
-               ByteBuf unexpectedMessage = Unpooled.buffer(8);
-               unexpectedMessage.writeInt(4);
-               unexpectedMessage.writeInt(123238213);
-
-               channel.writeInbound(unexpectedMessage);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.SERVER_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               Throwable response = 
KvStateRequestSerializer.deserializeServerFailure(buf);
-
-               assertEquals(0, stats.getNumRequests());
-               assertEquals(0, stats.getNumFailed());
-
-               unexpectedMessage = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                               channel.alloc(),
-                               192,
-                               new byte[0]);
-
-               channel.writeInbound(unexpectedMessage);
-
-               buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.SERVER_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               response = 
KvStateRequestSerializer.deserializeServerFailure(buf);
-
-               assertTrue("Unexpected failure cause " + 
response.getClass().getName(), response instanceof IllegalArgumentException);
-
-               assertEquals(0, stats.getNumRequests());
-               assertEquals(0, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that incoming buffer instances are recycled.
-        */
-       @Test
-       public void testIncomingBufferIsRecycled() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               282872,
-                               new KvStateID(),
-                               new byte[0]);
-
-               assertEquals(1, request.refCnt());
-
-               // Write regular request
-               channel.writeInbound(request);
-               assertEquals("Buffer not recycled", 0, request.refCnt());
-
-               // Write unexpected msg
-               ByteBuf unexpected = channel.alloc().buffer(8);
-               unexpected.writeInt(4);
-               unexpected.writeInt(4);
-
-               assertEquals(1, unexpected.refCnt());
-
-               channel.writeInbound(unexpected);
-               assertEquals("Buffer not recycled", 0, unexpected.refCnt());
-       }
-
-       /**
-        * Tests the failure response if the serializers don't match.
-        */
-       @Test
-       public void testSerializerMismatch() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new 
TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               ValueState<Integer> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               int key = 99812822;
-
-               // Update the KvState
-               backend.setCurrentKey(key);
-               state.update(712828289);
-
-               byte[] wrongKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-                               "wrong-key-type",
-                               StringSerializer.INSTANCE,
-                               "wrong-namespace-type",
-                               StringSerializer.INSTANCE);
-
-               byte[] wrongNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               "wrong-namespace-type",
-                               StringSerializer.INSTANCE);
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-               ByteBuf request = 
KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               182828,
-                               registryListener.kvStateId,
-                               wrongKeyAndNamespace);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(182828, response.getRequestId());
-               assertTrue(response.getCause().getMessage().contains("IOException"));
-
-               // Repeat with wrong namespace only
-               request = KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               182829,
-                               registryListener.kvStateId,
-                               wrongNamespace);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               buf = (ByteBuf) readInboundBlocking(channel);
-               buf.skipBytes(4); // skip frame length
-
-               // Verify the response
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
-               response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(182829, response.getRequestId());
-               assertTrue(response.getCause().getMessage().contains("IOException"));
-
-               assertEquals(2, stats.getNumRequests());
-               assertEquals(2, stats.getNumFailed());
-       }
-
-       /**
-        * Tests that large responses are chunked.
-        */
-       @Test
-       public void testChunkedResponse() throws Exception {
-               KvStateRegistry registry = new KvStateRegistry();
-               KvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-               KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
-               EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
-
-               int numKeyGroups = 1;
-               AbstractStateBackend abstractBackend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-               dummyEnv.setKvStateRegistry(registry);
-               AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
-
-               final TestRegistryListener registryListener = new TestRegistryListener();
-               registry.registerListener(registryListener);
-
-               // Register state
-               ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
-               desc.setQueryable("vanilla");
-
-               ValueState<byte[]> state = backend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               desc);
-
-               // Update KvState
-               byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
-
-               byte current = 0;
-               for (int i = 0; i < bytes.length; i++) {
-                       bytes[i] = current++;
-               }
-
-               int key = 99812822;
-               backend.setCurrentKey(key);
-               state.update(bytes);
-
-               // Request
-               byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
-                               key,
-                               IntSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE);
-
-               long requestId = Integer.MAX_VALUE + 182828L;
-
-               assertTrue(registryListener.registrationName.equals("vanilla"));
-
-               ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
-                               channel.alloc(),
-                               requestId,
-                               registryListener.kvStateId,
-                               serializedKeyAndNamespace);
-
-               // Write the request and wait for the response
-               channel.writeInbound(request);
-
-               Object msg = readInboundBlocking(channel);
-               assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Queries the embedded channel for data.
-        */
-       private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
-               final int sleepMillis = 50;
-
-               int sleptMillis = 0;
-
-               Object msg = null;
-               while (sleptMillis < READ_TIMEOUT_MILLIS &&
-                               (msg = channel.readOutbound()) == null) {
-
-                       Thread.sleep(sleepMillis);
-                       sleptMillis += sleepMillis;
-               }
-
-               if (msg == null) {
-                       throw new TimeoutException();
-               } else {
-                       return msg;
-               }
-       }
-
-       /**
-        * Frame length decoder (expected by the serialized messages).
-        */
-       private ChannelHandler getFrameDecoder() {
-               return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
-       }
-
-       /**
-        * A listener that keeps the last updated KvState information so that a test
-        * can retrieve it.
-        */
-       static class TestRegistryListener implements KvStateRegistryListener {
-               volatile JobVertexID jobVertexID;
-               volatile KeyGroupRange keyGroupIndex;
-               volatile String registrationName;
-               volatile KvStateID kvStateId;
-
-               @Override
-               public void notifyKvStateRegistered(JobID jobId,
-                               JobVertexID jobVertexId,
-                               KeyGroupRange keyGroupRange,
-                               String registrationName,
-                               KvStateID kvStateId) {
-                       this.jobVertexID = jobVertexId;
-                       this.keyGroupIndex = keyGroupRange;
-                       this.registrationName = registrationName;
-                       this.kvStateId = kvStateId;
-               }
-
-               @Override
-               public void notifyKvStateUnregistered(JobID jobId,
-                               JobVertexID jobVertexId,
-                               KeyGroupRange keyGroupRange,
-                               String registrationName) {
-
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
deleted file mode 100644
index f8213e1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link KvStateServer}.
- */
-public class KvStateServerTest {
-
-       // Thread pool for client bootstrap (shared between tests)
-       private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
-
-       private static final int TIMEOUT_MILLIS = 10000;
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (NIO_GROUP != null) {
-                       NIO_GROUP.shutdownGracefully();
-               }
-       }
-
-       /**
-        * Tests a simple successful query via a SocketChannel.
-        */
-       @Test
-       public void testSimpleRequest() throws Exception {
-               KvStateServer server = null;
-               Bootstrap bootstrap = null;
-               try {
-                       KvStateRegistry registry = new KvStateRegistry();
-                       KvStateRequestStats stats = new AtomicKvStateRequestStats();
-
-                       server = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
-                       server.start();
-
-                       KvStateServerAddress serverAddress = server.getAddress();
-                       int numKeyGroups = 1;
-                       AbstractStateBackend abstractBackend = new MemoryStateBackend();
-                       DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-                       dummyEnv.setKvStateRegistry(registry);
-                       AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
-                                       dummyEnv,
-                                       new JobID(),
-                                       "test_op",
-                                       IntSerializer.INSTANCE,
-                                       numKeyGroups,
-                                       new KeyGroupRange(0, 0),
-                                       registry.createTaskRegistry(new JobID(), new JobVertexID()));
-
-                       final KvStateServerHandlerTest.TestRegistryListener registryListener =
-                                       new KvStateServerHandlerTest.TestRegistryListener();
-
-                       registry.registerListener(registryListener);
-
-                       ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                       desc.setQueryable("vanilla");
-
-                       ValueState<Integer> state = backend.getPartitionedState(
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE,
-                                       desc);
-
-                       // Update KvState
-                       int expectedValue = 712828289;
-
-                       int key = 99812822;
-                       backend.setCurrentKey(key);
-                       state.update(expectedValue);
-
-                       // Request
-                       byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
-                                       key,
-                                       IntSerializer.INSTANCE,
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE);
-
-                       // Connect to the server
-                       final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
-                       bootstrap = createBootstrap(
-                                       new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
-                                       new ChannelInboundHandlerAdapter() {
-                                               @Override
-                                               public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-                                                       responses.add((ByteBuf) msg);
-                                               }
-                                       });
-
-                       Channel channel = bootstrap
-                                       .connect(serverAddress.getHost(), serverAddress.getPort())
-                                       .sync().channel();
-
-                       long requestId = Integer.MAX_VALUE + 182828L;
-
-                       assertTrue(registryListener.registrationName.equals("vanilla"));
-                       ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
-                                       channel.alloc(),
-                                       requestId,
-                                       registryListener.kvStateId,
-                                       serializedKeyAndNamespace);
-
-                       channel.writeAndFlush(request);
-
-                       ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-
-                       assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf));
-                       KvStateRequestResult response = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
-
-                       assertEquals(requestId, response.getRequestId());
-                       int actualValue = KvStateRequestSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
-                       assertEquals(expectedValue, actualValue);
-               } finally {
-                       if (server != null) {
-                               server.shutDown();
-                       }
-
-                       if (bootstrap != null) {
-                               EventLoopGroup group = bootstrap.group();
-                               if (group != null) {
-                                       group.shutdownGracefully();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Creates a client bootstrap.
-        */
-       private Bootstrap createBootstrap(final ChannelHandler... handlers) {
-               return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
-                               .handler(new ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline().addLast(handlers);
-                                       }
-                               });
-       }
-
-}

Reply via email to