[FLINK-7769][QS] Move queryable state outside the runtime. Creates a separate module for the queryable state and moves the client code outside the runtime. The Task Manager now instantiates the KvStateServer using reflection.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29a6e995 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29a6e995 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29a6e995 Branch: refs/heads/master Commit: 29a6e9952ebb2c7349d25d3696e2ec1d7e8e620a Parents: bc4638a Author: kkloudas <[email protected]> Authored: Wed Oct 4 19:11:09 2017 +0200 Committer: kkloudas <[email protected]> Committed: Wed Oct 11 15:33:30 2017 +0200 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 6 +- .../streaming/state/RocksDBMapState.java | 8 +- .../flink-queryable-state-java/pom.xml | 143 +++ .../flink/queryablestate/UnknownJobManager.java | 35 + .../queryablestate/UnknownKeyOrNamespace.java | 31 + .../flink/queryablestate/UnknownKvStateID.java | 35 + .../UnknownKvStateKeyGroupLocation.java | 31 + .../AkkaKvStateLocationLookupService.java | 325 +++++ .../queryablestate/client/KvStateClient.java | 583 +++++++++ .../client/KvStateClientHandler.java | 107 ++ .../client/KvStateClientHandlerCallback.java | 54 + .../client/KvStateLocationLookupService.java | 51 + .../client/QueryableStateClient.java | 590 +++++++++ .../queryablestate/messages/KvStateRequest.java | 89 ++ .../messages/KvStateRequestFailure.java | 68 ++ .../messages/KvStateRequestResult.java | 74 ++ .../network/messages/MessageSerializer.java | 332 ++++++ .../network/messages/MessageType.java | 39 + .../queryablestate/server/ChunkedByteBuf.java | 98 ++ .../server/KvStateServerHandler.java | 308 +++++ .../server/KvStateServerImpl.java | 230 ++++ .../itcases/AbstractQueryableStateITCase.java | 1128 ++++++++++++++++++ .../itcases/HAAbstractQueryableStateITCase.java | 101 ++ .../HAQueryableStateITCaseFsBackend.java | 39 + .../HAQueryableStateITCaseRocksDBBackend.java | 39 + .../KVStateRequestSerializerRocksDBTest.java | 167 +++ .../NonHAAbstractQueryableStateITCase.java | 81 ++ 
.../NonHAQueryableStateITCaseFsBackend.java | 39 + ...NonHAQueryableStateITCaseRocksDBBackend.java | 39 + .../AkkaKvStateLocationLookupServiceTest.java | 399 +++++++ .../network/KvStateClientHandlerTest.java | 117 ++ .../network/KvStateClientTest.java | 752 ++++++++++++ .../network/KvStateRequestSerializerTest.java | 214 ++++ .../network/KvStateServerHandlerTest.java | 728 +++++++++++ .../network/KvStateServerTest.java | 201 ++++ .../network/QueryableStateClientTest.java | 458 +++++++ .../src/test/resources/log4j-test.properties | 31 + flink-queryable-state/pom.xml | 54 + .../runtime/io/network/NetworkEnvironment.java | 2 +- .../query/AkkaKvStateLocationLookupService.java | 322 ----- .../flink/runtime/query/KvStateLocation.java | 10 +- .../query/KvStateLocationLookupService.java | 50 - .../flink/runtime/query/KvStateRegistry.java | 1 - .../flink/runtime/query/KvStateServer.java | 43 + .../runtime/query/KvStateServerAddress.java | 6 +- .../runtime/query/QueryableStateClient.java | 587 --------- .../runtime/query/QueryableStateUtils.java | 89 ++ .../flink/runtime/query/UnknownJobManager.java | 33 - .../query/UnknownKvStateKeyGroupLocation.java | 29 - .../runtime/query/netty/ChunkedByteBuf.java | 98 -- .../runtime/query/netty/KvStateClient.java | 579 --------- .../query/netty/KvStateClientHandler.java | 106 -- .../netty/KvStateClientHandlerCallback.java | 54 - .../query/netty/KvStateRequestStats.java | 2 + .../runtime/query/netty/KvStateServer.java | 239 ---- .../query/netty/KvStateServerHandler.java | 305 ----- .../query/netty/UnknownKeyOrNamespace.java | 31 - .../runtime/query/netty/UnknownKvStateID.java | 35 - .../query/netty/message/KvStateRequest.java | 89 -- .../netty/message/KvStateRequestFailure.java | 68 -- .../netty/message/KvStateRequestResult.java | 74 -- .../netty/message/KvStateRequestSerializer.java | 568 --------- .../query/netty/message/KvStateRequestType.java | 40 - .../query/netty/message/KvStateSerializer.java | 267 +++++ 
.../flink/runtime/query/netty/package-info.java | 80 -- .../flink/runtime/query/package-info.java | 60 - .../runtime/state/heap/AbstractHeapState.java | 6 +- .../flink/runtime/state/heap/HeapMapState.java | 4 +- .../QueryableStateConfiguration.java | 6 +- .../taskexecutor/TaskManagerServices.java | 34 +- .../TaskManagerServicesConfiguration.java | 9 +- .../AkkaKvStateLocationLookupServiceTest.java | 398 ------ .../runtime/query/QueryableStateClientTest.java | 449 ------- .../query/netty/KvStateClientHandlerTest.java | 115 -- .../runtime/query/netty/KvStateClientTest.java | 747 ------------ .../query/netty/KvStateServerHandlerTest.java | 721 ----------- .../runtime/query/netty/KvStateServerTest.java | 196 --- .../message/KvStateRequestSerializerTest.java | 219 +--- .../runtime/state/StateBackendTestBase.java | 14 +- .../query/AbstractQueryableStateITCase.java | 1128 ------------------ .../query/HAAbstractQueryableStateITCase.java | 102 -- .../query/HAQueryableStateITCaseFsBackend.java | 39 - .../HAQueryableStateITCaseRocksDBBackend.java | 39 - .../KVStateRequestSerializerRocksDBTest.java | 168 --- .../NonHAAbstractQueryableStateITCase.java | 81 -- .../NonHAQueryableStateITCaseFsBackend.java | 39 - ...NonHAQueryableStateITCaseRocksDBBackend.java | 39 - pom.xml | 1 + 88 files changed, 8299 insertions(+), 7946 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index c061835..cf365b4 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; @@ -125,8 +125,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - //TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation - Tuple2<K, N> des = KvStateRequestSerializer.<K, N>deserializeKeyAndNamespace( + //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation + Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace( serializedKeyAndNamespace, backend.getKeySerializer(), namespaceSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 75c1651..421bb2e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; @@ -223,8 +223,8 @@ public class RocksDBMapState<K, N, UK, UV> public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - //TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation - Tuple2<K, N> des = KvStateRequestSerializer.deserializeKeyAndNamespace( + //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation + Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace( serializedKeyAndNamespace, backend.getKeySerializer(), namespaceSerializer); @@ -248,7 +248,7 @@ public class RocksDBMapState<K, N, UK, UV> return null; } - return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() { + return KvStateSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() { @Override public Iterator<Map.Entry<UK, UV>> iterator() { return iterator; 
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml b/flink-queryable-state/flink-queryable-state-java/pom.xml new file mode 100644 index 0000000..63403df --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/pom.xml @@ -0,0 +1,143 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-queryable-state</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId> + <name>flink-queryable-state-java</name> + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- =================================================== + Testing + 
=================================================== --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.data-artisans</groupId> + <artifactId>flakka-testkit_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java new file mode 100644 index 0000000..93f2ba5 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java @@ -0,0 +1,35 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate; + +import org.apache.flink.queryablestate.client.KvStateLocationLookupService; + +/** + * Exception to fail Future with if no JobManager is currently registered at + * the {@link KvStateLocationLookupService}. + */ +public class UnknownJobManager extends Exception { + + private static final long serialVersionUID = 1L; + + public UnknownJobManager() { + super("Unknown JobManager. 
Either the JobManager has not registered yet " + + "or has lost leadership."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java new file mode 100644 index 0000000..e921e40 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate; + +/** + * Thrown if the KvState does not hold any state for the given key or namespace. 
+ */ +public class UnknownKeyOrNamespace extends IllegalStateException { + + private static final long serialVersionUID = 1L; + + public UnknownKeyOrNamespace() { + super("KvState does not hold any state for key/namespace."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java new file mode 100644 index 0000000..d5ff828 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate; + +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.util.Preconditions; + +/** + * Thrown if no KvState with the given ID cannot found by the server handler. 
+ */ +public class UnknownKvStateID extends IllegalStateException { + + private static final long serialVersionUID = 1L; + + public UnknownKvStateID(KvStateID kvStateId) { + super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") + + " at TaskManager."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java new file mode 100644 index 0000000..fd25fae --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate; + +import org.apache.flink.runtime.query.KvStateLocation; + +/** + * Exception thrown if there is no location information available for the given + * key group in a {@link KvStateLocation} instance. + */ +public class UnknownKvStateKeyGroupLocation extends Exception { + + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java new file mode 100644 index 0000000..f42e008 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.UnknownJobManager; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateMessage; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.dispatch.Recover; +import akka.pattern.Patterns; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.Callable; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + +/** + * Akka-based {@link KvStateLocationLookupService} that retrieves the current + * JobManager address and uses it for lookups. + */ +public class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class); + + /** Future returned when no JobManager is available. */ + private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager()); + + /** Leader retrieval service to retrieve the current job manager. */ + private final LeaderRetrievalService leaderRetrievalService; + + /** The actor system used to resolve the JobManager address. */ + private final ActorSystem actorSystem; + + /** Timeout for JobManager ask-requests. */ + private final FiniteDuration askTimeout; + + /** Retry strategy factory on future failures. 
*/ + private final LookupRetryStrategyFactory retryStrategyFactory; + + /** Current job manager future. */ + private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER; + + /** + * Creates the Akka-based {@link KvStateLocationLookupService}. + * + * @param leaderRetrievalService Leader retrieval service to use. + * @param actorSystem Actor system to use. + * @param askTimeout Timeout for JobManager ask-requests. + * @param retryStrategyFactory Retry strategy if no JobManager available. + */ + public AkkaKvStateLocationLookupService( + LeaderRetrievalService leaderRetrievalService, + ActorSystem actorSystem, + FiniteDuration askTimeout, + LookupRetryStrategyFactory retryStrategyFactory) { + + this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service"); + this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system"); + this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout"); + this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory"); + } + + public void start() { + try { + leaderRetrievalService.start(this); + } catch (Exception e) { + LOG.error("Failed to start leader retrieval service", e); + throw new RuntimeException(e); + } + } + + public void shutDown() { + try { + leaderRetrievalService.stop(); + } catch (Exception e) { + LOG.error("Failed to stop leader retrieval service", e); + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("unchecked") + public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) { + return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy()); + } + + /** + * Returns a future holding the {@link KvStateLocation} for the given job + * and KvState registration name. + * + * <p>If there is currently no JobManager registered with the service, the + * request is retried. 
The retry behaviour is specified by the + * {@link LookupRetryStrategy} of the lookup service. + * + * @param jobId JobID the KvState instance belongs to + * @param registrationName Name under which the KvState has been registered + * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures. + * @return Future holding the {@link KvStateLocation} + */ + @SuppressWarnings("unchecked") + private Future<KvStateLocation> getKvStateLookupInfo( + final JobID jobId, + final String registrationName, + final LookupRetryStrategy lookupRetryStrategy) { + + return jobManagerFuture + .flatMap(new Mapper<ActorGateway, Future<Object>>() { + @Override + public Future<Object> apply(ActorGateway jobManager) { + // Lookup the KvStateLocation + Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName); + return jobManager.ask(msg, askTimeout); + } + }, actorSystem.dispatcher()) + .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)) + .recoverWith(new Recover<Future<KvStateLocation>>() { + @Override + public Future<KvStateLocation> recover(Throwable failure) throws Throwable { + // If the Future fails with UnknownJobManager, retry + // the request. Otherwise all Futures will be failed + // during the start up phase, when the JobManager did + // not notify this service yet or leadership is lost + // intermittently. 
+ if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) { + return Patterns.after( + lookupRetryStrategy.getRetryDelay(), + actorSystem.scheduler(), + actorSystem.dispatcher(), + new Callable<Future<KvStateLocation>>() { + @Override + public Future<KvStateLocation> call() throws Exception { + return getKvStateLookupInfo( + jobId, + registrationName, + lookupRetryStrategy); + } + }); + } else { + return Futures.failed(failure); + } + } + }, actorSystem.dispatcher()); + } + + @Override + public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID); + } + + if (leaderAddress == null) { + jobManagerFuture = UNKNOWN_JOB_MANAGER; + } else { + jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout) + .map(new Mapper<ActorRef, ActorGateway>() { + @Override + public ActorGateway apply(ActorRef actorRef) { + return new AkkaActorGateway(actorRef, leaderSessionID); + } + }, actorSystem.dispatcher()); + } + } + + @Override + public void handleError(Exception exception) { + jobManagerFuture = Futures.failed(exception); + } + + // ------------------------------------------------------------------------ + + /** + * Retry strategy for failed lookups. + * + * <p>Usage: + * <pre> + * LookupRetryStrategy retryStrategy = LookupRetryStrategyFactory.create(); + * + * if (retryStrategy.tryRetry()) { + * // OK to retry + * FiniteDuration retryDelay = retryStrategy.getRetryDelay(); + * } + * </pre> + */ + public interface LookupRetryStrategy { + + /** + * Returns the current retry. + * + * @return Current retry delay. + */ + FiniteDuration getRetryDelay(); + + /** + * Tries another retry and returns whether it is allowed or not. + * + * @return Whether it is allowed to do another restart or not. + */ + boolean tryRetry(); + + } + + /** + * Factory for retry strategies. 
+ */ + public interface LookupRetryStrategyFactory { + + /** + * Creates a new retry strategy. + * + * @return The retry strategy. + */ + LookupRetryStrategy createRetryStrategy(); + + } + + /** + * Factory for disabled retries. + */ + public static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory { + + private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy(); + + @Override + public LookupRetryStrategy createRetryStrategy() { + return RETRY_STRATEGY; + } + + private static class DisabledLookupRetryStrategy implements LookupRetryStrategy { + + @Override + public FiniteDuration getRetryDelay() { + return FiniteDuration.Zero(); + } + + @Override + public boolean tryRetry() { + return false; + } + } + + } + + /** + * Factory for fixed delay retries. + */ + public static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory { + + private final int maxRetries; + private final FiniteDuration retryDelay; + + FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) { + this.maxRetries = maxRetries; + this.retryDelay = retryDelay; + } + + @Override + public LookupRetryStrategy createRetryStrategy() { + return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay); + } + + private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy { + + private final Object retryLock = new Object(); + private final int maxRetries; + private final FiniteDuration retryDelay; + private int numRetries; + + public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) { + Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries"); + this.maxRetries = maxRetries; + this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay"); + } + + @Override + public FiniteDuration getRetryDelay() { + synchronized (retryLock) { + return retryDelay; + } + } + + @Override + public boolean tryRetry() { + synchronized 
(retryLock) { + if (numRetries < maxRetries) { + numRetries++; + return true; + } else { + return false; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java new file mode 100644 index 0000000..d456cd7 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.queryablestate.UnknownKeyOrNamespace; +import org.apache.flink.queryablestate.UnknownKvStateID; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.runtime.io.network.netty.NettyBufferPool; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +import akka.dispatch.Futures; + +import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import scala.concurrent.Future; +import scala.concurrent.Promise; + +/** + * Netty-based client querying {@link KvStateServer} instances. + * + * <p>This client can be used by multiple threads concurrently. Operations are + * executed asynchronously and return Futures to their result. + * + * <p>The incoming pipeline looks as follows: + * <pre> + * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler + * </pre> + * + * <p>Received binary messages are expected to contain a frame length field. Netty's + * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before + * giving it to our {@link KvStateClientHandler}. + * + * <p>Connections are established and closed by the client. The server only + * closes the connection on a fatal failure that cannot be recovered. + */ +public class KvStateClient { + + /** Netty's Bootstrap. */ + private final Bootstrap bootstrap; + + /** Statistics tracker. */ + private final KvStateRequestStats stats; + + /** Established connections. */ + private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections = + new ConcurrentHashMap<>(); + + /** Pending connections. */ + private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections = + new ConcurrentHashMap<>(); + + /** Atomic shut down flag. */ + private final AtomicBoolean shutDown = new AtomicBoolean(); + + /** + * Creates a client with the specified number of event loop threads. + * + * @param numEventLoopThreads Number of event loop threads (minimum 1). 
+ */ + public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) { + Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); + NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Flink KvStateClient Event Loop Thread %d") + .build(); + + NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); + + this.bootstrap = new Bootstrap() + .group(nioGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.ALLOCATOR, bufferPool) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) + // ChunkedWriteHandler respects Channel writability + .addLast(new ChunkedWriteHandler()); + } + }); + + this.stats = Preconditions.checkNotNull(stats, "Statistics tracker"); + } + + /** + * Returns a future holding the serialized request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. 
+ * + * @param serverAddress Address of the server to query + * @param kvStateId ID of the KvState instance to query + * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with + * @return Future holding the serialized result + */ + public Future<byte[]> getKvState( + KvStateServerAddress serverAddress, + KvStateID kvStateId, + byte[] serializedKeyAndNamespace) { + + if (shutDown.get()) { + return Futures.failed(new IllegalStateException("Shut down")); + } + + EstablishedConnection connection = establishedConnections.get(serverAddress); + + if (connection != null) { + return connection.getKvState(kvStateId, serializedKeyAndNamespace); + } else { + PendingConnection pendingConnection = pendingConnections.get(serverAddress); + if (pendingConnection != null) { + // There was a race, use the existing pending connection. + return pendingConnection.getKvState(kvStateId, serializedKeyAndNamespace); + } else { + // We try to connect to the server. + PendingConnection pending = new PendingConnection(serverAddress); + PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending); + + if (previous == null) { + // OK, we are responsible to connect. + bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()) + .addListener(pending); + + return pending.getKvState(kvStateId, serializedKeyAndNamespace); + } else { + // There was a race, use the existing pending connection. + return previous.getKvState(kvStateId, serializedKeyAndNamespace); + } + } + } + } + + /** + * Shuts down the client and closes all connections. + * + * <p>After a call to this method, all returned futures will be failed. 
+ */ + public void shutDown() { + if (shutDown.compareAndSet(false, true)) { + for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) { + if (establishedConnections.remove(conn.getKey(), conn.getValue())) { + conn.getValue().close(); + } + } + + for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) { + if (pendingConnections.remove(conn.getKey()) != null) { + conn.getValue().close(); + } + } + + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0, 10, TimeUnit.SECONDS); + } + } + } + } + + /** + * Closes the connection to the given server address if it exists. + * + * <p>If there is a request to the server a new connection will be established. + * + * @param serverAddress Target address of the connection to close + */ + public void closeConnection(KvStateServerAddress serverAddress) { + PendingConnection pending = pendingConnections.get(serverAddress); + if (pending != null) { + pending.close(); + } + + EstablishedConnection established = establishedConnections.remove(serverAddress); + if (established != null) { + established.close(); + } + } + + /** + * A pending connection that is in the process of connecting. + */ + private class PendingConnection implements ChannelFutureListener { + + /** Lock to guard the connect call, channel hand in, etc. */ + private final Object connectLock = new Object(); + + /** Address of the server we are connecting to. */ + private final KvStateServerAddress serverAddress; + + /** Queue of requests while connecting. */ + private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>(); + + /** The established connection after the connect succeeds. */ + private EstablishedConnection established; + + /** Closed flag. */ + private boolean closed; + + /** Failure cause if something goes wrong. 
*/ + private Throwable failureCause; + + /** + * Creates a pending connection to the given server. + * + * @param serverAddress Address of the server to connect to. + */ + private PendingConnection(KvStateServerAddress serverAddress) { + this.serverAddress = serverAddress; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Callback from the Bootstrap's connect call. + if (future.isSuccess()) { + handInChannel(future.channel()); + } else { + close(future.cause()); + } + } + + /** + * Returns a future holding the serialized request result. + * + * <p>If the channel has been established, forward the call to the + * established channel, otherwise queue it for when the channel is + * handed in. + * + * @param kvStateId ID of the KvState instance to query + * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance + * with + * @return Future holding the serialized result + */ + public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { + synchronized (connectLock) { + if (failureCause != null) { + return Futures.failed(failureCause); + } else if (closed) { + return Futures.failed(new ClosedChannelException()); + } else { + if (established != null) { + return established.getKvState(kvStateId, serializedKeyAndNamespace); + } else { + // Queue this and handle when connected + PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace); + queuedRequests.add(pending); + return pending.promise.future(); + } + } + } + } + + /** + * Hands in a channel after a successful connection. + * + * @param channel Channel to hand in + */ + private void handInChannel(Channel channel) { + synchronized (connectLock) { + if (closed || failureCause != null) { + // Close the channel and we are done. Any queued requests + // are removed on the close/failure call and after that no + // new ones can be enqueued. 
+ channel.close(); + } else { + established = new EstablishedConnection(serverAddress, channel); + + PendingRequest pending; + while ((pending = queuedRequests.poll()) != null) { + Future<byte[]> resultFuture = established.getKvState( + pending.kvStateId, + pending.serializedKeyAndNamespace); + + pending.promise.completeWith(resultFuture); + } + + // Publish the channel for the general public + establishedConnections.put(serverAddress, established); + pendingConnections.remove(serverAddress); + + // Check shut down for possible race with shut down. We + // don't want any lingering connections after shut down, + // which can happen if we don't check this here. + if (shutDown.get()) { + if (establishedConnections.remove(serverAddress, established)) { + established.close(); + } + } + } + } + } + + /** + * Close the connecting channel with a ClosedChannelException. + */ + private void close() { + close(new ClosedChannelException()); + } + + /** + * Close the connecting channel with an Exception (can be + * <code>null</code>) or forward to the established channel. + */ + private void close(Throwable cause) { + synchronized (connectLock) { + if (!closed) { + if (failureCause == null) { + failureCause = cause; + } + + if (established != null) { + established.close(); + } else { + PendingRequest pending; + while ((pending = queuedRequests.poll()) != null) { + pending.promise.tryFailure(cause); + } + } + + closed = true; + } + } + } + + /** + * A pending request queued while the channel is connecting. 
+ */ + private final class PendingRequest { + + private final KvStateID kvStateId; + private final byte[] serializedKeyAndNamespace; + private final Promise<byte[]> promise; + + private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { + this.kvStateId = kvStateId; + this.serializedKeyAndNamespace = serializedKeyAndNamespace; + this.promise = Futures.promise(); + } + } + + @Override + public String toString() { + synchronized (connectLock) { + return "PendingConnection{" + + "serverAddress=" + serverAddress + + ", queuedRequests=" + queuedRequests.size() + + ", established=" + (established != null) + + ", closed=" + closed + + '}'; + } + } + } + + /** + * An established connection that wraps the actual channel instance and is + * registered at the {@link KvStateClientHandler} for callbacks. + */ + private class EstablishedConnection implements KvStateClientHandlerCallback { + + /** Address of the server we are connected to. */ + private final KvStateServerAddress serverAddress; + + /** The actual TCP channel. */ + private final Channel channel; + + /** Pending requests keyed by request ID. */ + private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>(); + + /** Current request number used to assign unique request IDs. */ + private final AtomicLong requestCount = new AtomicLong(); + + /** Reference to a failure that was reported by the channel. */ + private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); + + /** + * Creates an established connection with the given channel. 
+ * + * @param serverAddress Address of the server connected to + * @param channel The actual TCP channel + */ + EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) { + this.serverAddress = Preconditions.checkNotNull(serverAddress, "KvStateServerAddress"); + this.channel = Preconditions.checkNotNull(channel, "Channel"); + + // Add the client handler with the callback + channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this)); + + stats.reportActiveConnection(); + } + + /** + * Close the channel with a ClosedChannelException. + */ + void close() { + close(new ClosedChannelException()); + } + + /** + * Close the channel with a cause. + * + * @param cause The cause to close the channel with. + * @return Channel close future + */ + private boolean close(Throwable cause) { + if (failureCause.compareAndSet(null, cause)) { + channel.close(); + stats.reportInactiveConnection(); + + for (long requestId : pendingRequests.keySet()) { + PromiseAndTimestamp pending = pendingRequests.remove(requestId); + if (pending != null && pending.promise.tryFailure(cause)) { + stats.reportFailedRequest(); + } + } + + return true; + } + + return false; + } + + /** + * Returns a future holding the serialized request result. 
+ * + * @param kvStateId ID of the KvState instance to query + * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance + * with + * @return Future holding the serialized result + */ + Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { + PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp( + Futures.<byte[]>promise(), + System.nanoTime()); + + try { + final long requestId = requestCount.getAndIncrement(); + pendingRequests.put(requestId, requestPromiseTs); + + stats.reportRequest(); + + ByteBuf buf = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + kvStateId, + serializedKeyAndNamespace); + + channel.writeAndFlush(buf).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // Fail promise if not failed to write + PromiseAndTimestamp pending = pendingRequests.remove(requestId); + if (pending != null && pending.promise.tryFailure(future.cause())) { + stats.reportFailedRequest(); + } + } + } + }); + + // Check failure for possible race. We don't want any lingering + // promises after a failure, which can happen if we don't check + // this here. Note that close is treated as a failure as well. + Throwable failure = failureCause.get(); + if (failure != null) { + // Remove from pending requests to guard against concurrent + // removal and to make sure that we only count it once as failed. 
+ PromiseAndTimestamp p = pendingRequests.remove(requestId); + if (p != null && p.promise.tryFailure(failure)) { + stats.reportFailedRequest(); + } + } + } catch (Throwable t) { + requestPromiseTs.promise.tryFailure(t); + } + + return requestPromiseTs.promise.future(); + } + + @Override + public void onRequestResult(long requestId, byte[] serializedValue) { + PromiseAndTimestamp pending = pendingRequests.remove(requestId); + if (pending != null && pending.promise.trySuccess(serializedValue)) { + long durationMillis = (System.nanoTime() - pending.timestamp) / 1_000_000; + stats.reportSuccessfulRequest(durationMillis); + } + } + + @Override + public void onRequestFailure(long requestId, Throwable cause) { + PromiseAndTimestamp pending = pendingRequests.remove(requestId); + if (pending != null && pending.promise.tryFailure(cause)) { + stats.reportFailedRequest(); + } + } + + @Override + public void onFailure(Throwable cause) { + if (close(cause)) { + // Remove from established channels, otherwise future + // requests will be handled by this failed channel. + establishedConnections.remove(serverAddress, this); + } + } + + @Override + public String toString() { + return "EstablishedConnection{" + + "serverAddress=" + serverAddress + + ", channel=" + channel + + ", pendingRequests=" + pendingRequests.size() + + ", requestCount=" + requestCount + + ", failureCause=" + failureCause + + '}'; + } + + /** + * Pair of promise and a timestamp. 
+ */ + private class PromiseAndTimestamp { + + private final Promise<byte[]> promise; + private final long timestamp; + + public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) { + this.promise = promise; + this.timestamp = timestamp; + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java new file mode 100644 index 0000000..36a2b31 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.queryablestate.messages.KvStateRequestFailure; +import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.server.KvStateServerHandler; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.channels.ClosedChannelException; + +/** + * This handler expects responses from {@link KvStateServerHandler}. + * + * <p>It deserializes the response and calls the registered callback, which is + * responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}). + */ +public class KvStateClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class); + + private final KvStateClientHandlerCallback callback; + + /** + * Creates a {@link KvStateClientHandler} with the callback. + * + * @param callback Callback for responses. 
+ */ + public KvStateClientHandler(KvStateClientHandlerCallback callback) { + this.callback = callback; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + ByteBuf buf = (ByteBuf) msg; + MessageType msgType = MessageSerializer.deserializeHeader(buf); + + if (msgType == MessageType.REQUEST_RESULT) { + KvStateRequestResult result = MessageSerializer.deserializeKvStateRequestResult(buf); + callback.onRequestResult(result.getRequestId(), result.getSerializedResult()); + } else if (msgType == MessageType.REQUEST_FAILURE) { + KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(buf); + callback.onRequestFailure(failure.getRequestId(), failure.getCause()); + } else if (msgType == MessageType.SERVER_FAILURE) { + throw MessageSerializer.deserializeServerFailure(buf); + } else { + throw new IllegalStateException("Unexpected response type '" + msgType + "'"); + } + } catch (Throwable t1) { + try { + callback.onFailure(t1); + } catch (Throwable t2) { + LOG.error("Failed to notify callback about failure", t2); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + try { + callback.onFailure(cause); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Only the client is expected to close the channel. Otherwise it + // indicates a failure. Note that this will be invoked in both cases + // though. If the callback closed the channel, the callback must be + // ignored. 
+ try { + callback.onFailure(new ClosedChannelException()); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java new file mode 100644 index 0000000..98718fa --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.queryablestate.messages.KvStateRequest; + +/** + * Callback for {@link KvStateClientHandler}. 
+ */ +public interface KvStateClientHandlerCallback { + + /** + * Called on a successful {@link KvStateRequest}. + * + * @param requestId ID of the request + * @param serializedValue Serialized value for the request + */ + void onRequestResult(long requestId, byte[] serializedValue); + + /** + * Called on a failed {@link KvStateRequest}. + * + * @param requestId ID of the request + * @param cause Cause of the request failure + */ + void onRequestFailure(long requestId, Throwable cause); + + /** + * Called on any failure, which is not related to a specific request. + * + * <p>This can be for example a caught Exception in the channel pipeline + * or an unexpected channel close. + * + * @param cause Cause of the failure + */ + void onFailure(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java new file mode 100644 index 0000000..635cbae --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.query.KvStateLocation; + +import scala.concurrent.Future; + +/** + * {@link KvStateLocation} lookup service. + */ +public interface KvStateLocationLookupService { + + /** + * Starts the lookup service. + */ + void start(); + + /** + * Shuts down the lookup service. + */ + void shutDown(); + + /** + * Returns a future holding the {@link KvStateLocation} for the given job + * and KvState registration name. + * + * @param jobId JobID the KvState instance belongs to + * @param registrationName Name under which the KvState has been registered + * @return Future holding the {@link KvStateLocation} + */ + Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java new file mode 100644 index 0000000..27257d7 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -0,0 +1,590 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.queryablestate.UnknownKeyOrNamespace; +import org.apache.flink.queryablestate.UnknownKvStateID; +import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; +import org.apache.flink.util.Preconditions; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.dispatch.Recover; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Client for queryable state. + * + * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}. + * The state instance created from this descriptor will be published for queries + * when it's created on the TaskManagers and the location will be reported to + * the JobManager. + * + * <p>The client resolves the location of the requested KvState via the + * JobManager. Resolved locations are cached. When the server address of the + * requested KvState instance is determined, the client sends out a request to + * the server. 
+ */ +public class QueryableStateClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class); + + /** + * {@link KvStateLocation} lookup to resolve the address of KvState instances. + */ + private final KvStateLocationLookupService lookupService; + + /** + * Network client for queries against {@link KvStateServer} instances. + */ + private final KvStateClient kvStateClient; + + /** + * Execution context. + */ + private final ExecutionContext executionContext; + + /** + * Cache for {@link KvStateLocation} instances keyed by job and name. + */ + private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache = + new ConcurrentHashMap<>(); + + /** This is != null, if we started the actor system. */ + private final ActorSystem actorSystem; + + private ExecutionConfig executionConfig; + + /** + * Creates a client from the given configuration. + * + * <p>This will create multiple Thread pools: one for the started actor + * system and another for the network client. + * + * @param config Configuration to use. + * @throws Exception Failures are forwarded + */ + public QueryableStateClient(Configuration config) throws Exception { + this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); + } + + /** + * Creates a client from the given configuration. + * + * <p>This will create multiple Thread pools: one for the started actor + * system and another for the network client. + * + * @param config Configuration to use. + * @param highAvailabilityServices Service factory for high availability services + * @throws Exception Failures are forwarded + * + * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the + * {@link #QueryableStateClient(Configuration)} instead. 
+ */ + @Deprecated + public QueryableStateClient( + Configuration config, + HighAvailabilityServices highAvailabilityServices) throws Exception { + Preconditions.checkNotNull(config, "Configuration"); + + // Create a leader retrieval service + LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); + + // Get the ask timeout + String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT); + + Duration timeout = FiniteDuration.apply(askTimeoutString); + if (!timeout.isFinite()) { + throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key() + + " is not a finite timeout ('" + askTimeoutString + "')"); + } + + FiniteDuration askTimeout = (FiniteDuration) timeout; + + int lookupRetries = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES); + int lookupRetryDelayMillis = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY); + + // Retries if no JobManager is around + AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy = + new AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory( + lookupRetries, + FiniteDuration.apply(lookupRetryDelayMillis, "ms")); + + // Create the actor system + @SuppressWarnings("unchecked") + Option<Tuple2<String, Object>> remoting = new Some(new Tuple2<>("", 0)); + this.actorSystem = AkkaUtils.createActorSystem(config, remoting); + + AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( + leaderRetrievalService, + actorSystem, + askTimeout, + retryStrategy); + + int numEventLoopThreads = config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS); + + if (numEventLoopThreads == 0) { + numEventLoopThreads = Runtime.getRuntime().availableProcessors(); + } + + // Create the network client + KvStateClient networkClient = new KvStateClient( + numEventLoopThreads, + new DisabledKvStateRequestStats()); + + this.lookupService = lookupService; + 
this.kvStateClient = networkClient; + this.executionContext = actorSystem.dispatcher(); + this.executionConfig = new ExecutionConfig(); + + this.lookupService.start(); + } + + /** Gets the {@link ExecutionConfig}. */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + /** Sets the {@link ExecutionConfig}. */ + public void setExecutionConfig(ExecutionConfig config) { + this.executionConfig = config; + } + + /** + * Creates a client. + * + * @param lookupService Location lookup service + * @param kvStateClient Network client for queries + * @param executionContext Execution context for futures + */ + public QueryableStateClient( + KvStateLocationLookupService lookupService, + KvStateClient kvStateClient, + ExecutionContext executionContext) { + + this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService"); + this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient"); + this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext"); + this.actorSystem = null; + + this.lookupService.start(); + } + + /** + * Returns the execution context of this client. + * + * @return The execution context used by the client. + */ + public ExecutionContext getExecutionContext() { + return executionContext; + } + + /** + * Shuts down the client and all components. + */ + public void shutDown() { + try { + lookupService.shutDown(); + } catch (Throwable t) { + LOG.error("Failed to shut down KvStateLookupService", t); + } + + try { + kvStateClient.shutDown(); + } catch (Throwable t) { + LOG.error("Failed to shut down KvStateClient", t); + } + + if (actorSystem != null) { + try { + actorSystem.shutdown(); + } catch (Throwable t) { + LOG.error("Failed to shut down ActorSystem", t); + } + } + } + + /** + * Returns a future holding the serialized request result. 
+ * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state + * belongs to + * @param queryableStateName Name under which the state is queryable + * @param keyHashCode Integer hash code of the key (result of + * a call to {@link Object#hashCode()} + * @param serializedKeyAndNamespace Serialized key and namespace to query + * KvState instance with + * @return Future holding the serialized result + */ + @SuppressWarnings("unchecked") + public Future<byte[]> getKvState( + final JobID jobId, + final String queryableStateName, + final int keyHashCode, + final byte[] serializedKeyAndNamespace) { + + return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false) + .recoverWith(new Recover<Future<byte[]>>() { + @Override + public Future<byte[]> recover(Throwable failure) throws Throwable { + if (failure instanceof UnknownKvStateID || + failure instanceof UnknownKvStateKeyGroupLocation || + failure instanceof UnknownKvStateLocation || + failure instanceof ConnectException) { + // These failures are likely to be caused by out-of-sync + // KvStateLocation. Therefore we retry this query and + // force look up the location. + return getKvState( + jobId, + queryableStateName, + keyHashCode, + serializedKeyAndNamespace, + true); + } else { + return Futures.failed(failure); + } + } + }, executionContext); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. 
+ * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public <K, V> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeHint<K> keyTypeHint, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(keyTypeHint); + + TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo(); + return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeInfo The {@link TypeInformation} of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. 
+ */ + @PublicEvolving + public <K, V> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeInformation<K> keyTypeInfo, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(keyTypeInfo); + + return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, + keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public <K, V, N> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation<K> keyTypeInfo, + final TypeInformation<N> namespaceTypeInfo, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(stateDescriptor); + + // initialize the value serializer based on the execution config. 
+ stateDescriptor.initializeSerializerUnlessSet(executionConfig); + TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer(); + + return getKvState(jobId, queryableStateName, key, + namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateSerializer The {@link TypeSerializer} of the state we want to query. + * @return Future holding the result. 
+ */ + @PublicEvolving + public <K, V, N> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation<K> keyTypeInfo, + final TypeInformation<N> namespaceTypeInfo, + final TypeSerializer<V> stateSerializer) { + + Preconditions.checkNotNull(queryableStateName); + + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(namespace); + + Preconditions.checkNotNull(keyTypeInfo); + Preconditions.checkNotNull(namespaceTypeInfo); + Preconditions.checkNotNull(stateSerializer); + + if (stateSerializer instanceof ListSerializer) { + throw new IllegalArgumentException("ListState is not supported out-of-the-box yet."); + } + + TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); + TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); + + final byte[] serializedKeyAndNamespace; + try { + serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + key, + keySerializer, + namespace, + namespaceSerializer); + } catch (IOException e) { + return Futures.failed(e); + } + + return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace) + .flatMap(new Mapper<byte[], Future<V>>() { + @Override + public Future<V> apply(byte[] parameter) { + try { + return Futures.successful( + KvStateSerializer.deserializeValue(parameter, stateSerializer)); + } catch (IOException e) { + return Futures.failed(e); + } + } + }, executionContext); + } + + /** + * Returns a future holding the serialized request result. 
+ * + * @param jobId JobID of the job the queryable state + * belongs to + * @param queryableStateName Name under which the state is queryable + * @param keyHashCode Integer hash code of the key (result of + * a call to {@link Object#hashCode()} + * @param serializedKeyAndNamespace Serialized key and namespace to query + * KvState instance with + * @param forceLookup Flag to force lookup of the {@link KvStateLocation} + * @return Future holding the serialized result + */ + private Future<byte[]> getKvState( + final JobID jobId, + final String queryableStateName, + final int keyHashCode, + final byte[] serializedKeyAndNamespace, + boolean forceLookup) { + + return getKvStateLookupInfo(jobId, queryableStateName, forceLookup) + .flatMap(new Mapper<KvStateLocation, Future<byte[]>>() { + @Override + public Future<byte[]> apply(KvStateLocation lookup) { + int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups()); + + KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex); + if (serverAddress == null) { + return Futures.failed(new UnknownKvStateKeyGroupLocation()); + } else { + // Query server + KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex); + return kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace); + } + } + }, executionContext); + } + + /** + * Lookup the {@link KvStateLocation} for the given job and queryable state + * name. + * + * <p>The job manager will be queried for the location only if forced or no + * cached location can be found. There are no guarantees about + * + * @param jobId JobID the state instance belongs to. + * @param queryableStateName Name under which the state instance has been published. + * @param forceUpdate Flag to indicate whether to force a update via the lookup service. 
+ * @return Future holding the KvStateLocation + */ + private Future<KvStateLocation> getKvStateLookupInfo( + JobID jobId, + final String queryableStateName, + boolean forceUpdate) { + + if (forceUpdate) { + Future<KvStateLocation> lookupFuture = lookupService + .getKvStateLookupInfo(jobId, queryableStateName); + lookupCache.put(new Tuple2<>(jobId, queryableStateName), lookupFuture); + return lookupFuture; + } else { + Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); + final Future<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); + + if (cachedFuture == null) { + Future<KvStateLocation> lookupFuture = lookupService + .getKvStateLookupInfo(jobId, queryableStateName); + + Future<KvStateLocation> previous = lookupCache.putIfAbsent(cacheKey, lookupFuture); + if (previous == null) { + return lookupFuture; + } else { + return previous; + } + } else { + // do not retain futures which failed as they will remain in + // the cache even if the error cause is not present any more + // and a new lookup may succeed + if (cachedFuture.isCompleted() && + cachedFuture.value().get().isFailure()) { + // issue a new lookup + Future<KvStateLocation> lookupFuture = lookupService + .getKvStateLookupInfo(jobId, queryableStateName); + + // replace the existing one if it has not been replaced yet + // otherwise return the one in the cache + if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) { + return lookupFuture; + } else { + return lookupCache.get(cacheKey); + } + } else { + return cachedFuture; + } + } + } + } + +}
