http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
deleted file mode 100644
index 5d4a861..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
-import org.apache.flink.queryablestate.client.KvStateClient;
-import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
-import org.apache.flink.queryablestate.client.QueryableStateClient;
-import org.apache.flink.queryablestate.server.KvStateServerImpl;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.heap.HeapValueState;
-import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.MathUtils;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link QueryableStateClient}.
- */
-public class QueryableStateClientTest {
-
-       private static final ActorSystem testActorSystem = 
AkkaUtils.createLocalActorSystem(new Configuration());
-
-       private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
-
-       /**
-        * All failures should lead to a retry with a forced location lookup.
-        *
-        * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, 
UnknownKvStateLocation,
-        * ConnectException are checked explicitly as these indicate out-of-sync
-        * KvStateLocation.
-        */
-       @Test
-       public void testForceLookupOnOutdatedLocation() throws Exception {
-               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-               KvStateClient networkClient = mock(KvStateClient.class);
-
-               QueryableStateClient client = new QueryableStateClient(
-                               lookupService,
-                               networkClient,
-                               testActorSystem.dispatcher());
-
-               try {
-                       JobID jobId = new JobID();
-                       int numKeyGroups = 4;
-
-                       //
-                       // UnknownKvStateLocation
-                       //
-                       String query1 = "lucky";
-
-                       Future<KvStateLocation> unknownKvStateLocation = 
Futures.failed(
-                                       new UnknownKvStateLocation(query1));
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query1)))
-                                       .thenReturn(unknownKvStateLocation);
-
-                       Future<Integer> result = client.getKvState(
-                                       jobId,
-                                       query1,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected 
UnknownKvStateLocation exception");
-                       } catch (UnknownKvStateLocation ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
-
-                       //
-                       // UnknownKvStateKeyGroupLocation
-                       //
-                       String query2 = "unlucky";
-
-                       Future<KvStateLocation> unknownKeyGroupLocation = 
Futures.successful(
-                                       new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query2));
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query2)))
-                                       .thenReturn(unknownKeyGroupLocation);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query2,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected 
UnknownKvStateKeyGroupLocation exception");
-                       } catch (UnknownKvStateKeyGroupLocation ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
-
-                       //
-                       // UnknownKvStateID
-                       //
-                       String query3 = "water";
-                       KvStateID kvStateId = new KvStateID();
-                       Future<byte[]> unknownKvStateId = Futures.failed(new 
UnknownKvStateID(kvStateId));
-
-                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 12323);
-                       KvStateLocation location = new KvStateLocation(jobId, 
new JobVertexID(), numKeyGroups, query3);
-                       for (int i = 0; i < numKeyGroups; i++) {
-                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
-                       }
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query3)))
-                                       
.thenReturn(Futures.successful(location));
-
-                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
-                                       .thenReturn(unknownKvStateId);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query3,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected UnknownKvStateID 
exception");
-                       } catch (UnknownKvStateID ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
-
-                       //
-                       // ConnectException
-                       //
-                       String query4 = "space";
-                       Future<byte[]> connectException = Futures.failed(new 
ConnectException());
-                       kvStateId = new KvStateID();
-
-                       serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 11123);
-                       location = new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query4);
-                       for (int i = 0; i < numKeyGroups; i++) {
-                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
-                       }
-
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query4)))
-                                       
.thenReturn(Futures.successful(location));
-
-                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
-                                       .thenReturn(connectException);
-
-                       result = client.getKvState(
-                                       jobId,
-                                       query4,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       try {
-                               Await.result(result, timeout);
-                               fail("Did not throw expected ConnectException 
exception");
-                       } catch (ConnectException ignored) {
-                               // Expected
-                       }
-
-                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
-
-                       //
-                       // Other Exceptions don't lead to a retry no retry
-                       //
-                       String query5 = "universe";
-                       Future<KvStateLocation> exception = Futures.failed(new 
RuntimeException("Test exception"));
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query5)))
-                                       .thenReturn(exception);
-
-                       client.getKvState(
-                                       jobId,
-                                       query5,
-                                       0,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
-
-                       verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
-               } finally {
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests queries against multiple servers.
-        *
-        * <p>The servers are populated with different keys and the client 
queries
-        * all available keys from all servers.
-        */
-       @Test
-       public void testIntegrationWithKvStateServer() throws Exception {
-               // Config
-               int numServers = 2;
-               int numKeys = 1024;
-               int numKeyGroups = 1;
-
-               JobID jobId = new JobID();
-               JobVertexID jobVertexId = new JobVertexID();
-
-               KvStateServer[] servers = new KvStateServer[numServers];
-               AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
-
-               QueryableStateClient client = null;
-               KvStateClient networkClient = null;
-               AtomicKvStateRequestStats networkClientStats = new 
AtomicKvStateRequestStats();
-
-               MemoryStateBackend backend = new MemoryStateBackend();
-               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
-
-               AbstractKeyedStateBackend<Integer> keyedStateBackend = 
backend.createKeyedStateBackend(dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
-
-               try {
-                       KvStateRegistry[] registries = new 
KvStateRegistry[numServers];
-                       KvStateID[] kvStateIds = new KvStateID[numServers];
-                       List<HeapValueState<Integer, VoidNamespace, Integer>> 
kvStates = new ArrayList<>();
-
-                       // Start the servers
-                       for (int i = 0; i < numServers; i++) {
-                               registries[i] = new KvStateRegistry();
-                               serverStats[i] = new 
AtomicKvStateRequestStats();
-                               servers[i] = new 
KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], 
serverStats[i]);
-                               servers[i].start();
-                               ValueStateDescriptor<Integer> descriptor =
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-
-                               
RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> 
registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
-                                               descriptor.getType(),
-                                               descriptor.getName(),
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               IntSerializer.INSTANCE);
-
-                               // Register state
-                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = new HeapValueState<>(
-                                               descriptor,
-                                               new 
NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
-                                               IntSerializer.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE);
-
-                               kvStates.add(kvState);
-
-                               kvStateIds[i] = registries[i].registerKvState(
-                                               jobId,
-                                               new JobVertexID(),
-                                               new KeyGroupRange(i, i),
-                                               "choco",
-                                               kvState);
-                       }
-
-                       int[] expectedRequests = new int[numServers];
-
-                       for (int key = 0; key < numKeys; key++) {
-                               int targetKeyGroupIndex = 
MathUtils.murmurHash(key) % numServers;
-                               expectedRequests[targetKeyGroupIndex]++;
-
-                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = kvStates.get(targetKeyGroupIndex);
-
-                               keyedStateBackend.setCurrentKey(key);
-                               
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
-                               kvState.update(1337 + key);
-                       }
-
-                       // Location lookup service
-                       KvStateLocation location = new KvStateLocation(jobId, 
jobVertexId, numServers, "choco");
-                       for (int keyGroupIndex = 0; keyGroupIndex < numServers; 
keyGroupIndex++) {
-                               location.registerKvState(new 
KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], 
servers[keyGroupIndex].getAddress());
-                       }
-
-                       KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq("choco")))
-                                       
.thenReturn(Futures.successful(location));
-
-                       // The client
-                       networkClient = new KvStateClient(1, 
networkClientStats);
-
-                       client = new QueryableStateClient(lookupService, 
networkClient, testActorSystem.dispatcher());
-
-                       // Send all queries
-                       List<Future<Integer>> futures = new 
ArrayList<>(numKeys);
-                       for (int key = 0; key < numKeys; key++) {
-                               ValueStateDescriptor<Integer> descriptor =
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
-                               futures.add(client.getKvState(
-                                               jobId,
-                                               "choco",
-                                               key,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               descriptor));
-                       }
-
-                       // Verify results
-                       Future<Iterable<Integer>> future = 
Futures.sequence(futures, testActorSystem.dispatcher());
-                       Iterable<Integer> results = Await.result(future, 
timeout);
-
-                       int index = 0;
-                       for (int buffer : results) {
-                               assertEquals(1337 + index, buffer);
-                               index++;
-                       }
-
-                       // Verify requests
-                       for (int i = 0; i < numServers; i++) {
-                               int numRetries = 10;
-                               for (int retry = 0; retry < numRetries; 
retry++) {
-                                       try {
-                                               assertEquals("Unexpected number 
of requests", expectedRequests[i], serverStats[i].getNumRequests());
-                                               assertEquals("Unexpected 
success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
-                                               assertEquals("Unexpected failed 
requests", 0, serverStats[i].getNumFailed());
-                                               break;
-                                       } catch (Throwable t) {
-                                               // Retry
-                                               if (retry == numRetries - 1) {
-                                                       throw t;
-                                               } else {
-                                                       Thread.sleep(100);
-                                               }
-                                       }
-                               }
-                       }
-               } finally {
-                       if (client != null) {
-                               client.shutDown();
-                       }
-
-                       if (networkClient != null) {
-                               networkClient.shutDown();
-                       }
-
-                       for (KvStateServer server : servers) {
-                               if (server != null) {
-                                       server.shutDown();
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Tests that the QueryableState client correctly caches location 
lookups
-        * keyed by both job and name. This test is mainly due to a previous 
bug due
-        * to which cache entries were by name only. This is a problem, because 
the
-        * same client can be used to query multiple jobs.
-        */
-       @Test
-       public void testLookupMultipleJobIds() throws Exception {
-               String name = "unique-per-job";
-
-               // Exact contents don't matter here
-               KvStateLocation location = new KvStateLocation(new JobID(), new 
JobVertexID(), 1, name);
-               location.registerKvState(new KeyGroupRange(0, 0), new 
KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
-
-               JobID jobId1 = new JobID();
-               JobID jobId2 = new JobID();
-
-               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
-
-               when(lookupService.getKvStateLookupInfo(any(JobID.class), 
anyString()))
-                               .thenReturn(Futures.successful(location));
-
-               KvStateClient networkClient = mock(KvStateClient.class);
-               when(networkClient.getKvState(any(KvStateServerAddress.class), 
any(KvStateID.class), any(byte[].class)))
-                               .thenReturn(Futures.successful(new byte[0]));
-
-               QueryableStateClient client = new QueryableStateClient(
-                               lookupService,
-                               networkClient,
-                               testActorSystem.dispatcher());
-
-               ValueStateDescriptor<Integer> stateDesc = new 
ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
-
-               // Query ies with same name, but different job IDs should lead 
to a
-               // single lookup per query and job ID.
-               client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
-               client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
-
-               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
-               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index b982c8e..50ef543 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -445,4 +445,20 @@ public class FutureUtils {
 
                return result;
        }
+
+       // 
------------------------------------------------------------------------
+       //  Future Completed with an exception.
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Returns a {@link CompletableFuture} that has failed with the 
exception
+        * provided as argument.
+        *
+        * <p>A new future is created on every call and completed exceptionally
+        * before it is returned, so callers receive it already completed.
+        *
+        * @param throwable the exception to fail the future with.
+        * @param <T> the result type of the returned future.
+        * @return The failed future.
+        */
+       public static <T> CompletableFuture<T> getFailedFuture(Throwable 
throwable) {
+               CompletableFuture<T> failedAttempt = new CompletableFuture<>();
+               failedAttempt.completeExceptionally(throwable);
+               return failedAttempt;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 53503ce..d6c5d75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -29,14 +29,16 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +67,9 @@ public class NetworkEnvironment {
        /** Server for {@link InternalKvState} requests. */
        private final KvStateServer kvStateServer;
 
+       /** Proxy for the queryable state client. */
+       private final KvStateClientProxy kvStateProxy;
+
        /** Registry for {@link InternalKvState} instances. */
        private final KvStateRegistry kvStateRegistry;
 
@@ -76,6 +81,7 @@ public class NetworkEnvironment {
 
        /** Number of network buffers to use for each outgoing/incoming channel 
(subpartition/input channel). */
        private final int networkBuffersPerChannel;
+
        /** Number of extra network buffers to use for each outgoing/incoming 
gate (result partition/input gate). */
        private final int extraNetworkBuffersPerGate;
 
@@ -88,6 +94,7 @@ public class NetworkEnvironment {
                        TaskEventDispatcher taskEventDispatcher,
                        KvStateRegistry kvStateRegistry,
                        KvStateServer kvStateServer,
+                       KvStateClientProxy kvStateClientProxy,
                        IOMode defaultIOMode,
                        int partitionRequestInitialBackoff,
                        int partitionRequestMaxBackoff,
@@ -101,6 +108,7 @@ public class NetworkEnvironment {
                this.kvStateRegistry = checkNotNull(kvStateRegistry);
 
                this.kvStateServer = kvStateServer;
+               this.kvStateProxy = kvStateClientProxy;
 
                this.defaultIOMode = defaultIOMode;
 
@@ -152,6 +160,10 @@ public class NetworkEnvironment {
                return kvStateServer;
        }
 
+       public KvStateClientProxy getKvStateProxy() {
+               return kvStateProxy;
+       }
+
        public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, 
JobVertexID jobVertexId) {
                return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
        }
@@ -291,17 +303,25 @@ public class NetworkEnvironment {
                        try {
                                LOG.debug("Starting network connection 
manager");
                                connectionManager.start(resultPartitionManager, 
taskEventDispatcher);
-                       }
-                       catch (IOException t) {
+                       } catch (IOException t) {
                                throw new IOException("Failed to instantiate 
network connection manager.", t);
                        }
 
                        if (kvStateServer != null) {
                                try {
-                                       LOG.debug("Starting the KvState 
server.");
                                        kvStateServer.start();
+                                       LOG.info("Started Queryable State Data 
Server @ {}", kvStateServer.getServerAddress());
+                               } catch (InterruptedException ie) {
+                                       throw new IOException("Failed to start 
the Queryable State Data Server.", ie);
+                               }
+                       }
+
+                       if (kvStateProxy != null) {
+                               try {
+                                       kvStateProxy.start();
+                                       LOG.info("Started the Queryable State 
Client Proxy @ {}", kvStateProxy.getServerAddress());
                                } catch (InterruptedException ie) {
-                                       throw new IOException("Failed to start 
the KvState server.", ie);
+                                       throw new IOException("Failed to start 
the Queryable State Client Proxy.", ie);
                                }
                        }
                }
@@ -318,11 +338,21 @@ public class NetworkEnvironment {
 
                        LOG.info("Shutting down the network environment and its 
components.");
 
+                       if (kvStateProxy != null) {
+                               try {
+                                       LOG.debug("Shutting down Queryable 
State Client Proxy.");
+                                       kvStateProxy.shutdown();
+                               } catch (Throwable t) {
+                                       LOG.warn("Cannot shut down Queryable 
State Client Proxy.", t);
+                               }
+                       }
+
                        if (kvStateServer != null) {
                                try {
-                                       kvStateServer.shutDown();
+                                       LOG.debug("Shutting down Queryable 
State Data Server.");
+                                       kvStateServer.shutdown();
                                } catch (Throwable t) {
-                                       LOG.warn("Cannot shut down KvState 
server.", t);
+                                       LOG.warn("Cannot shut down Queryable 
State Data Server.", t);
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
new file mode 100644
index 0000000..d605952
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An interface for the Queryable State Client Proxy running on each Task 
Manager in the cluster.
+ *
+ * <p>This proxy is where the Queryable State Client (potentially running 
outside your Flink
+ * cluster) connects to, and its responsibility is to forward the client's 
requests to the rest
+ * of the entities participating in fetching the requested state, and running 
within the cluster.
+ *
+ * <p>These are:
+ * <ol>
+ *     <li> the {@link org.apache.flink.runtime.jobmanager.JobManager Job 
Manager},
+ *     which is responsible for sending the location of the
+ *     {@link org.apache.flink.runtime.taskmanager.TaskManager Task Manager} 
storing
+ *     the requested state, and </li>
+ *     <li> the Task Manager having the state itself.</li>
+ * </ol>
+ */
+public interface KvStateClientProxy extends KvStateServer {
+
+       /**
+        * Updates the active {@link 
org.apache.flink.runtime.jobmanager.JobManager Job Manager}
+        * in case of change.
+        *
+        * <p>This is useful in settings where high-availability is enabled and
+        * a failed Job Manager is replaced by a new one.
+        *
+        * <p><b>IMPORTANT: </b> this method may be called by a different 
thread than the one calling {@link #getJobManagerFuture()}.
+        *
+        * @param leadingJobManager the currently leading job manager.
+        * */
+       void updateJobManager(CompletableFuture<ActorGateway> 
leadingJobManager) throws Exception;
+
+       /**
+        * Retrieves a future containing the currently leading Job Manager.
+        *
+        * <p><b>IMPORTANT: </b> this method may be called by a different 
thread than the one calling
+        * {@link #updateJobManager(CompletableFuture)}.
+        *
+        * @return A {@link CompletableFuture} containing the currently active 
Job Manager.
+        */
+       CompletableFuture<ActorGateway> getJobManagerFuture();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 8a213bb..03e8238 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
  * Location information for all key groups of a {@link InternalKvState} 
instance.
  *
  * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
- * Queryable State Client to target queries.
+ * queryable state to target queries.
  */
 public class KvStateLocation implements Serializable {
 
@@ -183,10 +183,6 @@ public class KvStateLocation implements Serializable {
                }
        }
 
-       public static long getSerialVersionUID() {
-               return serialVersionUID;
-       }
-
        /**
         * Registers a KvState instance for the given key group index.
         *
@@ -194,7 +190,7 @@ public class KvStateLocation implements Serializable {
         * @throws IndexOutOfBoundsException If key group range start < 0 or 
key group range end >= Number of key groups
         * @throws IllegalArgumentException  If no location information 
registered for a key group index in the range.
         */
-       public void unregisterKvState(KeyGroupRange keyGroupRange) {
+       void unregisterKvState(KeyGroupRange keyGroupRange) {
                if (keyGroupRange.getStartKeyGroup() < 0 || 
keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
                        throw new IndexOutOfBoundsException("Key group index");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
index 9b14c49..81727fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -20,24 +20,21 @@ package org.apache.flink.runtime.query;
 
 /**
  * An interface for the Queryable State Server running on each Task Manager in 
the cluster.
- * This server is responsible for serving requests coming from the Queryable 
State Client and
- * requesting <b>locally</b> stored state.
+ * This server is responsible for serving requests coming from the {@link 
KvStateClientProxy
+ * Queryable State Proxy} and requesting <b>locally</b> stored state.
  */
 public interface KvStateServer {
 
        /**
-        * Returns the address of this server.
-        *
-        * @return Server address
+        * Returns the {@link KvStateServerAddress address} the server is 
listening to.
+        * @return Server address.
         */
-       KvStateServerAddress getAddress();
+       KvStateServerAddress getServerAddress();
 
 
-       /** Starts the proxy. */
+       /** Starts the server. */
        void start() throws InterruptedException;
 
-       /**
-        * Shuts down the server and all related thread pools.
-        */
-       void shutDown();
+       /** Shuts down the server and all related thread pools. */
+       void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index 852d394..8f66734 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -36,8 +36,56 @@ public final class QueryableStateUtils {
        private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateUtils.class);
 
        /**
+        * Initializes the {@link KvStateClientProxy client proxy} responsible 
for
+        * receiving requests from the external (to the cluster) client and 
forwarding them internally.
+        *
+        * @param address the address to bind to.
+        * @param port the port to listen to.
+        * @param eventLoopThreads the number of threads to be used to process 
incoming requests.
+        * @param queryThreads the number of threads to be used to send the 
actual state.
+        * @param stats statistics to be gathered about the incoming requests.
+        * @return the {@link KvStateClientProxy client proxy}.
+        */
+       public static KvStateClientProxy createKvStateClientProxy(
+                       final InetAddress address,
+                       final int port,
+                       final int eventLoopThreads,
+                       final int queryThreads,
+                       final KvStateRequestStats stats) {
+
+               Preconditions.checkNotNull(address, "address");
+               Preconditions.checkNotNull(stats, "stats");
+
+               Preconditions.checkArgument(eventLoopThreads >= 1);
+               Preconditions.checkArgument(queryThreads >= 1);
+
+               try {
+                       String classname = 
"org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl";
+                       Class<? extends KvStateClientProxy> clazz = 
Class.forName(classname).asSubclass(KvStateClientProxy.class);
+                       Constructor<? extends KvStateClientProxy> constructor = 
clazz.getConstructor(
+                                       InetAddress.class,
+                                       Integer.class,
+                                       Integer.class,
+                                       Integer.class,
+                                       KvStateRequestStats.class);
+                       return constructor.newInstance(address, port, 
eventLoopThreads, queryThreads, stats);
+               } catch (ClassNotFoundException e) {
+                       LOG.warn("Could not load Queryable State Client Proxy. 
" +
+                                       "Probable reason: flink-queryable-state 
is not in the classpath");
+                       LOG.debug("Caught exception", e);
+                       return null;
+               } catch (InvocationTargetException e) {
+                       LOG.error("Queryable State Client Proxy could not be 
created: ", e.getTargetException());
+                       return null;
+               } catch (Throwable t) {
+                       LOG.error("Failed to instantiate the Queryable State 
Client Proxy.", t);
+                       return null;
+               }
+       }
+
+       /**
         * Initializes the {@link KvStateServer server} responsible for sending 
the
-        * requested internal state to the Queryable State Client.
+        * requested internal state to the {@link KvStateClientProxy client 
proxy}.
         *
         * @param address the address to bind to.
         * @param port the port to listen to.
@@ -74,12 +122,12 @@ public final class QueryableStateUtils {
                                        KvStateRequestStats.class);
                        return constructor.newInstance(address, port, 
eventLoopThreads, queryThreads, kvStateRegistry, stats);
                } catch (ClassNotFoundException e) {
-                       LOG.info("Could not load Queryable State Server. " +
+                       LOG.warn("Could not load Queryable State Server. " +
                                        "Probable reason: flink-queryable-state 
is not in the classpath");
                        LOG.debug("Caught exception", e);
                        return null;
                } catch (InvocationTargetException e) {
-                       LOG.error("Queryable State Server could not be 
created", e.getTargetException());
+                       LOG.error("Queryable State Server could not be created: 
", e.getTargetException());
                        return null;
                } catch (Throwable t) {
                        LOG.error("Failed to instantiate the Queryable State 
Server.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
index 9781e23..19caf92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.query.netty;
 
-import org.apache.flink.runtime.query.KvStateServer;
-
 /**
- * Simple statistics for {@link KvStateServer} monitoring.
+ * Simple statistics for
+ * {@link org.apache.flink.runtime.query.KvStateServer} and
+ * {@link org.apache.flink.runtime.query.KvStateClientProxy} monitoring.
  */
 public interface KvStateRequestStats {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 37d28de..fed5fc0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -82,6 +82,7 @@ public class QueryableStateConfiguration {
        public String toString() {
                return "QueryableStateConfiguration {" +
                                "enabled=" + enabled +
+                               ", port=" + port +
                                ", numServerThreads=" + numServerThreads +
                                ", numQueryThreads=" + numQueryThreads +
                                '}';

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7c5c830..cbf0d95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,10 +38,11 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
@@ -66,7 +67,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 public class TaskManagerServices {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
 
-       /** TaskManager services */
+       /** TaskManager services. */
        private final TaskManagerLocation taskManagerLocation;
        private final MemoryManager memoryManager;
        private final IOManager ioManager;
@@ -356,6 +357,7 @@ public class TaskManagerServices {
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
                KvStateRegistry kvStateRegistry = new KvStateRegistry();
+               KvStateClientProxy kvClientProxy = null;
                KvStateServer kvStateServer = null;
 
                if 
(taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) {
@@ -367,11 +369,18 @@ public class TaskManagerServices {
                        int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
                                        
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numQueryThreads();
 
-                       kvStateServer = QueryableStateUtils.createKvStateServer(
+                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
                                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
                                        qsConfig.port(),
                                        numNetworkThreads,
                                        numQueryThreads,
+                                       new DisabledKvStateRequestStats());
+
+                       kvStateServer = QueryableStateUtils.createKvStateServer(
+                                       
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                                       0,
+                                       numNetworkThreads,
+                                       numQueryThreads,
                                        kvStateRegistry,
                                        new DisabledKvStateRequestStats());
                }
@@ -384,6 +393,7 @@ public class TaskManagerServices {
                        taskEventDispatcher,
                        kvStateRegistry,
                        kvStateServer,
+                       kvClientProxy,
                        networkEnvironmentConfiguration.ioMode(),
                        
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
                        
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index f1f7d39..e6643b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -189,8 +189,8 @@ public class TaskManagerServicesConfiguration {
                        remoteAddress,
                        slots);
 
-               final QueryableStateConfiguration queryableStateConfig = 
localCommunication ?
-                               QueryableStateConfiguration.disabled() :
+               // TODO (@Ufuk): why was queryable state disabled before when localCommunication was set?
+               final QueryableStateConfiguration queryableStateConfig =
                                parseQueryableStateConfiguration(configuration);
 
                // extract memory settings

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 558388c..c370725 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.{BlobClient, 
BlobService, BlobCacheService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
FallbackLibraryCacheManager, LibraryCacheManager}
@@ -47,7 +47,7 @@ import 
org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInf
 import org.apache.flink.runtime.filecache.FileCache
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, 
HardwareDescription, InstanceID}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, 
HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker
@@ -951,7 +951,7 @@ class TaskManager(
       kvStateRegistry.registerListener(
         new ActorGatewayKvStateRegistryListener(
           jobManagerGateway,
-          kvStateServer.getAddress))
+          kvStateServer.getServerAddress))
     }
 
     // start a blob service, if a blob server is specified
@@ -1423,6 +1423,28 @@ class TaskManager(
   }
 
   override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: 
UUID): Unit = {
+    val proxy = network.getKvStateProxy
+    if (proxy != null) {
+
+      val askTimeoutString = 
config.getConfiguration.getString(AkkaOptions.ASK_TIMEOUT)
+
+      val timeout = Duration(askTimeoutString)
+
+      if (!timeout.isFinite) {
+        throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key +
+          " is not a finite timeout ('" + askTimeoutString + "')")
+      }
+
+      if (leaderAddress != null) {
+        val actorGwFuture: Future[ActorGateway] =
+          AkkaUtils.getActorRefFuture(
+            leaderAddress, context.system, timeout.asInstanceOf[FiniteDuration]
+          ).map(actor => new AkkaActorGateway(actor, 
leaderSessionID))(context.system.dispatcher)
+
+        proxy.updateJobManager(FutureUtils.toJava(actorGwFuture))
+      }
+    }
+
     self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 826ae3f..ef2d5c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -69,6 +69,7 @@ public class NetworkEnvironmentTest {
                        new TaskEventDispatcher(),
                        new KvStateRegistry(),
                        null,
+                       null,
                        IOManager.IOMode.SYNC,
                        0,
                        0,

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 052699a..6dabcd3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -145,6 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                new TaskEventDispatcher(),
                                new KvStateRegistry(),
                                null,
+                               null,
                                netConf.ioMode(),
                                netConf.partitionRequestInitialBackoff(),
                                netConf.partitionRequestMaxBackoff(),

Reply via email to