http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java deleted file mode 100644 index 70bccf0..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.queryablestate.client.state.ImmutableStateBinder; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.Client; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; -import org.apache.flink.runtime.util.Hardware; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.CompletableFuture; - -/** - * Client for querying Flink's managed state. - * - * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}. - * The state instance created from this descriptor will be published for queries when it's - * created on the Task Managers and the location will be reported to the Job Manager. - * - * <p>The client connects to a {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy} - * running on a given Task Manager. The proxy is the entry point of the client to the Flink cluster. - * It forwards the requests of the client to the Job Manager and the required Task Manager, and forwards - * the final response back the client. - * - * <p>The proxy, initially resolves the location of the requested KvState via the JobManager. Resolved - * locations are cached. When the server address of the requested KvState instance is determined, the - * client sends out a request to the server. The returned final answer is then forwarded to the Client. - */ -@PublicEvolving -public class QueryableStateClient { - - private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class); - - /** The client that forwards the requests to the proxy. */ - private final Client<KvStateRequest, KvStateResponse> client; - - /** The address of the proxy this client is connected to. */ - private final KvStateServerAddress remoteAddress; - - /** The execution configuration used to instantiate the different (de-)serializers. */ - private ExecutionConfig executionConfig; - - /** - * Create the Queryable State Client. - * @param remoteHostname the hostname of the {@link org.apache.flink.runtime.query.KvStateClientProxy proxy} - * to connect to. - * @param remotePort the port of the proxy to connect to. - */ - public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException { - this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort); - } - - /** - * Create the Queryable State Client. - * @param remoteAddress the {@link InetAddress address} of the - * {@link org.apache.flink.runtime.query.KvStateClientProxy proxy} to connect to. - * @param remotePort the port of the proxy to connect to. - */ - public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) { - Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65536, - "Remote Port " + remotePort + " is out of valid port range (0-65536)."); - - this.remoteAddress = new KvStateServerAddress(remoteAddress, remotePort); - - final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer = - new MessageSerializer<>( - new KvStateRequest.KvStateRequestDeserializer(), - new KvStateResponse.KvStateResponseDeserializer()); - - this.client = new Client<>( - "Queryable State Client", - Hardware.getNumberCPUCores(), - messageSerializer, - new DisabledKvStateRequestStats()); - } - - /** Shuts down the client. */ - public void shutdown() { - client.shutdown(); - } - - /** - * Gets the {@link ExecutionConfig}. - */ - public ExecutionConfig getExecutionConfig() { - return executionConfig; - } - - /** - * Replaces the existing {@link ExecutionConfig} (possibly {@code null}), with the provided one. - * @param config The new {@code configuration}. - * @return The old configuration, or {@code null} if none was specified. - * */ - public ExecutionConfig setExecutionConfig(ExecutionConfig config) { - ExecutionConfig prev = executionConfig; - this.executionConfig = config; - return prev; - } - - /** - * Returns a future holding the request result. * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key we are interested in. - * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the immutable {@link State} object containing the result. - */ - @PublicEvolving - public <K, S extends State, V> CompletableFuture<S> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final TypeHint<K> keyTypeHint, - final StateDescriptor<S, V> stateDescriptor) { - - Preconditions.checkNotNull(keyTypeHint); - - TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo(); - return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); - } - - /** - * Returns a future holding the request result. * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key we are interested in. - * @param keyTypeInfo The {@link TypeInformation} of the key. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the immutable {@link State} object containing the result. - */ - @PublicEvolving - public <K, S extends State, V> CompletableFuture<S> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final TypeInformation<K> keyTypeInfo, - final StateDescriptor<S, V> stateDescriptor) { - - return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, - keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); - } - - /** - * Returns a future holding the request result. - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key that the state we request is associated with. - * @param namespace The namespace of the state. - * @param keyTypeInfo The {@link TypeInformation} of the keys. - * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the immutable {@link State} object containing the result. - */ - @PublicEvolving - public <K, N, S extends State, V> CompletableFuture<S> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final N namespace, - final TypeInformation<K> keyTypeInfo, - final TypeInformation<N> namespaceTypeInfo, - final StateDescriptor<S, V> stateDescriptor) { - - Preconditions.checkNotNull(jobId); - Preconditions.checkNotNull(queryableStateName); - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(namespace); - - Preconditions.checkNotNull(keyTypeInfo); - Preconditions.checkNotNull(namespaceTypeInfo); - Preconditions.checkNotNull(stateDescriptor); - - TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); - TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); - - final byte[] serializedKeyAndNamespace; - try { - serializedKeyAndNamespace = KvStateSerializer - .serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer); - } catch (IOException e) { - return FutureUtils.getFailedFuture(e); - } - - return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( - stateResponse -> { - try { - return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent())); - } catch (Exception e) { - throw new FlinkRuntimeException(e); - } - }); - } - - /** - * Returns a future holding the serialized request result. - * - * @param jobId JobID of the job the queryable state - * belongs to - * @param queryableStateName Name under which the state is queryable - * @param keyHashCode Integer hash code of the key (result of - * a call to {@link Object#hashCode()} - * @param serializedKeyAndNamespace Serialized key and namespace to query - * KvState instance with - * @return Future holding the serialized result - */ - private CompletableFuture<KvStateResponse> getKvState( - final JobID jobId, - final String queryableStateName, - final int keyHashCode, - final byte[] serializedKeyAndNamespace) { - LOG.info("Sending State Request to {}.", remoteAddress); - try { - KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace); - return client.sendRequest(remoteAddress, request); - } catch (Exception e) { - LOG.error("Unable to send KVStateRequest: ", e); - return FutureUtils.getFailedFuture(e); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java deleted file mode 100644 index d7191b6..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.proxy; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.queryablestate.UnknownKvStateIdException; -import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocationException; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.AbstractServerHandler; -import org.apache.flink.queryablestate.network.Client; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.query.KvStateClientProxy; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.UnknownKvStateLocation; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ConnectException; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * This handler acts as an internal (to the Flink cluster) client that receives - * the requests from external clients, executes them by contacting the Job Manager (if necessary) and - * the Task Manager holding the requested state, and forwards the answer back to the client. - */ -@Internal [email protected] -public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class); - - /** The proxy using this handler. */ - private final KvStateClientProxy proxy; - - /** A cache to hold the location of different states for which we have already seen requests. */ - private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache = - new ConcurrentHashMap<>(); - - /** - * Network client to forward queries to {@link KvStateServerImpl state server} - * instances inside the cluster. - */ - private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient; - - /** - * Create the handler used by the {@link KvStateClientProxyImpl}. - * - * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler. - * @param queryExecutorThreads the number of threads used to process incoming requests. - * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. - * @param stats server statistics collector. - */ - public KvStateClientProxyHandler( - final KvStateClientProxyImpl proxy, - final int queryExecutorThreads, - final MessageSerializer<KvStateRequest, KvStateResponse> serializer, - final KvStateRequestStats stats) { - - super(proxy, serializer, stats); - this.proxy = Preconditions.checkNotNull(proxy); - this.kvStateClient = createInternalClient(queryExecutorThreads); - } - - private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) { - final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer = - new MessageSerializer<>( - new KvStateInternalRequest.KvStateInternalRequestDeserializer(), - new KvStateResponse.KvStateResponseDeserializer()); - - return new Client<>( - "Queryable State Proxy Client", - threads, - messageSerializer, - new DisabledKvStateRequestStats()); - } - - @Override - public CompletableFuture<KvStateResponse> handleRequest( - final long requestId, - final KvStateRequest request) { - CompletableFuture<KvStateResponse> response = new CompletableFuture<>(); - executeActionAsync(response, request, false); - return response; - } - - private void executeActionAsync( - final CompletableFuture<KvStateResponse> result, - final KvStateRequest request, - final boolean update) { - - if (!result.isDone()) { - final CompletableFuture<KvStateResponse> operationFuture = getState(request, update); - operationFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || - throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || - throwable.getCause() instanceof UnknownKvStateLocation || - throwable.getCause() instanceof ConnectException) { - - // These failures are likely to be caused by out-of-sync - // KvStateLocation. Therefore we retry this query and - // force look up the location. - - executeActionAsync(result, request, true); - } else { - result.completeExceptionally(throwable); - } - } else { - result.complete(t); - } - }, queryExecutor); - - result.whenComplete( - (t, throwable) -> operationFuture.cancel(false)); - } - } - - private CompletableFuture<KvStateResponse> getState( - final KvStateRequest request, - final boolean forceUpdate) { - - return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate) - .thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> { - final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash( - request.getKeyHashCode(), location.getNumKeyGroups()); - - final KvStateServerAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex); - if (serverAddress == null) { - return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName())); - } else { - // Query server - final KvStateID kvStateId = location.getKvStateID(keyGroupIndex); - final KvStateInternalRequest internalRequest = new KvStateInternalRequest( - kvStateId, request.getSerializedKeyAndNamespace()); - return kvStateClient.sendRequest(serverAddress, internalRequest); - } - }, queryExecutor); - } - - /** - * Lookup the {@link KvStateLocation} for the given job and queryable state name. - * - * <p>The job manager will be queried for the location only if forced or no - * cached location can be found. There are no guarantees about - * - * @param jobId JobID the state instance belongs to. - * @param queryableStateName Name under which the state instance has been published. - * @param forceUpdate Flag to indicate whether to force a update via the lookup service. - * @return Future holding the KvStateLocation - */ - private CompletableFuture<KvStateLocation> getKvStateLookupInfo( - final JobID jobId, - final String queryableStateName, - final boolean forceUpdate) { - - final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); - final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); - - if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) { - LOG.debug("Retrieving location for state={} of job={} from the cache.", jobId, queryableStateName); - return cachedFuture; - } - - LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName); - - return proxy.getJobManagerFuture().thenComposeAsync( - jobManagerGateway -> { - final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName); - final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava( - jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS)) - .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))); - - lookupCache.put(cacheKey, locationFuture); - return locationFuture; - }, queryExecutor); - } - - @Override - public void shutdown() { - kvStateClient.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java deleted file mode 100644 index 196641d..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.proxy; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.UnknownJobManagerException; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.AbstractServerBase; -import org.apache.flink.queryablestate.network.AbstractServerHandler; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.query.KvStateClientProxy; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.util.Preconditions; - -import java.net.InetAddress; -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; - -/** - * The default implementation of the {@link KvStateClientProxy}. - */ -@Internal -public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy { - - private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER = - FutureUtils.getFailedFuture(new UnknownJobManagerException()); - - /** Number of threads used to process incoming requests. */ - private final int queryExecutorThreads; - - /** Statistics collector. */ - private final KvStateRequestStats stats; - - private final Object leaderLock = new Object(); - - private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER; - - /** - * Creates the Queryable State Client Proxy. - * - * <p>The server is instantiated using reflection by the - * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats) - * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}. - * - * <p>The server needs to be started via {@link #start()} in order to bind - * to the configured bind address. - * - * @param bindAddress the address to listen to. - * @param bindPortIterator the port range to try to bind to. - * @param numEventLoopThreads number of event loop threads. - * @param numQueryThreads number of query threads. - * @param stats the statistics collector. - */ - public KvStateClientProxyImpl( - final InetAddress bindAddress, - final Iterator<Integer> bindPortIterator, - final Integer numEventLoopThreads, - final Integer numQueryThreads, - final KvStateRequestStats stats) { - - super("Queryable State Proxy Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); - Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); - this.queryExecutorThreads = numQueryThreads; - this.stats = Preconditions.checkNotNull(stats); - } - - @Override - public KvStateServerAddress getServerAddress() { - return super.getServerAddress(); - } - - @Override - public void start() throws Throwable { - super.start(); - } - - @Override - public void shutdown() { - super.shutdown(); - } - - @Override - public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception { - synchronized (leaderLock) { - if (leadingJobManager == null) { - jobManagerFuture = UNKNOWN_JOB_MANAGER; - } else { - jobManagerFuture = leadingJobManager; - } - } - } - - @Override - public CompletableFuture<ActorGateway> getJobManagerFuture() { - synchronized (leaderLock) { - return jobManagerFuture; - } - } - - @Override - public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() { - MessageSerializer<KvStateRequest, KvStateResponse> serializer = - new MessageSerializer<>( - new KvStateRequest.KvStateRequestDeserializer(), - new KvStateResponse.KvStateResponseDeserializer()); - return new KvStateClientProxyHandler(this, queryExecutorThreads, serializer, stats); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java deleted file mode 100644 index b853cfc..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.AggregatingState; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * A read-only {@link AggregatingState} that <b>does not</b> allow for modifications. - * - * <p>This is the type of the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link AggregatingStateDescriptor}. - */ -@PublicEvolving -public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> { - - private final OUT value; - - private ImmutableAggregatingState(OUT value) { - this.value = Preconditions.checkNotNull(value); - } - - @Override - public OUT get() { - return value; - } - - @Override - public void add(Object newValue) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState( - final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor, - final byte[] serializedValue) throws IOException { - - final ACC accumulator = KvStateSerializer.deserializeValue( - serializedValue, - stateDescriptor.getSerializer()); - - final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator); - return new ImmutableAggregatingState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java deleted file mode 100644 index a12adaa..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * A read-only {@link FoldingState} that does not allow for modifications. - * - * <p>This is the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link FoldingStateDescriptor}. - */ -@PublicEvolving -@Deprecated -public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> { - - private final ACC value; - - private ImmutableFoldingState(ACC value) { - this.value = Preconditions.checkNotNull(value); - } - - @Override - public ACC get() { - return value; - } - - @Override - public void add(Object newValue) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState( - final FoldingStateDescriptor<IN, ACC> stateDescriptor, - final byte[] serializedState) throws IOException { - - final ACC state = KvStateSerializer.deserializeValue( - serializedState, - stateDescriptor.getSerializer()); - return new ImmutableFoldingState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java deleted file mode 100644 index 8416905..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.List; - -/** - * A read-only {@link ListState} that does not allow for modifications. - * - * <p>This is the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link ListStateDescriptor}. - */ -@PublicEvolving -public final class ImmutableListState<V> extends ImmutableState implements ListState<V> { - - private final List<V> listState; - - private ImmutableListState(final List<V> state) { - this.listState = Preconditions.checkNotNull(state); - } - - @Override - public Iterable<V> get() { - return listState; - } - - @Override - public void add(V value) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <V> ImmutableListState<V> createState( - final ListStateDescriptor<V> stateDescriptor, - final byte[] serializedState) throws IOException { - - final List<V> state = KvStateSerializer.deserializeList( - serializedState, - stateDescriptor.getElementSerializer()); - return new ImmutableListState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java deleted file mode 100644 index c216d5d..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * A read-only {@link MapState} that does not allow for modifications. - * - * <p>This is the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link MapStateDescriptor}. - */ -@PublicEvolving -public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> { - - private final Map<K, V> state; - - private ImmutableMapState(final Map<K, V> mapState) { - this.state = Preconditions.checkNotNull(mapState); - } - - @Override - public V get(K key) { - return state.get(key); - } - - @Override - public void put(K key, V value) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void putAll(Map<K, V> map) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void remove(K key) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public boolean contains(K key) { - return state.containsKey(key); - } - - /** - * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}. - * - * @return A read-only iterable view of all the key-value pairs in the state. - * - * @throws Exception Thrown if the system cannot access the state. - */ - @Override - public Iterable<Map.Entry<K, V>> entries() { - return Collections.unmodifiableSet(state.entrySet()); - } - - /** - * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}. - * - * @return A read-only iterable view of all the keys in the state. - * - * @throws Exception Thrown if the system cannot access the state. - */ - @Override - public Iterable<K> keys() { - return Collections.unmodifiableSet(state.keySet()); - } - - /** - * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}. - * - * @return A read-only iterable view of all the values in the state. - * - * @throws Exception Thrown if the system cannot access the state. - */ - @Override - public Iterable<V> values() { - return Collections.unmodifiableCollection(state.values()); - } - - /** - * Iterates over all the mappings in the state. The iterator cannot - * remove elements. - * - * @return A read-only iterator over all the mappings in the state - * - * @throws Exception Thrown if the system cannot access the state. - */ - @Override - public Iterator<Map.Entry<K, V>> iterator() { - return Collections.unmodifiableSet(state.entrySet()).iterator(); - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <K, V> ImmutableMapState<K, V> createState( - final MapStateDescriptor<K, V> stateDescriptor, - final byte[] serializedState) throws IOException { - - final Map<K, V> state = KvStateSerializer.deserializeMap( - serializedState, - stateDescriptor.getKeySerializer(), - stateDescriptor.getValueSerializer()); - return new ImmutableMapState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java deleted file mode 100644 index da08c53..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * A read-only {@link ReducingState} that does not allow for modifications. - * - * <p>This is the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link ReducingStateDescriptor}. - */ -@PublicEvolving -public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> { - - private final V value; - - private ImmutableReducingState(V value) { - this.value = Preconditions.checkNotNull(value); - } - - @Override - public V get() { - return value; - } - - @Override - public void add(V newValue) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <V> ImmutableReducingState<V> createState( - final ReducingStateDescriptor<V> stateDescriptor, - final byte[] serializedState) throws IOException { - - final V state = KvStateSerializer.deserializeValue( - serializedState, - stateDescriptor.getSerializer()); - return new ImmutableReducingState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java deleted file mode 100644 index 863f07b..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -/** - * A base class for the <b>read-only</b> types of state returned - * as results from the Queryable State Client. - */ -abstract class ImmutableState { - - protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR = - new UnsupportedOperationException("State is read-only. No modifications allowed."); -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java deleted file mode 100644 index 6ce2787..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.api.common.state.AggregatingState; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.StateBinder; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.util.Preconditions; - -/** - * A {@link StateBinder} used to deserialize the results returned by the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient}. - * - * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State} - * object containing the requested result. - */ -public class ImmutableStateBinder implements StateBinder { - - private final byte[] serializedState; - - public ImmutableStateBinder(final byte[] content) { - serializedState = Preconditions.checkNotNull(content); - } - - @Override - public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception { - return ImmutableValueState.createState(stateDesc, serializedState); - } - - @Override - public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception { - return ImmutableListState.createState(stateDesc, serializedState); - } - - @Override - public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception { - return ImmutableReducingState.createState(stateDesc, serializedState); - } - - @Override - public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception { - return ImmutableAggregatingState.createState(stateDesc, serializedState); - } - - @Override - public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { - return ImmutableFoldingState.createState(stateDesc, serializedState); - } - - @Override - public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception { - return ImmutableMapState.createState(stateDesc, serializedState); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java deleted file mode 100644 index 7fd6457..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * A read-only {@link ValueState} that does not allow for modifications. - * - * <p>This is the result returned when querying Flink's keyed state using the - * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and - * providing an {@link ValueStateDescriptor}. - */ -@PublicEvolving -public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> { - - private final V value; - - private ImmutableValueState(V value) { - this.value = Preconditions.checkNotNull(value); - } - - @Override - public V value() { - return value; - } - - @Override - public void update(V newValue) { - throw MODIFICATION_ATTEMPT_ERROR; - } - - @Override - public void clear() { - throw MODIFICATION_ATTEMPT_ERROR; - } - - public static <V> ImmutableValueState<V> createState( - final ValueStateDescriptor<V> stateDescriptor, - final byte[] serializedState) throws IOException { - - final V state = KvStateSerializer.deserializeValue( - serializedState, - stateDescriptor.getSerializer()); - return new ImmutableValueState<>(state); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java deleted file mode 100644 index eedc2a1..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.messages; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageDeserializer; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; - -import java.nio.ByteBuffer; - -/** - * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy - * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server} - * of the Task Manager responsible for the requested state. - */ -@Internal -public class KvStateInternalRequest extends MessageBody { - - private final KvStateID kvStateId; - private final byte[] serializedKeyAndNamespace; - - public KvStateInternalRequest( - final KvStateID stateId, - final byte[] serializedKeyAndNamespace) { - - this.kvStateId = Preconditions.checkNotNull(stateId); - this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace); - } - - public KvStateID getKvStateId() { - return kvStateId; - } - - public byte[] getSerializedKeyAndNamespace() { - return serializedKeyAndNamespace; - } - - @Override - public byte[] serialize() { - - // KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace - final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length; - - return ByteBuffer.allocate(size) - .putLong(kvStateId.getLowerPart()) - .putLong(kvStateId.getUpperPart()) - .putInt(serializedKeyAndNamespace.length) - .put(serializedKeyAndNamespace) - .array(); - } - - /** - * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}. - */ - public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> { - - @Override - public KvStateInternalRequest deserializeMessage(ByteBuf buf) { - KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong()); - - int length = buf.readInt(); - Preconditions.checkArgument(length >= 0, - "Negative length for key and namespace. " + - "This indicates a serialization error."); - - byte[] serializedKeyAndNamespace = new byte[length]; - if (length > 0) { - buf.readBytes(serializedKeyAndNamespace); - } - return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java deleted file mode 100644 index 7eb39c7..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.messages; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobID; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageDeserializer; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -/** - * The request to be sent by the {@link org.apache.flink.queryablestate.client.QueryableStateClient - * Queryable State Client} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy} - * requesting a given state. - */ -@Internal -public class KvStateRequest extends MessageBody { - - private final JobID jobId; - private final String stateName; - private final int keyHashCode; - private final byte[] serializedKeyAndNamespace; - - public KvStateRequest( - final JobID jobId, - final String stateName, - final int keyHashCode, - final byte[] serializedKeyAndNamespace) { - - this.jobId = Preconditions.checkNotNull(jobId); - this.stateName = Preconditions.checkNotNull(stateName); - this.keyHashCode = keyHashCode; - this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace); - } - - public JobID getJobId() { - return jobId; - } - - public String getStateName() { - return stateName; - } - - public int getKeyHashCode() { - return keyHashCode; - } - - public byte[] getSerializedKeyAndNamespace() { - return serializedKeyAndNamespace; - } - - @Override - public byte[] serialize() { - - byte[] serializedStateName = stateName.getBytes(); - - // JobID + stateName + sizeOf(stateName) + hashCode + keyAndNamespace + sizeOf(keyAndNamespace) - final int size = - JobID.SIZE + - serializedStateName.length + Integer.BYTES + - Integer.BYTES + - serializedKeyAndNamespace.length + Integer.BYTES; - - return ByteBuffer.allocate(size) - .putLong(jobId.getLowerPart()) - .putLong(jobId.getUpperPart()) - .putInt(serializedStateName.length) - .put(serializedStateName) - .putInt(keyHashCode) - .putInt(serializedKeyAndNamespace.length) - .put(serializedKeyAndNamespace) - .array(); - } - - @Override - public String toString() { - return "KvStateRequest{" + - "jobId=" + jobId + - ", stateName='" + stateName + '\'' + - ", keyHashCode=" + keyHashCode + - ", serializedKeyAndNamespace=" + Arrays.toString(serializedKeyAndNamespace) + - '}'; - } - - /** - * A {@link MessageDeserializer deserializer} for {@link KvStateRequest}. - */ - public static class KvStateRequestDeserializer implements MessageDeserializer<KvStateRequest> { - - @Override - public KvStateRequest deserializeMessage(ByteBuf buf) { - JobID jobId = new JobID(buf.readLong(), buf.readLong()); - - int statenameLength = buf.readInt(); - Preconditions.checkArgument(statenameLength >= 0, - "Negative length for state name. " + - "This indicates a serialization error."); - - String stateName = ""; - if (statenameLength > 0) { - byte[] name = new byte[statenameLength]; - buf.readBytes(name); - stateName = new String(name); - } - - int keyHashCode = buf.readInt(); - - int knamespaceLength = buf.readInt(); - Preconditions.checkArgument(knamespaceLength >= 0, - "Negative length for key and namespace. " + - "This indicates a serialization error."); - - byte[] serializedKeyAndNamespace = new byte[knamespaceLength]; - if (knamespaceLength > 0) { - buf.readBytes(serializedKeyAndNamespace); - } - return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java deleted file mode 100644 index 462135f..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateResponse.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.messages; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageDeserializer; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; - -import java.nio.ByteBuffer; - -/** - * The response containing the (serialized) state sent by the {@link org.apache.flink.runtime.query.KvStateServer - * State Server} to the {@link org.apache.flink.runtime.query.KvStateClientProxy Client Proxy}, and then forwarded - * by the proxy to the original {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State - * Client}. - */ -@Internal -public class KvStateResponse extends MessageBody { - - private final byte[] content; - - public KvStateResponse(final byte[] content) { - this.content = Preconditions.checkNotNull(content); - } - - public byte[] getContent() { - return content; - } - - @Override - public byte[] serialize() { - final int size = Integer.BYTES + content.length; - return ByteBuffer.allocate(size) - .putInt(content.length) - .put(content) - .array(); - } - - /** - * A {@link MessageDeserializer deserializer} for {@link KvStateResponseDeserializer}. - */ - public static class KvStateResponseDeserializer implements MessageDeserializer<KvStateResponse> { - - @Override - public KvStateResponse deserializeMessage(ByteBuf buf) { - int length = buf.readInt(); - Preconditions.checkArgument(length >= 0, - "Negative length for state content. " + - "This indicates a serialization error."); - byte[] content = new byte[length]; - buf.readBytes(content); - - return new KvStateResponse(content); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java deleted file mode 100644 index be852fb..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.runtime.io.network.netty.NettyBufferPool; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * The base class for every server in the queryable state module. - * It is using pure netty to send and receive messages of type {@link MessageBody}. - * - * @param <REQ> the type of request the server expects to receive. - * @param <RESP> the type of response the server will send. - */ -@Internal -public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class); - - /** AbstractServerBase config: low water mark. */ - private static final int LOW_WATER_MARK = 8 * 1024; - - /** AbstractServerBase config: high water mark. */ - private static final int HIGH_WATER_MARK = 32 * 1024; - - /** The name of the server, useful for debugging. */ - private final String serverName; - - /** The {@link InetAddress address} to listen to. */ - private final InetAddress bindAddress; - - /** A port range on which to try to connect. */ - private final Set<Integer> bindPortRange; - - /** The number of threads to be allocated to the event loop. */ - private final int numEventLoopThreads; - - /** The number of threads to be used for query serving. */ - private final int numQueryThreads; - - /** Netty's ServerBootstrap. */ - private ServerBootstrap bootstrap; - - /** Query executor thread pool. */ - private ExecutorService queryExecutor; - - /** Address of this server. */ - private KvStateServerAddress serverAddress; - - /** The handler used for the incoming messages. */ - private AbstractServerHandler<REQ, RESP> handler; - - /** - * Creates the {@link AbstractServerBase}. - * - * <p>The server needs to be started via {@link #start()}. - * - * @param serverName the name of the server - * @param bindAddress address to bind to - * @param bindPortIterator port to bind to - * @param numEventLoopThreads number of event loop threads - */ - protected AbstractServerBase( - final String serverName, - final InetAddress bindAddress, - final Iterator<Integer> bindPortIterator, - final Integer numEventLoopThreads, - final Integer numQueryThreads) { - - Preconditions.checkNotNull(bindPortIterator); - Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); - Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); - - this.serverName = Preconditions.checkNotNull(serverName); - this.bindAddress = Preconditions.checkNotNull(bindAddress); - this.numEventLoopThreads = numEventLoopThreads; - this.numQueryThreads = numQueryThreads; - - this.bindPortRange = new HashSet<>(); - while (bindPortIterator.hasNext()) { - int port = bindPortIterator.next(); - Preconditions.checkArgument(port >= 0 && port <= 65535, - "Invalid port configuration. Port must be between 0 and 65535, but was " + port + "."); - bindPortRange.add(port); - } - } - - /** - * Creates a thread pool for the query execution. - * @return Thread pool for query execution - */ - private ExecutorService createQueryExecutor() { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink " + getServerName() + " Thread %d") - .build(); - return Executors.newFixedThreadPool(numQueryThreads, threadFactory); - } - - /** - * Returns the thread-pool responsible for processing incoming requests. - */ - protected ExecutorService getQueryExecutor() { - return queryExecutor; - } - - /** - * Gets the name of the server. This is useful for debugging. - * @return The name of the server. - */ - public String getServerName() { - return serverName; - } - - /** - * Returns the {@link AbstractServerHandler handler} to be used for - * serving the incoming requests. - */ - public abstract AbstractServerHandler<REQ, RESP> initializeHandler(); - - /** - * Returns the address of this server. - * - * @return AbstractServerBase address - * @throws IllegalStateException If server has not been started yet - */ - public KvStateServerAddress getServerAddress() { - Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started."); - return serverAddress; - } - - /** - * Starts the server by binding to the configured bind address (blocking). - * @throws Exception If something goes wrong during the bind operation. - */ - public void start() throws Throwable { - Preconditions.checkState(serverAddress == null, - "Server " + serverName + " already running @ " + serverAddress + '.'); - - Iterator<Integer> portIterator = bindPortRange.iterator(); - while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} - - if (serverAddress != null) { - LOG.info("Started server {} @ {}.", serverName, serverAddress); - } else { - LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName); - throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied."); - } - } - - /** - * Tries to start the server at the provided port. - * - * <p>This, in conjunction with {@link #start()}, try to start the - * server on a free port among the port range provided at the constructor. - * - * @param port the port to try to bind the server to. - * @throws Exception If something goes wrong during the bind operation. - */ - private boolean attemptToBind(final int port) throws Throwable { - LOG.debug("Attempting to start server {} on port {}.", serverName, port); - - this.queryExecutor = createQueryExecutor(); - this.handler = initializeHandler(); - - final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink " + serverName + " EventLoop Thread %d") - .build(); - - final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - this.bootstrap = new ServerBootstrap() - .localAddress(bindAddress, port) - .group(nioGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) - .childHandler(new ServerChannelInitializer<>(handler)); - - try { - final ChannelFuture future = bootstrap.bind().sync(); - if (future.isSuccess()) { - final InetSocketAddress localAddress = (InetSocketAddress) future.channel().localAddress(); - serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); - return true; - } - - // the following throw is to bypass Netty's "optimization magic" - // and catch the bind exception. - // the exception is thrown by the sync() call above. - - throw future.cause(); - } catch (BindException e) { - LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage()); - shutdown(); - } - // any other type of exception we let it bubble up. - return false; - } - - /** - * Shuts down the server and all related thread pools. - */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } - - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); - } - } - serverAddress = null; - } - - /** - * Channel pipeline initializer. - * - * <p>The request handler is shared, whereas the other handlers are created - * per channel. - */ - private static final class ServerChannelInitializer<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInitializer<SocketChannel> { - - /** The shared request handler. */ - private final AbstractServerHandler<REQ, RESP> sharedRequestHandler; - - /** - * Creates the channel pipeline initializer with the shared request handler. - * - * @param sharedRequestHandler Shared request handler. - */ - ServerChannelInitializer(AbstractServerHandler<REQ, RESP> sharedRequestHandler) { - this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "MessageBody handler"); - } - - @Override - protected void initChannel(SocketChannel channel) throws Exception { - channel.pipeline() - .addLast(new ChunkedWriteHandler()) - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(sharedRequestHandler); - } - } - - @VisibleForTesting - public boolean isExecutorShutdown() { - return queryExecutor.isShutdown(); - } -}
