http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java deleted file mode 100644 index 5d4a861..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.queryablestate.UnknownKvStateID; -import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation; -import org.apache.flink.queryablestate.client.KvStateClient; -import org.apache.flink.queryablestate.client.KvStateLocationLookupService; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.UnknownKvStateLocation; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.heap.HeapValueState; -import org.apache.flink.runtime.state.heap.NestedMapsStateTable; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.MathUtils; - -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import org.junit.AfterClass; -import 
org.junit.Test; - -import java.net.ConnectException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for {@link QueryableStateClient}. - */ -public class QueryableStateClientTest { - - private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - - private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS); - - @AfterClass - public static void tearDown() throws Exception { - if (testActorSystem != null) { - testActorSystem.shutdown(); - } - } - - /** - * All failures should lead to a retry with a forced location lookup. - * - * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation, - * ConnectException are checked explicitly as these indicate out-of-sync - * KvStateLocation. 
- */ - @Test - public void testForceLookupOnOutdatedLocation() throws Exception { - KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); - KvStateClient networkClient = mock(KvStateClient.class); - - QueryableStateClient client = new QueryableStateClient( - lookupService, - networkClient, - testActorSystem.dispatcher()); - - try { - JobID jobId = new JobID(); - int numKeyGroups = 4; - - // - // UnknownKvStateLocation - // - String query1 = "lucky"; - - Future<KvStateLocation> unknownKvStateLocation = Futures.failed( - new UnknownKvStateLocation(query1)); - - when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1))) - .thenReturn(unknownKvStateLocation); - - Future<Integer> result = client.getKvState( - jobId, - query1, - 0, - BasicTypeInfo.INT_TYPE_INFO, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); - - try { - Await.result(result, timeout); - fail("Did not throw expected UnknownKvStateLocation exception"); - } catch (UnknownKvStateLocation ignored) { - // Expected - } - - verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1)); - - // - // UnknownKvStateKeyGroupLocation - // - String query2 = "unlucky"; - - Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful( - new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2)); - - when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2))) - .thenReturn(unknownKeyGroupLocation); - - result = client.getKvState( - jobId, - query2, - 0, - BasicTypeInfo.INT_TYPE_INFO, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); - - try { - Await.result(result, timeout); - fail("Did not throw expected UnknownKvStateKeyGroupLocation exception"); - } catch (UnknownKvStateKeyGroupLocation ignored) { - // Expected - } - - verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2)); - - // - // UnknownKvStateID - // - String query3 = "water"; - KvStateID kvStateId = new KvStateID(); - Future<byte[]> 
unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId)); - - KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323); - KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3); - for (int i = 0; i < numKeyGroups; i++) { - location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); - } - - when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3))) - .thenReturn(Futures.successful(location)); - - when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) - .thenReturn(unknownKvStateId); - - result = client.getKvState( - jobId, - query3, - 0, - BasicTypeInfo.INT_TYPE_INFO, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); - - try { - Await.result(result, timeout); - fail("Did not throw expected UnknownKvStateID exception"); - } catch (UnknownKvStateID ignored) { - // Expected - } - - verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3)); - - // - // ConnectException - // - String query4 = "space"; - Future<byte[]> connectException = Futures.failed(new ConnectException()); - kvStateId = new KvStateID(); - - serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123); - location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4); - for (int i = 0; i < numKeyGroups; i++) { - location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); - } - - when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4))) - .thenReturn(Futures.successful(location)); - - when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) - .thenReturn(connectException); - - result = client.getKvState( - jobId, - query4, - 0, - BasicTypeInfo.INT_TYPE_INFO, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); - - try { - Await.result(result, timeout); - fail("Did not throw expected ConnectException exception"); - } catch (ConnectException 
ignored) { - // Expected - } - - verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4)); - - // - // Other Exceptions don't lead to a retry no retry - // - String query5 = "universe"; - Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception")); - when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5))) - .thenReturn(exception); - - client.getKvState( - jobId, - query5, - 0, - BasicTypeInfo.INT_TYPE_INFO, - new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); - - verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5)); - } finally { - client.shutDown(); - } - } - - /** - * Tests queries against multiple servers. - * - * <p>The servers are populated with different keys and the client queries - * all available keys from all servers. - */ - @Test - public void testIntegrationWithKvStateServer() throws Exception { - // Config - int numServers = 2; - int numKeys = 1024; - int numKeyGroups = 1; - - JobID jobId = new JobID(); - JobVertexID jobVertexId = new JobVertexID(); - - KvStateServer[] servers = new KvStateServer[numServers]; - AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; - - QueryableStateClient client = null; - KvStateClient networkClient = null; - AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats(); - - MemoryStateBackend backend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - - AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); - - try { - KvStateRegistry[] registries = new KvStateRegistry[numServers]; - KvStateID[] kvStateIds = new KvStateID[numServers]; - List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>(); - - // 
Start the servers - for (int i = 0; i < numServers; i++) { - registries[i] = new KvStateRegistry(); - serverStats[i] = new AtomicKvStateRequestStats(); - servers[i] = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]); - servers[i].start(); - ValueStateDescriptor<Integer> descriptor = - new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - - RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - VoidNamespaceSerializer.INSTANCE, - IntSerializer.INSTANCE); - - // Register state - HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>( - descriptor, - new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo), - IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - kvStates.add(kvState); - - kvStateIds[i] = registries[i].registerKvState( - jobId, - new JobVertexID(), - new KeyGroupRange(i, i), - "choco", - kvState); - } - - int[] expectedRequests = new int[numServers]; - - for (int key = 0; key < numKeys; key++) { - int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers; - expectedRequests[targetKeyGroupIndex]++; - - HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex); - - keyedStateBackend.setCurrentKey(key); - kvState.setCurrentNamespace(VoidNamespace.INSTANCE); - kvState.update(1337 + key); - } - - // Location lookup service - KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco"); - for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) { - location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress()); - } - - KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); - when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq("choco"))) - .thenReturn(Futures.successful(location)); - - // The client - networkClient = new KvStateClient(1, networkClientStats); - - client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher()); - - // Send all queries - List<Future<Integer>> futures = new ArrayList<>(numKeys); - for (int key = 0; key < numKeys; key++) { - ValueStateDescriptor<Integer> descriptor = - new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - futures.add(client.getKvState( - jobId, - "choco", - key, - BasicTypeInfo.INT_TYPE_INFO, - descriptor)); - } - - // Verify results - Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher()); - Iterable<Integer> results = Await.result(future, timeout); - - int index = 0; - for (int buffer : results) { - assertEquals(1337 + index, buffer); - index++; - } - - // Verify requests - for (int i = 0; i < numServers; i++) { - int numRetries = 10; - for (int retry = 0; retry < numRetries; retry++) { - try { - assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests()); - assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful()); - assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed()); - break; - } catch (Throwable t) { - // Retry - if (retry == numRetries - 1) { - throw t; - } else { - Thread.sleep(100); - } - } - } - } - } finally { - if (client != null) { - client.shutDown(); - } - - if (networkClient != null) { - networkClient.shutDown(); - } - - for (KvStateServer server : servers) { - if (server != null) { - server.shutDown(); - } - } - } - } - - /** - * Tests that the QueryableState client correctly caches location lookups - * keyed by both job and name. This test is mainly due to a previous bug due - * to which cache entries were by name only. This is a problem, because the - * same client can be used to query multiple jobs. 
- */ - @Test - public void testLookupMultipleJobIds() throws Exception { - String name = "unique-per-job"; - - // Exact contents don't matter here - KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name); - location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892)); - - JobID jobId1 = new JobID(); - JobID jobId2 = new JobID(); - - KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); - - when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString())) - .thenReturn(Futures.successful(location)); - - KvStateClient networkClient = mock(KvStateClient.class); - when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class))) - .thenReturn(Futures.successful(new byte[0])); - - QueryableStateClient client = new QueryableStateClient( - lookupService, - networkClient, - testActorSystem.dispatcher()); - - ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE); - - // Query ies with same name, but different job IDs should lead to a - // single lookup per query and job ID. - client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); - client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); - - verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name)); - verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name)); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index b982c8e..50ef543 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -445,4 +445,20 @@ public class FutureUtils { return result; } + + // ------------------------------------------------------------------------ + // Future Completed with an exception. + // ------------------------------------------------------------------------ + + /** + * Returns a {@link CompletableFuture} that has failed with the exception + * provided as argument. + * @param throwable the exception to fail the future with. + * @return The failed future. 
+ */ + public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) { + CompletableFuture<T> failedAttempt = new CompletableFuture<>(); + failedAttempt.completeExceptionally(throwable); + return failedAttempt; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 53503ce..d6c5d75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -29,14 +29,16 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +67,9 @@ public class NetworkEnvironment { /** Server for {@link InternalKvState} requests. */ private final KvStateServer kvStateServer; + /** Proxy for the queryable state client. 
*/ + private final KvStateClientProxy kvStateProxy; + /** Registry for {@link InternalKvState} instances. */ private final KvStateRegistry kvStateRegistry; @@ -76,6 +81,7 @@ public class NetworkEnvironment { /** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */ private final int networkBuffersPerChannel; + /** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ private final int extraNetworkBuffersPerGate; @@ -88,6 +94,7 @@ public class NetworkEnvironment { TaskEventDispatcher taskEventDispatcher, KvStateRegistry kvStateRegistry, KvStateServer kvStateServer, + KvStateClientProxy kvStateClientProxy, IOMode defaultIOMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, @@ -101,6 +108,7 @@ public class NetworkEnvironment { this.kvStateRegistry = checkNotNull(kvStateRegistry); this.kvStateServer = kvStateServer; + this.kvStateProxy = kvStateClientProxy; this.defaultIOMode = defaultIOMode; @@ -152,6 +160,10 @@ public class NetworkEnvironment { return kvStateServer; } + public KvStateClientProxy getKvStateProxy() { + return kvStateProxy; + } + public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) { return kvStateRegistry.createTaskRegistry(jobId, jobVertexId); } @@ -291,17 +303,25 @@ public class NetworkEnvironment { try { LOG.debug("Starting network connection manager"); connectionManager.start(resultPartitionManager, taskEventDispatcher); - } - catch (IOException t) { + } catch (IOException t) { throw new IOException("Failed to instantiate network connection manager.", t); } if (kvStateServer != null) { try { - LOG.debug("Starting the KvState server."); kvStateServer.start(); + LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress()); + } catch (InterruptedException ie) { + throw new IOException("Failed to start the Queryable State Data Server.", ie); + } + } + + if (kvStateProxy 
!= null) { + try { + kvStateProxy.start(); + LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress()); } catch (InterruptedException ie) { - throw new IOException("Failed to start the KvState server.", ie); + throw new IOException("Failed to start the Queryable State Client Proxy.", ie); } } } @@ -318,11 +338,21 @@ public class NetworkEnvironment { LOG.info("Shutting down the network environment and its components."); + if (kvStateProxy != null) { + try { + LOG.debug("Shutting down Queryable State Client Proxy."); + kvStateProxy.shutdown(); + } catch (Throwable t) { + LOG.warn("Cannot shut down Queryable State Client Proxy.", t); + } + } + if (kvStateServer != null) { try { - kvStateServer.shutDown(); + LOG.debug("Shutting down Queryable State Data Server."); + kvStateServer.shutdown(); } catch (Throwable t) { - LOG.warn("Cannot shut down KvState server.", t); + LOG.warn("Cannot shut down Queryable State Data Server.", t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java new file mode 100644 index 0000000..d605952 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateClientProxy.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.runtime.instance.ActorGateway; + +import java.util.concurrent.CompletableFuture; + +/** + * An interface for the Queryable State Client Proxy running on each Task Manager in the cluster. + * + * <p>This proxy is where the Queryable State Client (potentially running outside your Flink + * cluster) connects to, and its responsibility is to forward the client's requests to the rest + * of the entities participating in fetching the requested state, and running within the cluster. + * + * <p>These are: + * <ol> + * <li> the {@link org.apache.flink.runtime.jobmanager.JobManager Job Manager}, + * which is responsible for sending the location of the + * {@link org.apache.flink.runtime.taskmanager.TaskManager Task Manager} storing + * the requested state, and </li> + * <li> the Task Manager having the state itself.</li> + * </ol> + */ +public interface KvStateClientProxy extends KvStateServer { + + /** + * Updates the active {@link org.apache.flink.runtime.jobmanager.JobManager Job Manager} + * in case of change. + * + * <p>This is useful in settings where high-availability is enabled and + * a failed Job Manager is replaced by a new one. + * + * <p><b>IMPORTANT: </b> this method may be called by a different thread than the {@link #getJobManagerFuture()}. + * + * @param leadingJobManager the currently leading job manager. + * */ + void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception; + + /** + * Retrieves a future containing the currently leading Job Manager.
+ * + * <p><b>IMPORTANT: </b> this method may be called by a different thread than the + * {@link #updateJobManager(CompletableFuture)}. + * + * @return A {@link CompletableFuture} containing the currently active Job Manager. + */ + CompletableFuture<ActorGateway> getJobManagerFuture(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java index 8a213bb..03e8238 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java @@ -31,7 +31,7 @@ import java.util.Arrays; * Location information for all key groups of a {@link InternalKvState} instance. * * <p>This is populated by the {@link KvStateLocationRegistry} and used by the - * Queryable State Client to target queries. + * queryable state to target queries. */ public class KvStateLocation implements Serializable { @@ -183,10 +183,6 @@ public class KvStateLocation implements Serializable { } } - public static long getSerialVersionUID() { - return serialVersionUID; - } - /** * Registers a KvState instance for the given key group index. * @@ -194,7 +190,7 @@ public class KvStateLocation implements Serializable { * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups * @throws IllegalArgumentException If no location information registered for a key group index in the range. 
*/ - public void unregisterKvState(KeyGroupRange keyGroupRange) { + void unregisterKvState(KeyGroupRange keyGroupRange) { if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java index 9b14c49..81727fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java @@ -20,24 +20,21 @@ package org.apache.flink.runtime.query; /** * An interface for the Queryable State Server running on each Task Manager in the cluster. - * This server is responsible for serving requests coming from the Queryable State Client and - * requesting <b>locally</b> stored state. + * This server is responsible for serving requests coming from the {@link KvStateClientProxy + * Queryable State Proxy} and requesting <b>locally</b> stored state. */ public interface KvStateServer { /** - * Returns the address of this server. - * - * @return Server address + * Returns the {@link KvStateServerAddress address} the server is listening to. + * @return Server address. */ - KvStateServerAddress getAddress(); + KvStateServerAddress getServerAddress(); - /** Starts the proxy. */ + /** Starts the server. */ void start() throws InterruptedException; - /** - * Shuts down the server and all related thread pools. - */ - void shutDown(); + /** Shuts down the server and all related thread pools. 
*/ + void shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java index 852d394..8f66734 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java @@ -36,8 +36,56 @@ public final class QueryableStateUtils { private static final Logger LOG = LoggerFactory.getLogger(QueryableStateUtils.class); /** + * Initializes the {@link KvStateClientProxy client proxy} responsible for + * receiving requests from the external (to the cluster) client and forwarding them internally. + * + * @param address the address to bind to. + * @param port the port to listen to. + * @param eventLoopThreads the number of threads to be used to process incoming requests. + * @param queryThreads the number of threads to be used to send the actual state. + * @param stats statistics to be gathered about the incoming requests. + * @return the {@link KvStateClientProxy client proxy}. + */ + public static KvStateClientProxy createKvStateClientProxy( + final InetAddress address, + final int port, + final int eventLoopThreads, + final int queryThreads, + final KvStateRequestStats stats) { + + Preconditions.checkNotNull(address, "address"); + Preconditions.checkNotNull(stats, "stats"); + + Preconditions.checkArgument(eventLoopThreads >= 1); + Preconditions.checkArgument(queryThreads >= 1); + + try { + String classname = "org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl"; + Class<? extends KvStateClientProxy> clazz = Class.forName(classname).asSubclass(KvStateClientProxy.class); + Constructor<? 
extends KvStateClientProxy> constructor = clazz.getConstructor( + InetAddress.class, + Integer.class, + Integer.class, + Integer.class, + KvStateRequestStats.class); + return constructor.newInstance(address, port, eventLoopThreads, queryThreads, stats); + } catch (ClassNotFoundException e) { + LOG.warn("Could not load Queryable State Client Proxy. " + + "Probable reason: flink-queryable-state is not in the classpath"); + LOG.debug("Caught exception", e); + return null; + } catch (InvocationTargetException e) { + LOG.error("Queryable State Client Proxy could not be created: ", e.getTargetException()); + return null; + } catch (Throwable t) { + LOG.error("Failed to instantiate the Queryable State Client Proxy.", t); + return null; + } + } + + /** * Initializes the {@link KvStateServer server} responsible for sending the - * requested internal state to the Queryable State Client. + * requested internal state to the {@link KvStateClientProxy client proxy}. * * @param address the address to bind to. * @param port the port to listen to. @@ -74,12 +122,12 @@ public final class QueryableStateUtils { KvStateRequestStats.class); return constructor.newInstance(address, port, eventLoopThreads, queryThreads, kvStateRegistry, stats); } catch (ClassNotFoundException e) { - LOG.info("Could not load Queryable State Server. " + + LOG.warn("Could not load Queryable State Server. 
" + "Probable reason: flink-queryable-state is not in the classpath"); LOG.debug("Caught exception", e); return null; } catch (InvocationTargetException e) { - LOG.error("Queryable State Server could not be created", e.getTargetException()); + LOG.error("Queryable State Server could not be created: ", e.getTargetException()); return null; } catch (Throwable t) { LOG.error("Failed to instantiate the Queryable State Server.", t); http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java index 9781e23..19caf92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.query.netty; -import org.apache.flink.runtime.query.KvStateServer; - /** - * Simple statistics for {@link KvStateServer} monitoring. + * Simple statistics for + * {@link org.apache.flink.runtime.query.KvStateServer} and + * {@link org.apache.flink.runtime.query.KvStateClientProxy} monitoring. 
*/ public interface KvStateRequestStats { http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index 37d28de..fed5fc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -82,6 +82,7 @@ public class QueryableStateConfiguration { public String toString() { return "QueryableStateConfiguration {" + "enabled=" + enabled + + ", port=" + port + ", numServerThreads=" + numServerThreads + ", numQueryThreads=" + numQueryThreads + '}'; http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 7c5c830..cbf0d95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -38,10 +38,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.query.KvStateClientProxy; import 
org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; @@ -66,7 +67,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; public class TaskManagerServices { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); - /** TaskManager services */ + /** TaskManager services. */ private final TaskManagerLocation taskManagerLocation; private final MemoryManager memoryManager; private final IOManager ioManager; @@ -356,6 +357,7 @@ public class TaskManagerServices { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); KvStateRegistry kvStateRegistry = new KvStateRegistry(); + KvStateClientProxy kvClientProxy = null; KvStateServer kvStateServer = null; if (taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) { @@ -367,11 +369,18 @@ public class TaskManagerServices { int numQueryThreads = qsConfig.numQueryThreads() == 0 ? 
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads(); - kvStateServer = QueryableStateUtils.createKvStateServer( + kvClientProxy = QueryableStateUtils.createKvStateClientProxy( taskManagerServicesConfiguration.getTaskManagerAddress(), qsConfig.port(), numNetworkThreads, numQueryThreads, + new DisabledKvStateRequestStats()); + + kvStateServer = QueryableStateUtils.createKvStateServer( + taskManagerServicesConfiguration.getTaskManagerAddress(), + 0, + numNetworkThreads, + numQueryThreads, kvStateRegistry, new DisabledKvStateRequestStats()); } @@ -384,6 +393,7 @@ public class TaskManagerServices { taskEventDispatcher, kvStateRegistry, kvStateServer, + kvClientProxy, networkEnvironmentConfiguration.ioMode(), networkEnvironmentConfiguration.partitionRequestInitialBackoff(), networkEnvironmentConfiguration.partitionRequestMaxBackoff(), http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index f1f7d39..e6643b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -189,8 +189,8 @@ public class TaskManagerServicesConfiguration { remoteAddress, slots); - final QueryableStateConfiguration queryableStateConfig = localCommunication ? - QueryableStateConfiguration.disabled() : + // @Ufuk todo why was it like this before ??? 
+ final QueryableStateConfiguration queryableStateConfig = parseQueryableStateConfiguration(configuration); // extract memory settings http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 558388c..c370725 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -39,7 +39,7 @@ import org.apache.flink.runtime.blob.{BlobClient, BlobService, BlobCacheService} import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.runtime.concurrent.{Executors, FutureUtils} import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager} @@ -47,7 +47,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, PartitionInf import org.apache.flink.runtime.filecache.FileCache import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} -import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID} +import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, HardwareDescription, InstanceID} import 
org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker @@ -951,7 +951,7 @@ class TaskManager( kvStateRegistry.registerListener( new ActorGatewayKvStateRegistryListener( jobManagerGateway, - kvStateServer.getAddress)) + kvStateServer.getServerAddress)) } // start a blob service, if a blob server is specified @@ -1423,6 +1423,28 @@ class TaskManager( } override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = { + val proxy = network.getKvStateProxy + if (proxy != null) { + + val askTimeoutString = config.getConfiguration.getString(AkkaOptions.ASK_TIMEOUT) + + val timeout = Duration(askTimeoutString) + + if (!timeout.isFinite) { + throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key + + " is not a finite timeout ('" + askTimeoutString + "')") + } + + if (leaderAddress != null) { + val actorGwFuture: Future[ActorGateway] = + AkkaUtils.getActorRefFuture( + leaderAddress, context.system, timeout.asInstanceOf[FiniteDuration] + ).map(actor => new AkkaActorGateway(actor, leaderSessionID))(context.system.dispatcher) + + proxy.updateJobManager(FutureUtils.toJava(actorGwFuture)) + } + } + self ! 
JobManagerLeaderAddress(leaderAddress, leaderSessionID) } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 826ae3f..ef2d5c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -69,6 +69,7 @@ public class NetworkEnvironmentTest { new TaskEventDispatcher(), new KvStateRegistry(), null, + null, IOManager.IOMode.SYNC, 0, 0, http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 052699a..6dabcd3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -145,6 +145,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { new TaskEventDispatcher(), new KvStateRegistry(), null, + null, netConf.ioMode(), netConf.partitionRequestInitialBackoff(), netConf.partitionRequestMaxBackoff(),
