xintongsong commented on a change in pull request #13864:
URL: https://github.com/apache/flink/pull/13864#discussion_r517060157
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.java
##########
@@ -63,35 +45,16 @@
* <p>The root path is watched to detect concurrent modifications in corner situations where
* multiple instances operate concurrently. The job manager acts as a {@link JobGraphListener}
* to react to such situations.
+ *
+ * <p>NOTICE: The only reason we still have this class is that we need to release the lock. If we completely get
+ * rid of the current lock-and-lease mechanism for avoiding concurrent modification, as the Kubernetes implementation does,
+ * then this class could be removed directly.
*/
-public class ZooKeeperJobGraphStore implements JobGraphStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStore.class);
-
- /** Lock to synchronize with the {@link JobGraphListener}. */
- private final Object cacheLock = new Object();
-
- /** The set of IDs of all added job graphs. */
- private final Set<JobID> addedJobGraphs = new HashSet<>();
+public class ZooKeeperJobGraphStore extends DefaultJobGraphStore {
Review comment:
Not sure about using inheritance here. Given that we are maintaining internal state with a lock, overriding the base class behaviors could be hard to maintain.
It seems to me that we need this class for calling `ZooKeeperStateHandleStore#release/releaseAll`. Would it be possible to use composition for that purpose? E.g., having some kind of callbacks which would be called on `stop`, `recoverJobGraph` and `releaseJobGraph`, passing in the `StateHandleStore`; the ZK implementations of the callbacks would then convert the `StateHandleStore` to a `ZooKeeperStateHandleStore` and do the release.
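To make the composition idea concrete, here is a rough sketch using heavily simplified, hypothetical stand-ins (`SimpleStateHandleStore`, `ZooKeeperLikeStore`, `JobGraphStoreCallback`, `NoOpCallback`, `ZooKeeperReleaseCallback` and `CallbackJobGraphStore` are not Flink classes). The generic store is never subclassed; it only invokes the callback, and only the ZooKeeper callback knows how to convert the store and release its locks. A hook for `recoverJobGraph` would follow the same pattern and is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for Flink's StateHandleStore.
interface SimpleStateHandleStore {}

// Hypothetical stand-in for ZooKeeperStateHandleStore; it records which locks were released.
class ZooKeeperLikeStore implements SimpleStateHandleStore {
    final List<String> released = new ArrayList<>();

    void release(String key) {
        released.add(key);
    }

    void releaseAll() {
        released.add("*");
    }
}

// Hypothetical hooks invoked by the generic job graph store; no inheritance required.
interface JobGraphStoreCallback {
    void onReleaseJobGraph(SimpleStateHandleStore store, String jobId);

    void onStop(SimpleStateHandleStore store);
}

// Backends without locks (e.g. Kubernetes) would pass a callback that does nothing.
class NoOpCallback implements JobGraphStoreCallback {
    @Override
    public void onReleaseJobGraph(SimpleStateHandleStore store, String jobId) {}

    @Override
    public void onStop(SimpleStateHandleStore store) {}
}

// ZooKeeper callback: converts the generic store and releases the locks.
class ZooKeeperReleaseCallback implements JobGraphStoreCallback {
    @Override
    public void onReleaseJobGraph(SimpleStateHandleStore store, String jobId) {
        asZooKeeper(store).release(jobId);
    }

    @Override
    public void onStop(SimpleStateHandleStore store) {
        asZooKeeper(store).releaseAll();
    }

    private static ZooKeeperLikeStore asZooKeeper(SimpleStateHandleStore store) {
        if (!(store instanceof ZooKeeperLikeStore)) {
            throw new IllegalArgumentException("Expected a ZooKeeper store, got " + store);
        }
        return (ZooKeeperLikeStore) store;
    }
}

// The single, non-subclassed job graph store delegates only the backend-specific part.
class CallbackJobGraphStore {
    private final SimpleStateHandleStore store;
    private final JobGraphStoreCallback callback;

    CallbackJobGraphStore(SimpleStateHandleStore store, JobGraphStoreCallback callback) {
        this.store = store;
        this.callback = callback;
    }

    void releaseJobGraph(String jobId) {
        // Backend-independent bookkeeping (guarded by the store's own lock) would go here.
        callback.onReleaseJobGraph(store, jobId);
    }

    void stop() {
        callback.onStop(store);
    }
}

class CallbackDemo {
    public static void main(String[] args) {
        ZooKeeperLikeStore zk = new ZooKeeperLikeStore();
        CallbackJobGraphStore jobGraphStore = new CallbackJobGraphStore(zk, new ZooKeeperReleaseCallback());
        jobGraphStore.releaseJobGraph("job-1");
        jobGraphStore.stop();
        System.out.println(zk.released); // prints [job-1, *]
    }
}
```

With this shape, `DefaultJobGraphStore` could keep its internal state and lock private, and the Kubernetes setup would simply pass a no-op callback.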
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is built for data less than 1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation from {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefiting from the {@link FlinkKubeClient#checkAndUpdateConfigMap} transactional operation, we can guarantee that
+ * only the leader can update the store. This lets us completely get rid of the lock-and-release used in the ZooKeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
Review comment:
The name `configMapNotExistSupplier` is a bit confusing. When I first saw it, I thought this supplier supplies a config map if it does not exist.
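For illustration, a rename that spells out what is supplied. `ConfigMapLookup`, `configMapNotExistExceptionSupplier`, the `Optional<Map<String, String>>` input and the use of `IllegalStateException` instead of `KubernetesException` are all assumptions made to keep the snippet self-contained. A private factory method such as a hypothetical `newConfigMapNotExistException()`, passed as a method reference to `orElseThrow`, would be an equally readable alternative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

class ConfigMapLookup {
    // The name says what is supplied: the exception to throw when the ConfigMap is missing.
    private final Supplier<IllegalStateException> configMapNotExistExceptionSupplier;

    ConfigMapLookup(String configMapName) {
        this.configMapNotExistExceptionSupplier =
                () -> new IllegalStateException("ConfigMap " + configMapName + " does not exist.");
    }

    // Only an absent ConfigMap triggers the supplier; an absent key simply yields null here.
    String get(Optional<Map<String, String>> configMapData, String key) {
        Map<String, String> data = configMapData.orElseThrow(configMapNotExistExceptionSupplier);
        return data.get(key);
    }
}

class ConfigMapLookupDemo {
    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("leader", "jm-0");
        ConfigMapLookup lookup = new ConfigMapLookup("flink-ha-cm");
        System.out.println(lookup.get(Optional.of(data), "leader")); // prints jm-0
    }
}
```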
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
Review comment:
Just trying to understand, why is this filter passed in as an argument?
Could we just have a util method in this class, leveraging
`Constants#JOB_GRAPH_STORE_KEY_PREFIX`?
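A sketch of what such a util method might look like. `JobGraphKeyFilter` is a hypothetical name, and the prefix value `"jobGraph-"` is a placeholder; the real code would reference `Constants#JOB_GRAPH_STORE_KEY_PREFIX` instead of redefining it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class JobGraphKeyFilter {
    // Placeholder value; the real code would use Constants#JOB_GRAPH_STORE_KEY_PREFIX.
    static final String JOB_GRAPH_STORE_KEY_PREFIX = "jobGraph-";

    private JobGraphKeyFilter() {}

    /** Whether a ConfigMap key belongs to the job graph store. */
    static boolean isJobGraphKey(String key) {
        return key.startsWith(JOB_GRAPH_STORE_KEY_PREFIX);
    }

    /** Keys of all job graph entries, skipping any other keys that share the ConfigMap. */
    static List<String> jobGraphKeys(Map<String, String> configMapData) {
        return configMapData.keySet().stream()
                .filter(JobGraphKeyFilter::isJobGraphKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("jobGraph-1", "handle-1");
        data.put("leader", "jm-0");
        System.out.println(jobGraphKeys(data)); // prints [jobGraph-1]
    }
}
```

The trade-off, as I understand it: a hard-coded helper ties this store to job graphs, whereas the injected `Predicate<String>` would let the same class serve other key families (for example, checkpoint pointers) if they end up in ConfigMaps as well.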
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity of current HA service");
+
+ this.configMapNotExistSupplier = () -> new KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle and stores it in ConfigMap. We can guarantee that only the leader can update the
+ * ConfigMap, since “Get (check the leader) and Update (write back to the ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if (!c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Clean up the state handle if it was not written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state handle. We do not lock the resource
+ * version and then replace in Kubernetes: since the ConfigMap is periodically updated by the leader, the
+ * resource version changes very fast. We use a "check-existence and update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via {@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if (c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new CompletionException(new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ } finally {
+ if (success) {
+ oldStateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the resource version of the ConfigMap or {@link StateHandleStore#NON_EXIST_RESOURCE_VERSION} if the key
+ * does not exist.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return resource version or {@link #NON_EXIST_RESOURCE_VERSION} if the key does not exist.
+ *
+ * @throws Exception if the existence check failed
+ */
+ @Override
+ public String exists(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ if (configMap.getData().containsKey(key)) {
+ return configMap.getResourceVersion();
+ }
+ return StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+ })
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Gets the {@link RetrievableStateHandle} stored in the given ConfigMap.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return The retrieved state handle from the specified ConfigMap and key
+ *
+ * @throws IOException if the method failed to deserialize the stored state handle
+ * @throws NotExistException when the key does not exist
+ * @throws Exception if getting the state handle from ConfigMap failed
+ */
+ @Override
+ public RetrievableStateHandle<T> get(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ final Optional<KubernetesConfigMap> optional = kubeClient.getConfigMap(configMapName);
+ if (optional.isPresent()) {
+ final KubernetesConfigMap configMap = optional.get();
+ if (configMap.getData().containsKey(key)) {
+ return deserializeObject(configMap.getData().get(key));
+ } else {
+ throw new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName);
+ }
+ } else {
+ throw configMapNotExistSupplier.get();
+ }
+ }
+
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry -> filter.test(entry.getKey()))
+ .sorted(Comparator.comparing(Map.Entry::getKey))
Review comment:
Why do we need the sort here? This is not in the contract of
`StateHandleStore#getAll`.
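A sketch of the same traversal without `.sorted(...)`, assuming a plain `Map<String, String>` in place of the ConfigMap data and returning the raw values instead of deserialized `RetrievableStateHandle`s; `UnsortedGetAll` is a hypothetical name. A caller that really needs a deterministic order can sort on its own side.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class UnsortedGetAll {
    private UnsortedGetAll() {}

    /** Values whose keys pass the filter, in unspecified order; no ordering is promised. */
    static List<String> getAll(Map<String, String> configMapData, Predicate<String> filter) {
        return configMapData.entrySet().stream()
                .filter(entry -> filter.test(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("jobGraph-b", "handle-b");
        data.put("jobGraph-a", "handle-a");
        data.put("leader", "jm-0");
        // Ordering, if needed, becomes the caller's explicit decision.
        List<String> handles = new ArrayList<>(getAll(data, key -> key.startsWith("jobGraph-")));
        Collections.sort(handles);
        System.out.println(handles); // prints [handle-a, handle-b]
    }
}
```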
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry -> filter.test(entry.getKey()))
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> {
+ try {
+ stateHandles.add(new Tuple2(deserializeObject(entry.getValue()), entry.getKey()));
Review comment:
`Tuple2` -> `Tuple2<>`
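To illustrate why: the raw `new Tuple2(...)` is only accepted with an unchecked warning (which is presumably what the `@SuppressWarnings("unchecked")` on `getAll` covers), while the diamond lets the compiler infer and check the type arguments. With the diamond, that suppression on `getAll` is likely no longer needed. The `Tuple2` below is a minimal local stand-in for `org.apache.flink.api.java.tuple.Tuple2` (only the two public fields and the constructor), and `DiamondExample` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for org.apache.flink.api.java.tuple.Tuple2: two public fields and a constructor.
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

final class DiamondExample {
    private DiamondExample() {}

    static List<Tuple2<Integer, String>> collect() {
        List<Tuple2<Integer, String>> result = new ArrayList<>();
        // Raw type: compiles only with an "unchecked" warning, and swapped arguments such as
        // new Tuple2("key", 42) would still compile and could surface as a ClassCastException later.
        // result.add(new Tuple2(42, "key"));

        // Diamond: type arguments are inferred from the list's element type and checked at compile time.
        result.add(new Tuple2<>(42, "key"));
        return result;
    }

    public static void main(String[] args) {
        Tuple2<Integer, String> t = collect().get(0);
        System.out.println(t.f0 + " -> " + t.f1); // prints 42 -> key
    }
}
```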
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than 1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap} transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity of current HA service");
+
+ this.configMapNotExistSupplier = () -> new KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if (!c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via {@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if (c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new CompletionException(new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ } finally {
+ if (success) {
+ oldStateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the resource version of the ConfigMap or {@link StateHandleStore#NON_EXIST_RESOURCE_VERSION} if the key
+ * does not exist.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return resource version or {@link #NON_EXIST_RESOURCE_VERSION} when not exists.
+ *
+ * @throws Exception if the check existence operation failed
+ */
+ @Override
+ public String exists(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ if (configMap.getData().containsKey(key)) {
+ return configMap.getResourceVersion();
+ }
+ return StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+ })
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Gets the {@link RetrievableStateHandle} stored in the given ConfigMap.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return The retrieved state handle from the specified ConfigMap and key
+ *
+ * @throws IOException if the method failed to deserialize the stored state handle
+ * @throws NotExistException when the name does not exist
+ * @throws Exception if get state handle from ConfigMap failed
+ */
+ @Override
+ public RetrievableStateHandle<T> get(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ final Optional<KubernetesConfigMap> optional = kubeClient.getConfigMap(configMapName);
+ if (optional.isPresent()) {
+ final KubernetesConfigMap configMap = optional.get();
+ if (configMap.getData().containsKey(key)) {
+ return deserializeObject(configMap.getData().get(key));
+ } else {
+ throw new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName);
+ }
+ } else {
+ throw configMapNotExistSupplier.get();
+ }
+ }
+
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
Review comment:
Why do we need this annotation?
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongRetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongStateStorage;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KubernetesStateHandleStore} operations.
+ */
+public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase {
+
+ private static final String PREFIX = "test-prefix-";
+ private final String key = PREFIX + JobID.generate();
+ private final Predicate<String> filter = k -> k.startsWith(PREFIX);
+ private final Long state = 12345L;
+
+ private LongStateStorage longStateStorage;
+
+ @Before
+ public void setup() {
+ super.setup();
+ longStateStorage = new LongStateStorage();
+ }
+
+ @Test
+ public void testAdd() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+ assertThat(store.getAll().size(), is(1));
+ assertThat(store.get(key).retrieveState(), is(state));
+ });
+ }};
+ }
+
+ @Test
+ public void testAddAlreadyExistingKey() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+
+ try {
+ store.add(key, state);
+ fail("Exception should be thrown.");
+ } catch (StateHandleStore.AlreadyExistException ex) {
+ final String msg = String.format(
+ "%s already exists in ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+ assertThat(ex.getMessage(), containsString(msg));
+ }
+ assertThat(longStateStorage.getStateHandles().size(), is(2));
+ assertThat(longStateStorage.getStateHandles().get(1).getNumberOfDiscardCalls(), is(1));
+ });
+ }};
+ }
+
+ @Test
+ public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+ try {
+ store.add(key, state);
+ fail("Exception should be thrown.");
+ } catch (Exception ex) {
+ final String msg = String.format("ConfigMap %s does not exist.", LEADER_CONFIGMAP_NAME);
+ assertThat(ex.getMessage(), containsString(msg));
+ }
+ assertThat(longStateStorage.getStateHandles().size(), is(1));
+ assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), is(1));
+ });
+ }};
+ }
+
+ @Test
+ public void testReplace() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+ store.add(key, state);
+
+ final Long newState = 23456L;
+ final String resourceVersion = store.exists(key);
+ store.replace(key, resourceVersion, newState);
+
+ assertThat(store.getAll().size(), is(1));
+ assertThat(store.get(key).retrieveState(), is(newState));
+ });
+ }};
+ }
+
+ @Test
+ public void testReplaceWithKeyNotExist() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ final Long newState = 23456L;
+
+ try {
+ assertThat(store.exists(key), is(StateHandleStore.NON_EXIST_RESOURCE_VERSION));
+ store.replace(key, StateHandleStore.NON_EXIST_RESOURCE_VERSION, newState);
+ fail("Exception should be thrown.");
+ } catch (StateHandleStore.NotExistException e) {
+ final String msg = String.format(
+ "Could not find %s in ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+ assertThat(e.getMessage(), containsString(msg));
+ }
+ assertThat(store.getAll().size(), is(0));
+ });
+ }};
+ }
+
+ @Test
+ public void testReplaceFailedAndDiscardState() throws Exception {
Review comment:
This test case verifies that the state is not replaced (and the new state is discarded) due to loss of leadership.
We might need another case that verifies the state is not replaced when the update fails, by having the testing kube client return `false` for `checkAndUpdate`.
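A minimal sketch of what that extra case could assert, using a hypothetical in-memory model rather than the real `KubernetesStateHandleStore` and testing kube client: when `checkAndUpdate` reports `false`, the stored handle should be unchanged, the old handle kept, and the newly stored handle discarded exactly once. `CountingHandle` and `ReplaceModel` are illustration-only names, and the model simplifies the flow by writing the new handle only after a successful update.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical handle that counts discardState() calls, in the spirit of the test's
// LongRetrievableStateHandle#getNumberOfDiscardCalls.
final class CountingHandle {
    final long state;
    int discardCalls;

    CountingHandle(long state) {
        this.state = state;
    }

    void discardState() {
        discardCalls++;
    }
}

// Hypothetical, simplified model of the replace() flow under review. The BooleanSupplier
// plays the role of a testing kube client whose checkAndUpdate result can be forced.
final class ReplaceModel {
    final Map<String, CountingHandle> data = new HashMap<>();
    private final BooleanSupplier checkAndUpdateResult;

    ReplaceModel(BooleanSupplier checkAndUpdateResult) {
        this.checkAndUpdateResult = checkAndUpdateResult;
    }

    void replace(String key, CountingHandle newHandle) {
        CountingHandle oldHandle = data.get(key);
        if (oldHandle == null) {
            throw new IllegalStateException("Could not find " + key);
        }
        boolean success = false;
        try {
            success = checkAndUpdateResult.getAsBoolean();
            if (success) {
                data.put(key, newHandle);
            }
        } finally {
            // Like the code under review: keep exactly one of the two handles alive.
            if (success) {
                oldHandle.discardState();
            } else {
                newHandle.discardState();
            }
        }
    }
}
```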
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreWatcher.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link JobGraphStoreWatcher} implementation for Kubernetes. It watch the Dispatcher leader ConfigMap and call the
+ * {@link JobGraphStore.JobGraphListener} based on the received event.
+ */
+public class KubernetesJobGraphStoreWatcher implements JobGraphStoreWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesJobGraphStoreWatcher.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private JobGraphStore.JobGraphListener jobGraphListener;
+
+ @Nullable
+ private KubernetesWatch kubernetesWatch;
+
+ public KubernetesJobGraphStoreWatcher(FlinkKubeClient kubeClient, String configMapName) {
+ this.kubeClient = checkNotNull(kubeClient);
+ this.configMapName = checkNotNull(configMapName);
+ }
+
+ @Override
+ public void start(JobGraphStore.JobGraphListener jobGraphListener) {
+ this.jobGraphListener = checkNotNull(jobGraphListener);
+ kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
+ }
+
+ @Override
+ public void stop() {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ }
+
+ private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+ // This is used to get the difference between current and previous data. And then notify the listener.
+ final Set<JobID> previousJobIDs = new HashSet<>();
+
+ @Override
+ public void onAdded(List<KubernetesConfigMap> configMaps) {
+ // The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We do not process this
+ // useless event.
+ }
+
+ @Override
+ public void onModified(List<KubernetesConfigMap> configMaps) {
+ handleConfigMapEvent(configMaps);
+ }
+
+ @Override
+ public void onDeleted(List<KubernetesConfigMap> configMaps) {
+ // The ConfigMap will be created again in the leader election service.
+ }
+
+ @Override
+ public void onError(List<KubernetesConfigMap> configMaps) {
+ LOG.error("Error while watching the configMap {}", configMapName);
+ }
+
+ @Override
+ public void handleFatalError(Throwable throwable) {
+ LOG.error("Error while watching the configMap {}", configMapName, throwable);
+ }
+
+ private void handleConfigMapEvent(List<KubernetesConfigMap> configMaps) {
+ final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName);
+ final Set<JobID> currentJobIDs = getJobIDs(configMap);
+ notifyJobGraphListenChanges(currentJobIDs, previousJobIDs);
+ previousJobIDs.clear();
+ previousJobIDs.addAll(currentJobIDs);
Review comment:
Is it guaranteed that events are received in the order in which they happened?
I'm asking because if there is no such guarantee, we might have the latest information removed when an outdated event is received.
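To make the concern concrete, here is a minimal sketch of the diff-against-previous approach used in `handleConfigMapEvent`, with a hypothetical `JobIdDiffer` and plain string IDs standing in for `JobID` and the ConfigMap:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the watcher's diffing: it remembers the previously
// seen job IDs and reports additions and removals relative to each incoming snapshot.
final class JobIdDiffer {
    private final Set<String> previous = new TreeSet<>();
    final List<String> notifications = new ArrayList<>();

    void onSnapshot(Set<String> snapshot) {
        // TreeSet gives a deterministic iteration order for the notifications.
        Set<String> current = new TreeSet<>(snapshot);
        for (String id : current) {
            if (!previous.contains(id)) {
                notifications.add("added " + id);
            }
        }
        for (String id : previous) {
            if (!current.contains(id)) {
                notifications.add("removed " + id);
            }
        }
        previous.clear();
        previous.addAll(current);
    }
}
```

Fed the snapshots `{job-a}` then `{job-a, job-b}` in order, only additions are reported. If `{job-a, job-b}` is processed first and the stale `{job-a}` arrives afterwards, `job-b` is reported as removed even though it still exists, which is the situation described above.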
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesRunningJobsRegistry.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.kubernetes.utils.Constants.RUNNING_JOBS_REGISTRY_KEY_PREFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link RunningJobsRegistry} implementation for Kubernetes. All the running jobs will be stored in
+ * Dispatcher-leader ConfigMap. The key is the job id with prefix
+ * {@link org.apache.flink.kubernetes.utils.Constants#RUNNING_JOBS_REGISTRY_KEY_PREFIX},
+ * and value is job status.
+ */
+public class KubernetesRunningJobsRegistry implements RunningJobsRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesRunningJobsRegistry.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final String lockIdentity;
+
+ public KubernetesRunningJobsRegistry(FlinkKubeClient kubeClient, String configMapName, String lockIdentity) {
+ this.kubeClient = checkNotNull(kubeClient);
+ this.configMapName = checkNotNull(configMapName);
+ this.lockIdentity = checkNotNull(lockIdentity);
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ writeJobStatusToConfigMap(jobID, JobSchedulingStatus.RUNNING);
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ writeJobStatusToConfigMap(jobID, JobSchedulingStatus.DONE);
+ }
+
+ @Override
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ final String key = getKeyForJobId(jobID);
+ final String status = configMap.getData().get(key);
+ if (!StringUtils.isNullOrWhitespaceOnly(status)) {
+ return JobSchedulingStatus.valueOf(status);
+ } else {
+ LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.", configMapName, key);
+ }
+ return JobSchedulingStatus.PENDING;
+ })
+ .orElseThrow(() -> new IOException(
+ new KubernetesException("ConfigMap " + configMapName + " does not exist.")));
+ }
+
+ @Override
+ public void clearJob(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ configMap.getData().remove(getKeyForJobId(jobID));
+ return Optional.of(configMap);
Review comment:
Might be better to check the original value and avoid the update if the value has not changed.
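A minimal sketch of that suggestion for `clearJob`, modelling the ConfigMap data as a plain `Map<String, String>` and passing the leadership check in as a boolean (both simplifications; `ClearJobSketch` is a hypothetical name). It assumes, as the surrounding code suggests, that returning `Optional.empty()` from the update function makes `checkAndUpdateConfigMap` skip the write:

```java
import java.util.Map;
import java.util.Optional;

final class ClearJobSketch {
    // Hypothetical stand-in for the update function passed to checkAndUpdateConfigMap in clearJob.
    // Assumption: Optional.empty() means "do not write the ConfigMap back".
    static Optional<Map<String, String>> clearJob(Map<String, String> data, String key, boolean hasLeadership) {
        if (!hasLeadership) {
            return Optional.empty();
        }
        // Nothing to remove: the data would be unchanged, so skip the update entirely.
        if (!data.containsKey(key)) {
            return Optional.empty();
        }
        data.remove(key);
        return Optional.of(data);
    }
}
```

With this check, clearing a job that has already been cleared no longer issues a write.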
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than 1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap} transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity of current HA service");
+
+ this.configMapNotExistSupplier = () -> new KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if (!c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via {@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle = storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if (KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if (c.getData().containsKey(key)) {
+ c.getData().put(key, Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new CompletionException(new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ } finally {
+ if (success) {
+ oldStateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the resource version of the ConfigMap or {@link StateHandleStore#NON_EXIST_RESOURCE_VERSION} if the key
+ * does not exist.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return resource version or {@link #NON_EXIST_RESOURCE_VERSION} if the key does not exist.
+ *
+ * @throws Exception if the check existence operation failed
+ */
+ @Override
+ public String exists(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ if (configMap.getData().containsKey(key)) {
+ return configMap.getResourceVersion();
+ }
+ return StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+ })
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Gets the {@link RetrievableStateHandle} stored in the given ConfigMap.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return The retrieved state handle from the specified ConfigMap and key
+ *
+ * @throws IOException if the method failed to deserialize the stored state handle
+ * @throws NotExistException when the name does not exist
+ * @throws Exception if get state handle from ConfigMap failed
+ */
+ @Override
+ public RetrievableStateHandle<T> get(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ final Optional<KubernetesConfigMap> optional = kubeClient.getConfigMap(configMapName);
+ if (optional.isPresent()) {
+ final KubernetesConfigMap configMap = optional.get();
+ if (configMap.getData().containsKey(key)) {
+ return deserializeObject(configMap.getData().get(key));
+ } else {
+ throw new StateHandleStore.NotExistException(
+ "Could not find " + key + " in ConfigMap " + configMapName);
+ }
+ } else {
+ throw configMapNotExistSupplier.get();
+ }
+ }
+
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry -> filter.test(entry.getKey()))
+ .sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> {
+ try {
+ stateHandles.add(new Tuple2(deserializeObject(entry.getValue()), entry.getKey()));
+ } catch (Exception e) {
+ LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.",
+ configMapName, entry.getKey());
+ }
+ });
+ return stateHandles;
+ })
+ .orElse(Collections.emptyList());
+ }
+
+ /**
+ * Return a list of all valid keys for state handles.
+ *
+ * @return List of valid state handle keys in Kubernetes ConfigMap
+ *
+ * @throws Exception if get state handle names from ConfigMap failed.
+ */
+ @Override
+ public Collection<String> getAllNames() throws Exception {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> configMap.getData().keySet().stream()
+ .sorted(Comparator.comparing(e -> e))
+ .filter(filter)
+ .collect(Collectors.toList()))
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Removes the key from the state ConfigMap. The state on external storage will be removed as well.
+ * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
+ *
+ * @param key Key to be removed from ConfigMap
+ *
+ * @return True if the state handle is removed successfully
+ *
+ * @throws Exception if removing the key or discarding the state failed
+ */
+ @Override
+ public boolean remove(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>();
+
+ try {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ final String content = configMap.getData().remove(key);
+ if (content != null) {
+ try {
+ stateHandleRefer.set(deserializeObject(content));
+ } catch (Exception e) {
+ LOG.warn("Could not retrieve the state handle of {} from ConfigMap {}.",
+ key, configMapName, e);
+ }
+ }
+ return Optional.of(configMap);
+ }
+ return Optional.empty();
+ })
+ .whenComplete((succeed, ignore) -> {
+ if (succeed) {
+ if (stateHandleRefer.get() != null) {
+ try {
+ stateHandleRefer.get().discardState();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+ }
+ }).get();
+ } catch (Exception ex) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return false;
+ }
+ }
+
+ /**
+ * Remove all the keys and state in the ConfigMap.
+ *
+ * @throws Exception when removing the keys or discarding the state failed
+ */
+ @Override
+ public void removeAll() throws Exception {
+ final Collection<String> keys = getAllNames();
+
+ Exception exception = null;
+
+ for (String key : keys) {
+ try {
+ remove(key);
Review comment:
I'm a bit concerned about the performance here. For each `remove`, we
access the config map with a retry-based check-and-update transactional
operation. It might be better to combine the removal of all keys into one
check-and-update transaction.
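A minimal sketch of the batching idea, assuming a hypothetical in-memory `FakeConfigMap` whose `checkAndUpdate` takes a function returning an `Optional` of the new data. It is only loosely modelled on how `FlinkKubeClient#checkAndUpdateConfigMap` is used above and is not that API. The per-key loop issues one transaction per key, while the batched variant removes every matching key in one update and hands back the removed values, so the referenced state can be discarded only after the update succeeded. The leadership check and the retry-on-conflict loop of the real client are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class BatchedRemoveSketch {

    /** Hypothetical in-memory stand-in for a ConfigMap updated via check-and-update. */
    static final class FakeConfigMap {
        final Map<String, String> data = new HashMap<>();
        long resourceVersion = 0;
        int updateCalls = 0; // counts check-and-update transactions (a round trip each in a real client)

        /**
         * Applies fn to a copy of the data and writes the result back if fn returns a value.
         * A real client would also compare resource versions and retry on conflict; this
         * single-threaded sketch never conflicts, so no retry loop is modelled.
         */
        boolean checkAndUpdate(Function<Map<String, String>, Optional<Map<String, String>>> fn) {
            updateCalls++;
            Optional<Map<String, String>> updated = fn.apply(new HashMap<>(data));
            if (!updated.isPresent()) {
                return false;
            }
            data.clear();
            data.putAll(updated.get());
            resourceVersion++;
            return true;
        }
    }

    /** Mirrors the loop under review: one transaction per key. */
    static void removeOneByOne(FakeConfigMap configMap, List<String> keys) {
        for (String key : keys) {
            configMap.checkAndUpdate(data -> {
                data.remove(key);
                return Optional.of(data);
            });
        }
    }

    /** Removes every key accepted by the filter in a single transaction and returns the removed values. */
    static List<String> removeAllInOneTransaction(FakeConfigMap configMap, Predicate<String> filter) {
        final List<String> removed = new ArrayList<>();
        boolean success = configMap.checkAndUpdate(data -> {
            removed.clear(); // a retrying client may invoke this function more than once
            data.entrySet().removeIf(entry -> {
                if (filter.test(entry.getKey())) {
                    removed.add(entry.getValue());
                    return true;
                }
                return false;
            });
            return Optional.of(data);
        });
        // Only hand values back for discarding if the ConfigMap update went through.
        return success ? removed : new ArrayList<>();
    }

    public static void main(String[] args) {
        List<String> jobKeys = Arrays.asList("jobGraph-0", "jobGraph-1", "jobGraph-2");
        FakeConfigMap perKey = new FakeConfigMap();
        FakeConfigMap batched = new FakeConfigMap();
        for (FakeConfigMap cm : Arrays.asList(perKey, batched)) {
            for (String key : jobKeys) {
                cm.data.put(key, "handle-of-" + key);
            }
            cm.data.put("leader", "lock-holder");
        }

        removeOneByOne(perKey, jobKeys);
        List<String> toDiscard = removeAllInOneTransaction(batched, k -> k.startsWith("jobGraph-"));

        System.out.println(perKey.updateCalls + " transactions vs " + batched.updateCalls); // 3 transactions vs 1
        System.out.println(toDiscard.size() + " handles to discard"); // 3 handles to discard
    }
}
```

With a real ConfigMap the per-key loop also multiplies the chance of resource-version conflicts (and therefore retries), which is one more reason a single transaction may be preferable.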
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+ @Override
+ public Collection<String> getAllNames() throws Exception {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> configMap.getData().keySet().stream()
+ .sorted(Comparator.comparing(e -> e))
Review comment:
Again, why is sorting necessary?
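Whether a stable order is needed here is the open question. If an order is kept, a small sketch (the class and method names are hypothetical) suggests two simplifications: for `String` keys, `Comparator.comparing(e -> e)` is just natural ordering, so a plain `.sorted()` gives the same result, and filtering before sorting avoids sorting keys that are dropped anyway.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SortedNamesSketch {

    // Mirrors the stream in getAllNames: sort with an identity-key comparator, then filter.
    static List<String> sortThenFilter(List<String> keys, Predicate<String> filter) {
        return keys.stream()
                .sorted(Comparator.comparing(e -> e))
                .filter(filter)
                .collect(Collectors.toList());
    }

    // Same result: filter first (fewer elements to sort), then natural String ordering.
    static List<String> filterThenSort(List<String> keys, Predicate<String> filter) {
        return keys.stream()
                .filter(filter)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("jobGraph-b", "leader", "jobGraph-a");
        Predicate<String> isJobGraph = k -> k.startsWith("jobGraph-");
        System.out.println(sortThenFilter(keys, isJobGraph)); // [jobGraph-a, jobGraph-b]
        System.out.println(filterThenSort(keys, isJobGraph)); // [jobGraph-a, jobGraph-b]
    }
}
```

If no caller depends on the order, dropping `.sorted(...)` entirely would be the simplest option.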
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry -> filter.test(entry.getKey()))
+ .sorted(Comparator.comparing(Map.Entry::getKey))
Review comment:
In addition, the IDE suggests: `.sorted(Map.Entry.comparingByKey())`
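For reference, a small self-contained check (the class name is hypothetical) that the JDK's `Map.Entry.comparingByKey()` yields the same order as `Comparator.comparing(Map.Entry::getKey)` for `String` keys, while reading more directly:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ComparingByKeySketch {

    // Current form in the diff: a comparator built from the key extractor.
    static List<String> keysSortedWithMethodRef(Map<String, String> data) {
        return data.entrySet().stream()
                .sorted(Comparator.comparing(Map.Entry::getKey))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // Suggested form: the JDK's ready-made comparator for entries with Comparable keys.
    static List<String> keysSortedWithComparingByKey(Map<String, String> data) {
        return data.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("jobGraph-b", "handle-b");
        data.put("jobGraph-a", "handle-a");
        data.put("jobGraph-c", "handle-c");
        System.out.println(keysSortedWithMethodRef(data));      // [jobGraph-a, jobGraph-b, jobGraph-c]
        System.out.println(keysSortedWithComparingByKey(data)); // [jobGraph-a, jobGraph-b, jobGraph-c]
    }
}
```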
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreWatcher.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link JobGraphStoreWatcher} implementation for Kubernetes. It watches the Dispatcher leader ConfigMap and calls the
+ * {@link JobGraphStore.JobGraphListener} based on the received event.
+ */
+public class KubernetesJobGraphStoreWatcher implements JobGraphStoreWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesJobGraphStoreWatcher.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private JobGraphStore.JobGraphListener jobGraphListener;
+
+ @Nullable
+ private KubernetesWatch kubernetesWatch;
+
+ public KubernetesJobGraphStoreWatcher(FlinkKubeClient kubeClient, String configMapName) {
+ this.kubeClient = checkNotNull(kubeClient);
+ this.configMapName = checkNotNull(configMapName);
+ }
+
+ @Override
+ public void start(JobGraphStore.JobGraphListener jobGraphListener) {
+ this.jobGraphListener = checkNotNull(jobGraphListener);
+ kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
+ }
+
+ @Override
+ public void stop() {
+ if (kubernetesWatch != null) {
+ kubernetesWatch.close();
+ }
+ }
+
+ private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+ // This is used to compute the difference between the current and previous data and then notify the listener.
+ final Set<JobID> previousJobIDs = new HashSet<>();
+
+ @Override
+ public void onAdded(List<KubernetesConfigMap> configMaps) {
+ // The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We do not process this
+ // useless event.
+ }
+
+ @Override
+ public void onModified(List<KubernetesConfigMap> configMaps) {
+ handleConfigMapEvent(configMaps);
+ }
+
+ @Override
+ public void onDeleted(List<KubernetesConfigMap> configMaps) {
+ // The ConfigMap will be created again in the leader election service.
+ }
+
+ @Override
+ public void onError(List<KubernetesConfigMap> configMaps) {
+ LOG.error("Error while watching the configMap {}", configMapName);
+ }
+
+ @Override
+ public void handleFatalError(Throwable throwable) {
+ LOG.error("Error while watching the configMap {}", configMapName, throwable);
+ }
+
+ private void handleConfigMapEvent(List<KubernetesConfigMap> configMaps) {
+ final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName);
+ final Set<JobID> currentJobIDs = getJobIDs(configMap);
+ notifyJobGraphListenChanges(currentJobIDs, previousJobIDs);
+ previousJobIDs.clear();
+ previousJobIDs.addAll(currentJobIDs);
+ }
+
+ private Set<JobID> getJobIDs(KubernetesConfigMap configMap) {
+ final Set<JobID> jobIDs;
+ if (configMap.getData() == null) {
+ jobIDs = Collections.emptySet();
+ } else {
+ jobIDs = configMap.getData().keySet().stream()
+ .filter(k -> k.startsWith(JOB_GRAPH_STORE_KEY_PREFIX))
+ .map(KubernetesUtils::jobGraphConfigMapKeyToJobID)
+ .collect(Collectors.toSet());
+ }
+ return jobIDs;
+ }
+
+ private void notifyJobGraphListenChanges(Set<JobID> currentJobIDs, Set<JobID> previousJobIDs) {
+ final Set<JobID> jobIDsToAdd = new HashSet<>();
+ final Set<JobID> jobIDsToRemove = new
HashSet<>(previousJobIDs);
+
+ currentJobIDs.forEach(jobID -> {
Review comment:
I would suggest replacing `currentJobIDs.forEach` with a for-each loop,
`for (JobID jobID : currentJobIDs)`.
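For illustration, the change-detection step written with a plain for-each loop might look roughly like the sketch below. It is a hypothetical, self-contained stand-in: `JobIdDiff`, `Changes`, and `compute` are made-up names, and the element type is generic instead of Flink's `JobID` so that it compiles on its own. It does not reproduce the rest of `notifyJobGraphListenChanges`, which is cut off in the hunk above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: computes which IDs were added or removed between two snapshots. */
final class JobIdDiff {

    /** Result holder for the added and removed IDs. */
    static final class Changes<T> {
        final Set<T> added;
        final Set<T> removed;

        Changes(Set<T> added, Set<T> removed) {
            this.added = Collections.unmodifiableSet(added);
            this.removed = Collections.unmodifiableSet(removed);
        }
    }

    private JobIdDiff() {}

    static <T> Changes<T> compute(Set<T> currentIds, Set<T> previousIds) {
        final Set<T> idsToAdd = new HashSet<>();
        // Start from the previous snapshot; IDs still present are taken out below,
        // so whatever remains has disappeared.
        final Set<T> idsToRemove = new HashSet<>(previousIds);

        // Plain for-each loop instead of currentIds.forEach(id -> { ... }).
        for (T id : currentIds) {
            if (!idsToRemove.remove(id)) {
                // Not in the previous snapshot, so it is newly added.
                idsToAdd.add(id);
            }
        }
        return new Changes<>(idsToAdd, idsToRemove);
    }
}
```

One practical benefit of the loop form is that its body can call methods that throw checked exceptions directly, which a `Consumer` lambda cannot do without wrapping them.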
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesRunningJobsRegistry.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.kubernetes.utils.Constants.RUNNING_JOBS_REGISTRY_KEY_PREFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link RunningJobsRegistry} implementation for Kubernetes. All the running jobs will be stored in
+ * Dispatcher-leader ConfigMap. The key is the job id with prefix
+ * {@link org.apache.flink.kubernetes.utils.Constants#RUNNING_JOBS_REGISTRY_KEY_PREFIX},
+ * and value is job status.
+ */
+public class KubernetesRunningJobsRegistry implements RunningJobsRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesRunningJobsRegistry.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final String lockIdentity;
+
+ public KubernetesRunningJobsRegistry(FlinkKubeClient kubeClient, String configMapName, String lockIdentity) {
+ this.kubeClient = checkNotNull(kubeClient);
+ this.configMapName = checkNotNull(configMapName);
+ this.lockIdentity = checkNotNull(lockIdentity);
+ }
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ writeJobStatusToConfigMap(jobID, JobSchedulingStatus.RUNNING);
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ writeJobStatusToConfigMap(jobID, JobSchedulingStatus.DONE);
+ }
+
+ @Override
+ public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ final String key = getKeyForJobId(jobID);
+ final String status = configMap.getData().get(key);
+ if (!StringUtils.isNullOrWhitespaceOnly(status)) {
+ return JobSchedulingStatus.valueOf(status);
+ } else {
+ LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.", configMapName, key);
+ }
+ return JobSchedulingStatus.PENDING;
+ })
+ .orElseThrow(() -> new IOException(
+ new KubernetesException("ConfigMap " + configMapName + " does not exist.")));
+ }
+
+ @Override
+ public void clearJob(JobID jobID) throws IOException {
+ checkNotNull(jobID);
+
+ try {
+ kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ configMap.getData().remove(getKeyForJobId(jobID));
+ return Optional.of(configMap);
+ }
+ return Optional.empty();
+ }
+ ).get();
+ } catch (Exception e) {
+ throw new IOException("Failed to clear job state in ConfigMap " + configMapName + " for job " + jobID, e);
+ }
+ }
+
+ private void writeJobStatusToConfigMap(JobID jobID, JobSchedulingStatus status) throws IOException {
+ LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
+ final String key = getKeyForJobId(jobID);
+ try {
+ kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ configMap.getData().put(key, status.name());
+ return Optional.of(configMap);
Review comment:
It might be better to check the original value first and skip the update
if the value has not changed.
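A minimal sketch of the suggested short-circuit, assuming (as the hunk suggests) that the update function returns `Optional.empty()` to mean "do not write". A plain `Map<String, String>` stands in for the ConfigMap data and leadership is passed in as a boolean; `StatusUpdate` and `updateIfChanged` are hypothetical names, not Flink APIs. One caveat: in the hunk, `Optional.empty()` is also returned when leadership is missing, so if the caller treats an empty result as a failure, the "value unchanged" case may need to be reported differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical illustration of skipping a ConfigMap write when the value is unchanged. */
final class StatusUpdate {

    private StatusUpdate() {}

    /**
     * Returns the data to write, or Optional.empty() if no write is needed,
     * either because we are not the leader or because the stored value already matches.
     * The input map is never modified; a copy is returned when a write is needed.
     */
    static Optional<Map<String, String>> updateIfChanged(
            Map<String, String> data, boolean hasLeadership, String key, String newValue) {
        if (!hasLeadership) {
            return Optional.empty();
        }
        final String oldValue = data.get(key);
        if (Objects.equals(oldValue, newValue)) {
            // Same value already stored: skip the update to avoid a redundant write.
            return Optional.empty();
        }
        final Map<String, String> updated = new HashMap<>(data);
        updated.put(key, newValue);
        return Optional.of(updated);
    }
}
```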