http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 27257d7..005c874 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -25,306 +25,117 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.queryablestate.UnknownKeyOrNamespace; -import org.apache.flink.queryablestate.UnknownKvStateID; -import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.query.KvStateID; -import 
org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.Client; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.dispatch.Recover; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.ConnectException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; /** - * Client for queryable state. + * Client for querying Flink's managed state. * * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}. 
- * The state instance created from this descriptor will be published for queries - * when it's created on the TaskManagers and the location will be reported to - * the JobManager. + * The state instance created from this descriptor will be published for queries when it's + * created on the Task Managers and the location will be reported to the Job Manager. * - * <p>The client resolves the location of the requested KvState via the - * JobManager. Resolved locations are cached. When the server address of the - * requested KvState instance is determined, the client sends out a request to - * the server. + * <p>The client connects to a {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy} + * running on a given Task Manager. The proxy is the entry point of the client to the Flink cluster. + * It forwards the requests of the client to the Job Manager and the required Task Manager, and forwards + * the final response back to the client. + * + * <p>The proxy initially resolves the location of the requested KvState via the JobManager. Resolved + * locations are cached. When the server address of the requested KvState instance is determined, the + * client sends out a request to the server. The returned final answer is then forwarded to the Client. */ +@PublicEvolving public class QueryableStateClient { private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class); - /** - * {@link KvStateLocation} lookup to resolve the address of KvState instances. - */ - private final KvStateLocationLookupService lookupService; - - /** - * Network client for queries against {@link KvStateServer} instances. - */ - private final KvStateClient kvStateClient; - - /** - * Execution context. - */ - private final ExecutionContext executionContext; + /** The client that forwards the requests to the proxy. */ + private final Client<KvStateRequest, KvStateResponse> client; - /** - * Cache for {@link KvStateLocation} instances keyed by job and name. 
- */ - private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache = - new ConcurrentHashMap<>(); - - /** This is != null, if we started the actor system. */ - private final ActorSystem actorSystem; + /** The address of the proxy this client is connected to. */ + private final KvStateServerAddress remoteAddress; + /** The execution configuration used to instantiate the different (de-)serializers. */ private ExecutionConfig executionConfig; /** - * Creates a client from the given configuration. - * - * <p>This will create multiple Thread pools: one for the started actor - * system and another for the network client. - * - * @param config Configuration to use. - * @throws Exception Failures are forwarded + * Create the Queryable State Client. + * @param remoteHostname the hostname of the {@link org.apache.flink.runtime.query.KvStateClientProxy proxy} + * to connect to. + * @param remotePort the port of the proxy to connect to. */ - public QueryableStateClient(Configuration config) throws Exception { - this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); + public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException { + this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort); } /** - * Creates a client from the given configuration. - * - * <p>This will create multiple Thread pools: one for the started actor - * system and another for the network client. - * - * @param config Configuration to use. - * @param highAvailabilityServices Service factory for high availability services - * @throws Exception Failures are forwarded - * - * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the - * {@link #QueryableStateClient(Configuration)} instead. + * Create the Queryable State Client. 
+ * @param remoteAddress the {@link InetAddress address} of the + * {@link org.apache.flink.runtime.query.KvStateClientProxy proxy} to connect to. + * @param remotePort the port of the proxy to connect to. */ - @Deprecated - public QueryableStateClient( - Configuration config, - HighAvailabilityServices highAvailabilityServices) throws Exception { - Preconditions.checkNotNull(config, "Configuration"); - - // Create a leader retrieval service - LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); - - // Get the ask timeout - String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT); - - Duration timeout = FiniteDuration.apply(askTimeoutString); - if (!timeout.isFinite()) { - throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key() - + " is not a finite timeout ('" + askTimeoutString + "')"); - } - - FiniteDuration askTimeout = (FiniteDuration) timeout; - - int lookupRetries = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES); - int lookupRetryDelayMillis = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY); - - // Retries if no JobManager is around - AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy = - new AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory( - lookupRetries, - FiniteDuration.apply(lookupRetryDelayMillis, "ms")); + public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) { + Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65536, + "Remote Port " + remotePort + " is out of valid port range (0-65536)."); - // Create the actor system - @SuppressWarnings("unchecked") - Option<Tuple2<String, Object>> remoting = new Some(new Tuple2<>("", 0)); - this.actorSystem = AkkaUtils.createActorSystem(config, remoting); + this.remoteAddress = new KvStateServerAddress(remoteAddress, remotePort); - AkkaKvStateLocationLookupService lookupService = 
new AkkaKvStateLocationLookupService( - leaderRetrievalService, - actorSystem, - askTimeout, - retryStrategy); + final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer = + new MessageSerializer<>( + new KvStateRequest.KvStateRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); - int numEventLoopThreads = config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS); - - if (numEventLoopThreads == 0) { - numEventLoopThreads = Runtime.getRuntime().availableProcessors(); - } - - // Create the network client - KvStateClient networkClient = new KvStateClient( - numEventLoopThreads, + this.client = new Client<>( + "Queryable State Client", + Hardware.getNumberCPUCores(), + messageSerializer, new DisabledKvStateRequestStats()); - - this.lookupService = lookupService; - this.kvStateClient = networkClient; - this.executionContext = actorSystem.dispatcher(); - this.executionConfig = new ExecutionConfig(); - - this.lookupService.start(); - } - - /** Gets the {@link ExecutionConfig}. */ - public ExecutionConfig getExecutionConfig() { - return executionConfig; } - /** Sets the {@link ExecutionConfig}. */ - public void setExecutionConfig(ExecutionConfig config) { - this.executionConfig = config; - } - - /** - * Creates a client. - * - * @param lookupService Location lookup service - * @param kvStateClient Network client for queries - * @param executionContext Execution context for futures - */ - public QueryableStateClient( - KvStateLocationLookupService lookupService, - KvStateClient kvStateClient, - ExecutionContext executionContext) { - - this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService"); - this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient"); - this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext"); - this.actorSystem = null; - - this.lookupService.start(); + /** Shuts down the client. 
*/ + public void shutdown() { + client.shutdown(); } /** - * Returns the execution context of this client. - * - * @return The execution context used by the client. + * Gets the {@link ExecutionConfig}. */ - public ExecutionContext getExecutionContext() { - return executionContext; - } - - /** - * Shuts down the client and all components. - */ - public void shutDown() { - try { - lookupService.shutDown(); - } catch (Throwable t) { - LOG.error("Failed to shut down KvStateLookupService", t); - } - - try { - kvStateClient.shutDown(); - } catch (Throwable t) { - LOG.error("Failed to shut down KvStateClient", t); - } - - if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable t) { - LOG.error("Failed to shut down ActorSystem", t); - } - } + public ExecutionConfig getExecutionConfig() { + return executionConfig; } /** - * Returns a future holding the serialized request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. 
- * - * @param jobId JobID of the job the queryable state - * belongs to - * @param queryableStateName Name under which the state is queryable - * @param keyHashCode Integer hash code of the key (result of - * a call to {@link Object#hashCode()} - * @param serializedKeyAndNamespace Serialized key and namespace to query - * KvState instance with - * @return Future holding the serialized result - */ - @SuppressWarnings("unchecked") - public Future<byte[]> getKvState( - final JobID jobId, - final String queryableStateName, - final int keyHashCode, - final byte[] serializedKeyAndNamespace) { - - return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false) - .recoverWith(new Recover<Future<byte[]>>() { - @Override - public Future<byte[]> recover(Throwable failure) throws Throwable { - if (failure instanceof UnknownKvStateID || - failure instanceof UnknownKvStateKeyGroupLocation || - failure instanceof UnknownKvStateLocation || - failure instanceof ConnectException) { - // These failures are likely to be caused by out-of-sync - // KvStateLocation. Therefore we retry this query and - // force look up the location. - return getKvState( - jobId, - queryableStateName, - keyHashCode, - serializedKeyAndNamespace, - true); - } else { - return Futures.failed(failure); - } - } - }, executionContext); + * Replaces the existing {@link ExecutionConfig} (possibly {@code null}), with the provided one. + * @param config The new {@code configuration}. + * @return The old configuration, or {@code null} if none was specified. + * */ + public ExecutionConfig setExecutionConfig(ExecutionConfig config) { + ExecutionConfig prev = executionConfig; + this.executionConfig = config; + return prev; } /** - * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. 
- * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * + * Returns a future holding the request result. * * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateName Name under which the state is queryable. * @param key The key we are interested in. @@ -333,7 +144,7 @@ public class QueryableStateClient { * @return Future holding the result. */ @PublicEvolving - public <K, V> Future<V> getKvState( + public <K, V> CompletableFuture<V> getKvState( final JobID jobId, final String queryableStateName, final K key, @@ -347,16 +158,7 @@ public class QueryableStateClient { } /** - * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * + * Returns a future holding the request result. * * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateName Name under which the state is queryable. * @param key The key we are interested in. @@ -365,30 +167,19 @@ public class QueryableStateClient { * @return Future holding the result. 
*/ @PublicEvolving - public <K, V> Future<V> getKvState( + public <K, V> CompletableFuture<V> getKvState( final JobID jobId, final String queryableStateName, final K key, final TypeInformation<K> keyTypeInfo, final StateDescriptor<?, V> stateDescriptor) { - Preconditions.checkNotNull(keyTypeInfo); - return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); } /** * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateName Name under which the state is queryable. * @param key The key that the state we request is associated with. @@ -399,7 +190,7 @@ public class QueryableStateClient { * @return Future holding the result. */ @PublicEvolving - public <K, V, N> Future<V> getKvState( + public <K, V, N> CompletableFuture<V> getKvState( final JobID jobId, final String queryableStateName, final K key, @@ -420,15 +211,6 @@ public class QueryableStateClient { /** * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateName Name under which the state is queryable. * @param key The key that the state we request is associated with. 
@@ -439,7 +221,7 @@ public class QueryableStateClient { * @return Future holding the result. */ @PublicEvolving - public <K, V, N> Future<V> getKvState( + public <K, N, V> CompletableFuture<V> getKvState( final JobID jobId, final String queryableStateName, final K key, @@ -448,8 +230,8 @@ public class QueryableStateClient { final TypeInformation<N> namespaceTypeInfo, final TypeSerializer<V> stateSerializer) { + Preconditions.checkNotNull(jobId); Preconditions.checkNotNull(queryableStateName); - Preconditions.checkNotNull(key); Preconditions.checkNotNull(namespace); @@ -457,36 +239,25 @@ public class QueryableStateClient { Preconditions.checkNotNull(namespaceTypeInfo); Preconditions.checkNotNull(stateSerializer); - if (stateSerializer instanceof ListSerializer) { - throw new IllegalArgumentException("ListState is not supported out-of-the-box yet."); - } - TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); final byte[] serializedKeyAndNamespace; try { - serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - key, - keySerializer, - namespace, - namespaceSerializer); + serializedKeyAndNamespace = KvStateSerializer + .serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer); } catch (IOException e) { - return Futures.failed(e); + return FutureUtils.getFailedFuture(e); } - return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace) - .flatMap(new Mapper<byte[], Future<V>>() { - @Override - public Future<V> apply(byte[] parameter) { - try { - return Futures.successful( - KvStateSerializer.deserializeValue(parameter, stateSerializer)); - } catch (IOException e) { - return Futures.failed(e); - } + return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( + stateResponse -> { + try { + return 
KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer); + } catch (IOException e) { + throw new FlinkRuntimeException(e); } - }, executionContext); + }); } /** @@ -499,92 +270,20 @@ public class QueryableStateClient { * a call to {@link Object#hashCode()} * @param serializedKeyAndNamespace Serialized key and namespace to query * KvState instance with - * @param forceLookup Flag to force lookup of the {@link KvStateLocation} * @return Future holding the serialized result */ - private Future<byte[]> getKvState( + private CompletableFuture<KvStateResponse> getKvState( final JobID jobId, final String queryableStateName, final int keyHashCode, - final byte[] serializedKeyAndNamespace, - boolean forceLookup) { - - return getKvStateLookupInfo(jobId, queryableStateName, forceLookup) - .flatMap(new Mapper<KvStateLocation, Future<byte[]>>() { - @Override - public Future<byte[]> apply(KvStateLocation lookup) { - int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups()); - - KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex); - if (serverAddress == null) { - return Futures.failed(new UnknownKvStateKeyGroupLocation()); - } else { - // Query server - KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex); - return kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace); - } - } - }, executionContext); - } - - /** - * Lookup the {@link KvStateLocation} for the given job and queryable state - * name. - * - * <p>The job manager will be queried for the location only if forced or no - * cached location can be found. There are no guarantees about - * - * @param jobId JobID the state instance belongs to. - * @param queryableStateName Name under which the state instance has been published. - * @param forceUpdate Flag to indicate whether to force a update via the lookup service. 
- * @return Future holding the KvStateLocation - */ - private Future<KvStateLocation> getKvStateLookupInfo( - JobID jobId, - final String queryableStateName, - boolean forceUpdate) { - - if (forceUpdate) { - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - lookupCache.put(new Tuple2<>(jobId, queryableStateName), lookupFuture); - return lookupFuture; - } else { - Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); - final Future<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); - - if (cachedFuture == null) { - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - - Future<KvStateLocation> previous = lookupCache.putIfAbsent(cacheKey, lookupFuture); - if (previous == null) { - return lookupFuture; - } else { - return previous; - } - } else { - // do not retain futures which failed as they will remain in - // the cache even if the error cause is not present any more - // and a new lookup may succeed - if (cachedFuture.isCompleted() && - cachedFuture.value().get().isFailure()) { - // issue a new lookup - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - - // replace the existing one if it has not been replaced yet - // otherwise return the one in the cache - if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) { - return lookupFuture; - } else { - return lookupCache.get(cacheKey); - } - } else { - return cachedFuture; - } - } + final byte[] serializedKeyAndNamespace) { + LOG.info("Sending State Request to {}.", remoteAddress); + try { + KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace); + return client.sendRequest(remoteAddress, request); + } catch (Exception e) { + LOG.error("Unable to send KVStateRequest: ", e); + return FutureUtils.getFailedFuture(e); } } - }
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java new file mode 100644 index 0000000..d7191b6 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.client.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.queryablestate.UnknownKvStateIdException; +import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.Client; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.query.KvStateClientProxy; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateMessage; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + 
+/** + * This handler acts as an internal (to the Flink cluster) client that receives + * the requests from external clients, executes them by contacting the Job Manager (if necessary) and + * the Task Manager holding the requested state, and forwards the answer back to the client. + */ +@Internal +@ChannelHandler.Sharable +public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class); + + /** The proxy using this handler. */ + private final KvStateClientProxy proxy; + + /** A cache to hold the location of different states for which we have already seen requests. */ + private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache = + new ConcurrentHashMap<>(); + + /** + * Network client to forward queries to {@link KvStateServerImpl state server} + * instances inside the cluster. + */ + private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient; + + /** + * Create the handler used by the {@link KvStateClientProxyImpl}. + * + * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler. + * @param queryExecutorThreads the number of threads used to process incoming requests. + * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. + * @param stats server statistics collector. 
+ */ + public KvStateClientProxyHandler( + final KvStateClientProxyImpl proxy, + final int queryExecutorThreads, + final MessageSerializer<KvStateRequest, KvStateResponse> serializer, + final KvStateRequestStats stats) { + + super(proxy, serializer, stats); + this.proxy = Preconditions.checkNotNull(proxy); + this.kvStateClient = createInternalClient(queryExecutorThreads); + } + + private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) { + final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer = + new MessageSerializer<>( + new KvStateInternalRequest.KvStateInternalRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + + return new Client<>( + "Queryable State Proxy Client", + threads, + messageSerializer, + new DisabledKvStateRequestStats()); + } + + @Override + public CompletableFuture<KvStateResponse> handleRequest( + final long requestId, + final KvStateRequest request) { + CompletableFuture<KvStateResponse> response = new CompletableFuture<>(); + executeActionAsync(response, request, false); + return response; + } + + private void executeActionAsync( + final CompletableFuture<KvStateResponse> result, + final KvStateRequest request, + final boolean update) { + + if (!result.isDone()) { + final CompletableFuture<KvStateResponse> operationFuture = getState(request, update); + operationFuture.whenCompleteAsync( + (t, throwable) -> { + if (throwable != null) { + if (throwable instanceof CancellationException) { + result.completeExceptionally(throwable); + } else if (throwable.getCause() instanceof UnknownKvStateIdException || + throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || + throwable.getCause() instanceof UnknownKvStateLocation || + throwable.getCause() instanceof ConnectException) { + + // These failures are likely to be caused by out-of-sync + // KvStateLocation. Therefore we retry this query and + // force look up the location. 
+ + executeActionAsync(result, request, true); + } else { + result.completeExceptionally(throwable); + } + } else { + result.complete(t); + } + }, queryExecutor); + + result.whenComplete( + (t, throwable) -> operationFuture.cancel(false)); + } + } + + private CompletableFuture<KvStateResponse> getState( + final KvStateRequest request, + final boolean forceUpdate) { + + return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate) + .thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> { + final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash( + request.getKeyHashCode(), location.getNumKeyGroups()); + + final KvStateServerAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex); + if (serverAddress == null) { + return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName())); + } else { + // Query server + final KvStateID kvStateId = location.getKvStateID(keyGroupIndex); + final KvStateInternalRequest internalRequest = new KvStateInternalRequest( + kvStateId, request.getSerializedKeyAndNamespace()); + return kvStateClient.sendRequest(serverAddress, internalRequest); + } + }, queryExecutor); + } + + /** + * Lookup the {@link KvStateLocation} for the given job and queryable state name. + * + * <p>The job manager will be queried for the location only if forced or no + * cached location can be found. There are no guarantees about + * + * @param jobId JobID the state instance belongs to. + * @param queryableStateName Name under which the state instance has been published. + * @param forceUpdate Flag to indicate whether to force a update via the lookup service. 
+ * @return Future holding the KvStateLocation + */ + private CompletableFuture<KvStateLocation> getKvStateLookupInfo( + final JobID jobId, + final String queryableStateName, + final boolean forceUpdate) { + + final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); + final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); + + if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) { + LOG.debug("Retrieving location for state={} of job={} from the cache.", jobId, queryableStateName); + return cachedFuture; + } + + LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName); + + return proxy.getJobManagerFuture().thenComposeAsync( + jobManagerGateway -> { + final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName); + final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava( + jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS)) + .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))); + + lookupCache.put(cacheKey, locationFuture); + return locationFuture; + }, queryExecutor); + } + + @Override + public void shutdown() { + kvStateClient.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java new file mode 100644 index 0000000..bca80de --- /dev/null +++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.UnknownJobManagerException; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerBase; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.query.KvStateClientProxy; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.util.Preconditions; + +import java.net.InetAddress; +import java.util.concurrent.CompletableFuture; + +/** + * The default implementation of the {@link KvStateClientProxy}. 
+ */ +@Internal +public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy { + + private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER = + FutureUtils.getFailedFuture(new UnknownJobManagerException()); + + /** Number of threads used to process incoming requests. */ + private final int queryExecutorThreads; + + /** Statistics collector. */ + private final KvStateRequestStats stats; + + private final Object leaderLock = new Object(); + + private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER; + + /** + * Creates the Queryable State Client Proxy. + * + * <p>The server is instantiated using reflection by the + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats) + * QueryableStateUtils.createKvStateClientProxy(InetAddress, int, int, int, KvStateRequestStats)}. + * + * <p>The server needs to be started via {@link #start()} in order to bind + * to the configured bind address. + * + * @param bindAddress the address to listen to. + * @param bindPort the port to listen to. + * @param numEventLoopThreads number of event loop threads. + * @param numQueryThreads number of query threads. + * @param stats the statistics collector. 
+ */ + public KvStateClientProxyImpl( + final InetAddress bindAddress, + final Integer bindPort, + final Integer numEventLoopThreads, + final Integer numQueryThreads, + final KvStateRequestStats stats) { + + super("Queryable State Proxy Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads); + Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); + this.queryExecutorThreads = numQueryThreads; + this.stats = Preconditions.checkNotNull(stats); + } + + @Override + public KvStateServerAddress getServerAddress() { + return super.getServerAddress(); + } + + @Override + public void start() throws InterruptedException { + super.start(); + } + + @Override + public void shutdown() { + super.shutdown(); + } + + @Override + public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception { + synchronized (leaderLock) { + if (leadingJobManager == null) { + jobManagerFuture = UNKNOWN_JOB_MANAGER; + } else { + jobManagerFuture = leadingJobManager; + } + } + } + + @Override + public CompletableFuture<ActorGateway> getJobManagerFuture() { + synchronized (leaderLock) { + return jobManagerFuture; + } + } + + @Override + public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() { + MessageSerializer<KvStateRequest, KvStateResponse> serializer = + new MessageSerializer<>( + new KvStateRequest.KvStateRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + return new KvStateClientProxyHandler(this, queryExecutorThreads, serializer, stats); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java new file mode 100644 index 0000000..eedc2a1 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.messages; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageDeserializer; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.nio.ByteBuffer; + +/** + * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy + * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server} + * of the Task Manager responsible for the requested state. 
+ */ +@Internal +public class KvStateInternalRequest extends MessageBody { + + private final KvStateID kvStateId; + private final byte[] serializedKeyAndNamespace; + + public KvStateInternalRequest( + final KvStateID stateId, + final byte[] serializedKeyAndNamespace) { + + this.kvStateId = Preconditions.checkNotNull(stateId); + this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace); + } + + public KvStateID getKvStateId() { + return kvStateId; + } + + public byte[] getSerializedKeyAndNamespace() { + return serializedKeyAndNamespace; + } + + @Override + public byte[] serialize() { + + // KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace + final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length; + + return ByteBuffer.allocate(size) + .putLong(kvStateId.getLowerPart()) + .putLong(kvStateId.getUpperPart()) + .putInt(serializedKeyAndNamespace.length) + .put(serializedKeyAndNamespace) + .array(); + } + + /** + * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}. + */ + public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> { + + @Override + public KvStateInternalRequest deserializeMessage(ByteBuf buf) { + KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong()); + + int length = buf.readInt(); + Preconditions.checkArgument(length >= 0, + "Negative length for key and namespace. 
" + + "This indicates a serialization error."); + + byte[] serializedKeyAndNamespace = new byte[length]; + if (length > 0) { + buf.readBytes(serializedKeyAndNamespace); + } + return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java index eb33bce..7eb39c7 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java @@ -18,72 +18,124 @@ package org.apache.flink.queryablestate.messages; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageDeserializer; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.nio.ByteBuffer; +import java.util.Arrays; + /** - * A {@link InternalKvState} instance request for a specific key and namespace. + * The request to be sent by the {@link org.apache.flink.queryablestate.client.QueryableStateClient + * Queryable State Client} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy} + * requesting a given state. 
*/ -public final class KvStateRequest { +@Internal +public class KvStateRequest extends MessageBody { - /** ID for this request. */ - private final long requestId; + private final JobID jobId; + private final String stateName; + private final int keyHashCode; + private final byte[] serializedKeyAndNamespace; - /** ID of the requested KvState instance. */ - private final KvStateID kvStateId; + public KvStateRequest( + final JobID jobId, + final String stateName, + final int keyHashCode, + final byte[] serializedKeyAndNamespace) { - /** Serialized key and namespace to request from the KvState instance. */ - private final byte[] serializedKeyAndNamespace; + this.jobId = Preconditions.checkNotNull(jobId); + this.stateName = Preconditions.checkNotNull(stateName); + this.keyHashCode = keyHashCode; + this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace); + } - /** - * Creates a KvState instance request. - * - * @param requestId ID for this request - * @param kvStateId ID of the requested KvState instance - * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState - * instance - */ - public KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) { - this.requestId = requestId; - this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID"); - this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); + public JobID getJobId() { + return jobId; } - /** - * Returns the request ID. - * - * @return Request ID - */ - public long getRequestId() { - return requestId; + public String getStateName() { + return stateName; } - /** - * Returns the ID of the requested KvState instance. 
- * - * @return ID of the requested KvState instance - */ - public KvStateID getKvStateId() { - return kvStateId; + public int getKeyHashCode() { + return keyHashCode; } - /** - * Returns the serialized key and namespace to request from the KvState - * instance. - * - * @return Serialized key and namespace to request from the KvState instance - */ public byte[] getSerializedKeyAndNamespace() { return serializedKeyAndNamespace; } @Override + public byte[] serialize() { + + byte[] serializedStateName = stateName.getBytes(); + + // JobID + stateName + sizeOf(stateName) + hashCode + keyAndNamespace + sizeOf(keyAndNamespace) + final int size = + JobID.SIZE + + serializedStateName.length + Integer.BYTES + + Integer.BYTES + + serializedKeyAndNamespace.length + Integer.BYTES; + + return ByteBuffer.allocate(size) + .putLong(jobId.getLowerPart()) + .putLong(jobId.getUpperPart()) + .putInt(serializedStateName.length) + .put(serializedStateName) + .putInt(keyHashCode) + .putInt(serializedKeyAndNamespace.length) + .put(serializedKeyAndNamespace) + .array(); + } + + @Override public String toString() { return "KvStateRequest{" + - "requestId=" + requestId + - ", kvStateId=" + kvStateId + - ", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length + + "jobId=" + jobId + + ", stateName='" + stateName + '\'' + + ", keyHashCode=" + keyHashCode + + ", serializedKeyAndNamespace=" + Arrays.toString(serializedKeyAndNamespace) + '}'; } + + /** + * A {@link MessageDeserializer deserializer} for {@link KvStateRequest}. + */ + public static class KvStateRequestDeserializer implements MessageDeserializer<KvStateRequest> { + + @Override + public KvStateRequest deserializeMessage(ByteBuf buf) { + JobID jobId = new JobID(buf.readLong(), buf.readLong()); + + int statenameLength = buf.readInt(); + Preconditions.checkArgument(statenameLength >= 0, + "Negative length for state name. 
" + + "This indicates a serialization error."); + + String stateName = ""; + if (statenameLength > 0) { + byte[] name = new byte[statenameLength]; + buf.readBytes(name); + stateName = new String(name); + } + + int keyHashCode = buf.readInt(); + + int knamespaceLength = buf.readInt(); + Preconditions.checkArgument(knamespaceLength >= 0, + "Negative length for key and namespace. " + + "This indicates a serialization error."); + + byte[] serializedKeyAndNamespace = new byte[knamespaceLength]; + if (knamespaceLength > 0) { + buf.readBytes(serializedKeyAndNamespace); + } + return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java deleted file mode 100644 index 4015d79..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestFailure.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.messages; - -/** - * A failure response to a {@link KvStateRequest}. - */ -public final class KvStateRequestFailure { - - /** ID of the request responding to. */ - private final long requestId; - - /** Failure cause. Not allowed to be a user type. */ - private final Throwable cause; - - /** - * Creates a failure response to a {@link KvStateRequest}. - * - * @param requestId ID for the request responding to - * @param cause Failure cause (not allowed to be a user type) - */ - public KvStateRequestFailure(long requestId, Throwable cause) { - this.requestId = requestId; - this.cause = cause; - } - - /** - * Returns the request ID responding to. - * - * @return Request ID responding to - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the failure cause. 
- * - * @return Failure cause - */ - public Throwable getCause() { - return cause; - } - - @Override - public String toString() { - return "KvStateRequestFailure{" + - "requestId=" + requestId + - ", cause=" + cause + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java deleted file mode 100644 index 6bf2397..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequestResult.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.queryablestate.messages; - -import org.apache.flink.util.Preconditions; - -/** - * A successful response to a {@link KvStateRequest} containing the serialized - * result for the requested key and namespace. - */ -public final class KvStateRequestResult { - - /** ID of the request responding to. */ - private final long requestId; - - /** - * Serialized result for the requested key and namespace. If no result was - * available for the specified key and namespace, this is <code>null</code>. - */ - private final byte[] serializedResult; - - /** - * Creates a successful {@link KvStateRequestResult} response. - * - * @param requestId ID of the request responding to - * @param serializedResult Serialized result or <code>null</code> if none - */ - public KvStateRequestResult(long requestId, byte[] serializedResult) { - this.requestId = requestId; - this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result"); - } - - /** - * Returns the request ID responding to. - * - * @return Request ID responding to - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the serialized result or <code>null</code> if none available. - * - * @return Serialized result or <code>null</code> if none available. 
- */ - public byte[] getSerializedResult() { - return serializedResult; - } - - @Override - public String toString() { - return "KvStateRequestResult{" + - "requestId=" + requestId + - ", serializedResult.length=" + serializedResult.length + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java new file mode 100644 index 0000000..462135f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.messages; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageDeserializer; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.nio.ByteBuffer; + +/** + * The response containing the (serialized) state sent by the {@link org.apache.flink.runtime.query.KvStateServer + * State Server} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then forwarded + * by the proxy to the original {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State + * Client}. + */ +@Internal +public class KvStateResponse extends MessageBody { + + private final byte[] content; + + public KvStateResponse(final byte[] content) { + this.content = Preconditions.checkNotNull(content); + } + + public byte[] getContent() { + return content; + } + + @Override + public byte[] serialize() { + final int size = Integer.BYTES + content.length; + return ByteBuffer.allocate(size) + .putInt(content.length) + .put(content) + .array(); + } + + /** + * A {@link MessageDeserializer deserializer} for {@link KvStateResponse}. + */ + public static class KvStateResponseDeserializer implements MessageDeserializer<KvStateResponse> { + + @Override + public KvStateResponse deserializeMessage(ByteBuf buf) { + int length = buf.readInt(); + Preconditions.checkArgument(length >= 0, + "Negative length for state content. 
" + + "This indicates a serialization error."); + byte[] content = new byte[length]; + buf.readBytes(content); + + return new KvStateResponse(content); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java new file mode 100644 index 0000000..4bf8e98 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.runtime.io.network.netty.NettyBufferPool; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * The base class for every server in the queryable state module. + * It is using pure netty to send and receive messages of type {@link MessageBody}. + * + * @param <REQ> the type of request the server expects to receive. + * @param <RESP> the type of response the server will send. 
+ */ +@Internal +public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class); + + /** AbstractServerBase config: low water mark. */ + private static final int LOW_WATER_MARK = 8 * 1024; + + /** AbstractServerBase config: high water mark. */ + private static final int HIGH_WATER_MARK = 32 * 1024; + + /** Name of this server, used in thread names and log messages. */ + private final String serverName; + + /** Netty's ServerBootstrap. */ + private final ServerBootstrap bootstrap; + + /** Query executor thread pool. */ + private final ExecutorService queryExecutor; + + /** Address of this server. */ + private KvStateServerAddress serverAddress; + + /** The handler used for the incoming messages. */ + private AbstractServerHandler<REQ, RESP> handler; + + /** + * Creates the {@link AbstractServerBase}. + * + * <p>The server needs to be started via {@link #start()} in order to bind + * to the configured bind address. + * + * @param serverName the name of the server + * @param bindAddress address to bind to + * @param bindPort port to bind to in the range 0-65535 (random port if 0) + * @param numEventLoopThreads number of event loop threads + * @param numQueryThreads number of query threads + */ + protected AbstractServerBase( + final String serverName, + final InetAddress bindAddress, + final Integer bindPort, + final Integer numEventLoopThreads, + final Integer numQueryThreads) { + + Preconditions.checkNotNull(bindAddress); + Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65535, "Port " + bindPort + " out of valid range (0-65535)."); + Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); + Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); + + this.serverName = Preconditions.checkNotNull(serverName); + this.queryExecutor = createQueryExecutor(numQueryThreads); + + final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); + + final ThreadFactory threadFactory = new 
ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Flink " + serverName + " EventLoop Thread %d") + .build(); + + final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); + + bootstrap = new ServerBootstrap() + // Bind address and port + .localAddress(bindAddress, bindPort) + // NIO server channels + .group(nioGroup) + .channel(NioServerSocketChannel.class) + // AbstractServerBase channel Options + .option(ChannelOption.ALLOCATOR, bufferPool) + // Child channel options + .childOption(ChannelOption.ALLOCATOR, bufferPool) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + } + + /** + * Creates a thread pool for the query execution. + * + * @param numQueryThreads Number of query threads. + * @return Thread pool for query execution + */ + private ExecutorService createQueryExecutor(int numQueryThreads) { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Flink " + getServerName() + " Thread %d") + .build(); + + return Executors.newFixedThreadPool(numQueryThreads, threadFactory); + } + + /** + * Returns the thread pool used for query execution. + * + * @return the query executor + */ + protected ExecutorService getQueryExecutor() { + return queryExecutor; + } + + /** + * Returns the name of this server. + * + * @return the server name + */ + public String getServerName() { + return serverName; + } + + /** + * Creates the handler for the incoming messages. Called by {@link #start()} + * before the server binds to its address. + * + * @return the handler to be installed in every child channel pipeline + */ + public abstract AbstractServerHandler<REQ, RESP> initializeHandler(); + + /** + * Starts the server by binding to the configured bind address (blocking). 
+ * @throws InterruptedException If interrupted during the bind operation + * @throws IllegalStateException If the server has already been started + */ + public void start() throws InterruptedException { + Preconditions.checkState(serverAddress == null, + "Server " + serverName + " has already been started @ " + serverAddress + '.'); + + this.handler = initializeHandler(); + bootstrap.childHandler(new ServerChannelInitializer<>(handler)); + + Channel channel = bootstrap.bind().sync().channel(); + InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); + serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + + LOG.info("Started server {} @ {}", serverName, serverAddress); + } + + /** + * Returns the address of this server. + * + * @return AbstractServerBase address + * @throws IllegalStateException If server has not been started yet + */ + public KvStateServerAddress getServerAddress() { + Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started."); + return serverAddress; + } + + /** + * Shuts down the server and all related thread pools. + */ + public void shutdown() { + LOG.info("Shutting down server {} @ {}", serverName, serverAddress); + + if (handler != null) { + handler.shutdown(); + } + + if (queryExecutor != null) { + queryExecutor.shutdown(); + } + + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + } + } + serverAddress = null; + } + + /** + * Channel pipeline initializer. + * + * <p>The request handler is shared, whereas the other handlers are created + * per channel. + */ + private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> { + + /** The shared request handler. */ + private final AbstractServerHandler<REQ, RESP> sharedRequestHandler; + + /** + * Creates the channel pipeline initializer with the shared request handler. 
+ * + * @param sharedRequestHandler Shared request handler. + */ + ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) { + this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler"); + } + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast(new ChunkedWriteHandler()) + .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) + .addLast(sharedRequestHandler); + } + } + + /** + * Returns whether the query executor has been shut down. + * + * @return {@code true} if the query executor has been shut down + */ + @VisibleForTesting + public boolean isExecutorShutdown() { + return queryExecutor.isShutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java new file mode 100644 index 0000000..b9bf671 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * The base class of every handler used by an {@link AbstractServerBase}. + * + * @param <REQ> the type of request the server expects to receive. + * @param <RESP> the type of response the server will send. 
+ */ +@Internal +@ChannelHandler.Sharable +public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class); + + /** The owning server of this handler. */ + private final AbstractServerBase<REQ, RESP> server; + + /** The serializer used to (de-)serialize messages. */ + private final MessageSerializer<REQ, RESP> serializer; + + /** Thread pool for query execution. */ + protected final ExecutorService queryExecutor; + + /** Exposed server statistics. */ + private final KvStateRequestStats stats; + + /** + * Create the handler. + * + * @param server the owning server of this handler, which also provides the query executor + * @param serializer the serializer used to (de-)serialize messages + * @param stats statistics collector + */ + public AbstractServerHandler( + final AbstractServerBase<REQ, RESP> server, + final MessageSerializer<REQ, RESP> serializer, + final KvStateRequestStats stats) { + + this.server = Preconditions.checkNotNull(server); + this.serializer = Preconditions.checkNotNull(serializer); + this.queryExecutor = server.getQueryExecutor(); + this.stats = Preconditions.checkNotNull(stats); + } + + /** + * Returns the name of the server owning this handler. + * + * @return the server name + */ + protected String getServerName() { + return server.getServerName(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + stats.reportActiveConnection(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + stats.reportInactiveConnection(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + REQ request = null; + long requestId = -1L; + + try { + final ByteBuf buf = (ByteBuf) msg; + final MessageType msgType = MessageSerializer.deserializeHeader(buf); + + requestId = MessageSerializer.getRequestId(buf); + + if (msgType == MessageType.REQUEST) { + + // ------------------------------------------------------------ + // MessageBody + // 
------------------------------------------------------------ + request = serializer.deserializeRequest(buf); + stats.reportRequest(); + + // Execute actual query async, because it is possibly + // blocking (e.g. file I/O). + // + // A submission failure is not treated as fatal: it is caught below and reported back to the client. + // TODO: if a shared resource (e.g. a registry) is used here, access to it has to be synchronized. + queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats)); + + } else { + // ------------------------------------------------------------ + // Unexpected + // ------------------------------------------------------------ + + final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + "."; + final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg)); + + LOG.debug(errMsg); + ctx.writeAndFlush(failure); + } + } catch (Throwable t) { + final String stringifiedCause = ExceptionUtils.stringifyException(t); + + String errMsg; + ByteBuf err; + if (request != null) { + errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause; + err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); + stats.reportFailedRequest(); + } else { + errMsg = "Failed incoming message. Caused by: " + stringifiedCause; + err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg)); + } + + LOG.debug(errMsg); + ctx.writeAndFlush(err); + + } finally { + // IMPORTANT: We have to always recycle the incoming buffer. + // Otherwise we will leak memory out of Netty's buffer pool. + // + // If any operation ever holds on to the buffer, it is the + // responsibility of that operation to retain the buffer and + // release it later. 
+ ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause); + final ByteBuf err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg)); + + LOG.debug(msg); + ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE); + } + + /** + * Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response. + * + * <p><b>NOTE:</b> This method is called by multiple threads. + * + * @param requestId the id of the received request to be handled. + * @param request the request to be handled. + * @return A future with the response to be forwarded to the client. + */ + public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request); + + /** + * Shuts down any handler specific resources, e.g. thread pools etc. + */ + public abstract void shutdown(); + + /** + * Task to execute the actual query against the {@link InternalKvState} instance. 
+ */ + private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable { + + private final AbstractServerHandler<REQ, RESP> handler; + + private final ChannelHandlerContext ctx; + + private final long requestId; + + private final REQ request; + + private final KvStateRequestStats stats; + + private final long creationNanos; + + AsyncRequestTask( + final AbstractServerHandler<REQ, RESP> handler, + final ChannelHandlerContext ctx, + final long requestId, + final REQ request, + final KvStateRequestStats stats) { + + this.handler = Preconditions.checkNotNull(handler); + this.ctx = Preconditions.checkNotNull(ctx); + this.requestId = requestId; + this.request = Preconditions.checkNotNull(request); + this.stats = Preconditions.checkNotNull(stats); + this.creationNanos = System.nanoTime(); + } + + @Override + public void run() { + + if (!ctx.channel().isActive()) { + return; + } + + handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> { + try { + if (throwable != null) { + throw throwable instanceof CompletionException + ? 
throwable.getCause() + : throwable; + } + + if (resp == null) { + throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + "."); + } + + final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp); + + int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark(); + + ChannelFuture write; + if (serialResp.readableBytes() <= highWatermark) { + write = ctx.writeAndFlush(serialResp); + } else { + write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark)); + } + write.addListener(new RequestWriteListener()); + + } catch (BadRequestException e) { + try { + stats.reportFailedRequest(); + final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e); + ctx.writeAndFlush(err); + } catch (IOException io) { + LOG.error("Failed to respond with the error after failed request", io); + } + } catch (Throwable t) { + try { + stats.reportFailedRequest(); + + final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); + ctx.writeAndFlush(err); + } catch (IOException io) { + LOG.error("Failed to respond with the error after failed request", io); + } + } + }); + } + + @Override + public String toString() { + return "AsyncRequestTask{" + + "requestId=" + requestId + + ", request=" + request + + '}'; + } + + /** + * Callback after query result has been written. + * + * <p>Gathers stats and logs errors. 
+ */ + private class RequestWriteListener implements ChannelFutureListener { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + long durationNanos = System.nanoTime() - creationNanos; + long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); + + if (future.isSuccess()) { + LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis); + stats.reportSuccessfulRequest(durationMillis); + } else { + LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause()); + stats.reportFailedRequest(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java new file mode 100644 index 0000000..3c0c484 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +/** + * Base class for exceptions thrown during querying Flink's managed state. + */ +@Internal +public class BadRequestException extends Exception { + + private static final long serialVersionUID = 3458743952407632903L; + + /** + * Creates the exception with a message of the form {@code "<serverName> : <message>"}. + * + * @param serverName name of the server the exception originates from, must not be {@code null} + * @param message description of the failure + */ + public BadRequestException(String serverName, String message) { + super(Preconditions.checkNotNull(serverName) + " : " + message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java new file mode 100644 index 0000000..9c56025 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +/** + * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler}, + * respecting the high and low watermarks. + * + * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a> + */ +@Internal +public class ChunkedByteBuf implements ChunkedInput<ByteBuf> { + + /** The buffer to chunk. */ + private final ByteBuf buf; + + /** Size of chunks. */ + private final int chunkSize; + + /** Closed flag. */ + private boolean isClosed; + + /** End of input flag. 
*/ + private boolean isEndOfInput; + + /** + * Creates a chunked input over the given buffer. + * + * @param buf the buffer to chunk, must not be {@code null} + * @param chunkSize size of the chunks in bytes, must be positive + */ + public ChunkedByteBuf(ByteBuf buf, int chunkSize) { + this.buf = Preconditions.checkNotNull(buf, "Buffer"); + Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size"); + this.chunkSize = chunkSize; + } + + /** + * Returns {@code true} once this input has been closed or the last chunk has been handed out. + */ + @Override + public boolean isEndOfInput() throws Exception { + return isClosed || isEndOfInput; + } + + /** + * Closes this input. The buffer is released here only if the last chunk has not been handed out yet. + */ + @Override + public void close() throws Exception { + if (!isClosed) { + // If we did not consume the whole buffer yet, we have to release + // it here. Otherwise, it's the responsibility of the consumer. + if (!isEndOfInput) { + buf.release(); + } + + isClosed = true; + } + } + + /** + * Returns the next chunk: {@code null} if this input is closed, a slice of all remaining bytes + * (not retained) if at most {@code chunkSize} bytes are readable, and otherwise a retained + * slice of {@code chunkSize} bytes. + */ + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + if (isClosed) { + return null; + } else if (buf.readableBytes() <= chunkSize) { + isEndOfInput = true; + + // Don't retain as the consumer is responsible to release it + return buf.slice(); + } else { + // Return a chunk sized slice of the buffer. The ref count is + // shared with the original buffer. That's why we need to retain + // a reference here. + return buf.readSlice(chunkSize).retain(); + } + } + + @Override + public String toString() { + return "ChunkedByteBuf{" + + "buf=" + buf + + ", chunkSize=" + chunkSize + + ", isClosed=" + isClosed + + ", isEndOfInput=" + isEndOfInput + + '}'; + } +}
