This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 579c7795b03689c8c02859c37900864033df8873 Author: Yi Zhang <[email protected]> AuthorDate: Thu Mar 5 18:50:13 2026 +0800 [FLINK-38975][runtime] Add ApplicationStore --- .../expert_high_availability_zk_section.html | 6 + .../generated/high_availability_configuration.html | 6 + .../configuration/HighAvailabilityOptions.java | 8 + .../KubernetesApplicationStoreUtil.java | 54 ++++ .../apache/flink/kubernetes/utils/Constants.java | 2 + .../flink/kubernetes/utils/KubernetesUtils.java | 62 ++++ .../flink/runtime/jobmanager/ApplicationStore.java | 49 +++ .../runtime/jobmanager/ApplicationStoreEntry.java | 58 ++++ .../runtime/jobmanager/ApplicationStoreUtil.java | 44 +++ .../runtime/jobmanager/ApplicationWriter.java | 44 +++ .../jobmanager/DefaultApplicationStore.java | 328 +++++++++++++++++++++ .../jobmanager/StandaloneApplicationStore.java | 59 ++++ .../jobmanager/ZooKeeperApplicationStoreUtil.java | 37 +++ .../apache/flink/runtime/util/ZooKeeperUtils.java | 52 ++++ .../jobmanager/DefaultApplicationStoreTest.java | 279 ++++++++++++++++++ .../jobmanager/StandaloneApplicationStoreTest.java | 54 ++++ .../jobmanager/TestingApplicationStoreEntry.java | 80 +++++ .../ZooKeeperApplicationStoreITCase.java | 297 +++++++++++++++++++ 18 files changed, 1519 insertions(+) diff --git a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html index c7ce745452d..4261b544870 100644 --- a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html +++ b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html @@ -56,6 +56,12 @@ <td>Boolean</td> <td>Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. 
In case you set this option to <code class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Flink is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with [...] </tr> + <tr> + <td><h5>high-availability.zookeeper.path.applications</h5></td> + <td style="word-wrap: break-word;">"/applications"</td> + <td>String</td> + <td>ZooKeeper root path (ZNode) for applications</td> + </tr> <tr> <td><h5>high-availability.zookeeper.path.execution-plans</h5></td> <td style="word-wrap: break-word;">"/execution-plans"</td> diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html index 26a6b300907..116086bd88d 100644 --- a/docs/layouts/shortcodes/generated/high_availability_configuration.html +++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html @@ -98,6 +98,12 @@ <td>Boolean</td> <td>Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. In case you set this option to <code class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Flink is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with [...] 
</tr> + <tr> + <td><h5>high-availability.zookeeper.path.applications</h5></td> + <td style="word-wrap: break-word;">"/applications"</td> + <td>String</td> + <td>ZooKeeper root path (ZNode) for applications</td> + </tr> <tr> <td><h5>high-availability.zookeeper.path.execution-plans</h5></td> <td style="word-wrap: break-word;">"/execution-plans"</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 30480de619e..9e54491902e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -138,6 +138,14 @@ public class HighAvailabilityOptions { "high-availability.zookeeper.path.jobgraphs") .withDescription("ZooKeeper root path (ZNode) for execution plans"); + /** ZooKeeper root path (ZNode) for applications. */ + @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) + public static final ConfigOption<String> HA_ZOOKEEPER_APPLICATIONS_PATH = + key("high-availability.zookeeper.path.applications") + .stringType() + .defaultValue("/applications") + .withDescription("ZooKeeper root path (ZNode) for applications"); + // ------------------------------------------------------------------------ // ZooKeeper Client Settings // ------------------------------------------------------------------------ diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java new file mode 100644 index 00000000000..dae6ff5a8c8 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.jobmanager.ApplicationStoreUtil; + +import static org.apache.flink.kubernetes.utils.Constants.APPLICATION_STORE_KEY_PREFIX; + +/** Singleton {@link ApplicationStoreUtil} implementation for Kubernetes. */ +public enum KubernetesApplicationStoreUtil implements ApplicationStoreUtil { + INSTANCE; + + /** + * Convert a key in ConfigMap to {@link ApplicationID}. The key is stored with prefix {@link + * Constants#APPLICATION_STORE_KEY_PREFIX}. + * + * @param key application key in ConfigMap. + * @return the parsed {@link ApplicationID}. + */ + @Override + public ApplicationID nameToApplicationId(String key) { + return ApplicationID.fromHexString(key.substring(APPLICATION_STORE_KEY_PREFIX.length())); + } + + /** + * Convert a {@link ApplicationID} to config map key. We will add prefix {@link + * Constants#APPLICATION_STORE_KEY_PREFIX}. 
+ * + * @param applicationId application id + * @return a key to store application in the ConfigMap + */ + @Override + public String applicationIdToName(ApplicationID applicationId) { + return APPLICATION_STORE_KEY_PREFIX + applicationId; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java index 21d36b66d11..6359364248c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java @@ -110,6 +110,8 @@ public class Constants { public static final String LEADER_SESSION_ID_KEY = "sessionId"; public static final String EXECUTION_PLAN_STORE_KEY_PREFIX = "executionPlan-"; public static final String SUBMITTED_EXECUTION_PLAN_FILE_PREFIX = "submittedExecutionPlan"; + public static final String APPLICATION_STORE_KEY_PREFIX = "application-"; + public static final String SUBMITTED_APPLICATION_FILE_PREFIX = "submittedApplication"; public static final String CHECKPOINT_COUNTER_KEY = "counter"; public static final String CHECKPOINT_ID_KEY_PREFIX = "checkpointID-"; public static final String COMPLETED_CHECKPOINT_FILE_SUFFIX = "completedCheckpoint"; diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java index f097354fdc3..7786c0add56 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import 
org.apache.flink.kubernetes.highavailability.KubernetesApplicationStoreUtil; import org.apache.flink.kubernetes.highavailability.KubernetesCheckpointStoreUtil; import org.apache.flink.kubernetes.highavailability.KubernetesExecutionPlanStoreUtil; import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore; @@ -37,6 +38,9 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.jobmanager.ApplicationStore; +import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry; +import org.apache.flink.runtime.jobmanager.DefaultApplicationStore; import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore; import org.apache.flink.runtime.jobmanager.ExecutionPlanStore; import org.apache.flink.runtime.jobmanager.NoOpExecutionPlanStoreWatcher; @@ -83,6 +87,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.utils.Constants.APPLICATION_STORE_KEY_PREFIX; import static org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_ID_KEY_PREFIX; import static org.apache.flink.kubernetes.utils.Constants.COMPLETED_CHECKPOINT_FILE_SUFFIX; import static org.apache.flink.kubernetes.utils.Constants.DNS_POLICY_DEFAULT; @@ -90,6 +95,7 @@ import static org.apache.flink.kubernetes.utils.Constants.DNS_POLICY_HOSTNETWORK import static org.apache.flink.kubernetes.utils.Constants.EXECUTION_PLAN_STORE_KEY_PREFIX; import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY; +import static org.apache.flink.kubernetes.utils.Constants.SUBMITTED_APPLICATION_FILE_PREFIX; import static 
org.apache.flink.kubernetes.utils.Constants.SUBMITTED_EXECUTION_PLAN_FILE_PREFIX; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -234,6 +240,62 @@ public class KubernetesUtils { return LeaderInformation.known(sessionID, leaderAddress); } + /** + * Create a {@link DefaultApplicationStore}. + * + * @param configuration configuration to build a RetrievableStateStorageHelper + * @param flinkKubeClient flink kubernetes client + * @param configMapName ConfigMap name + * @param lockIdentity lock identity to check the leadership + * @return a {@link DefaultApplicationStore} + * @throws Exception if creating the storage helper fails + */ + public static ApplicationStore createApplicationStore( + Configuration configuration, + FlinkKubeClient flinkKubeClient, + String configMapName, + String lockIdentity) + throws Exception { + + final KubernetesStateHandleStore<ApplicationStoreEntry> stateHandleStore = + createApplicationStateHandleStore( + configuration, flinkKubeClient, configMapName, lockIdentity); + return new DefaultApplicationStore<>( + stateHandleStore, KubernetesApplicationStoreUtil.INSTANCE); + } + + /** + * Create a {@link KubernetesStateHandleStore} that stores {@link ApplicationStoreEntry}. + * + * @param configuration configuration to build a RetrievableStateStorageHelper + * @param flinkKubeClient flink kubernetes client + * @param configMapName ConfigMap name + * @param lockIdentity lock identity to check the leadership + * @return a {@link KubernetesStateHandleStore} that stores {@link ApplicationStoreEntry}.
+ * @throws Exception when create the storage helper + */ + public static KubernetesStateHandleStore<ApplicationStoreEntry> + createApplicationStateHandleStore( + Configuration configuration, + FlinkKubeClient flinkKubeClient, + String configMapName, + String lockIdentity) + throws Exception { + + final RetrievableStateStorageHelper<ApplicationStoreEntry> stateStorage = + new FileSystemStateStorageHelper<>( + HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath( + configuration), + SUBMITTED_APPLICATION_FILE_PREFIX); + + return new KubernetesStateHandleStore<>( + flinkKubeClient, + configMapName, + stateStorage, + k -> k.startsWith(APPLICATION_STORE_KEY_PREFIX), + lockIdentity); + } + /** * Create a {@link DefaultExecutionPlanStore} with {@link NoOpExecutionPlanStoreWatcher}. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java new file mode 100644 index 00000000000..88a6cd8e326 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; + +import java.util.Collection; +import java.util.Optional; + +/** {@link ApplicationStoreEntry} instances for recovery. */ +public interface ApplicationStore extends ApplicationWriter { + + /** Starts the {@link ApplicationStore} service. */ + void start() throws Exception; + + /** Stops the {@link ApplicationStore} service. */ + void stop() throws Exception; + + /** + * Returns the {@link ApplicationStoreEntry} with the given {@link ApplicationID} or {@link + * Optional#empty()} if no application was registered. + */ + Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationId) + throws Exception; + + /** + * Get all application ids of submitted applications to the submitted application store. + * + * @return Collection of submitted application ids + * @throws Exception if the operation fails + */ + Collection<ApplicationID> getApplicationIds() throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java new file mode 100644 index 00000000000..10a55473122 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.runtime.application.AbstractApplication; +import org.apache.flink.runtime.blob.PermanentBlobService; + +import java.io.Serializable; +import java.util.Collection; + +/** Entry of {@code ApplicationStore}. */ +public interface ApplicationStoreEntry extends Serializable { + + /** + * Get the application from this entry. + * + * @param blobService The blob service to retrieve user artifacts + * @param recoveredJobInfos The info of jobs recovered from a previous run + * @param recoveredTerminalJobInfos The info of terminal jobs from a previous run + * @return The application + */ + AbstractApplication getApplication( + PermanentBlobService blobService, + Collection<JobInfo> recoveredJobInfos, + Collection<JobInfo> recoveredTerminalJobInfos); + + /** + * Gets the unique identifier of the application. + * + * @return the application id + */ + ApplicationID getApplicationId(); + + /** + * Gets the name of the application. 
+ * + * @return the application name + */ + String getName(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java new file mode 100644 index 00000000000..0ec9f9d47c5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; + +/** + * ApplicationStore utility interfaces. For example, convert a name(e.g. ZooKeeper path, key name in + * Kubernetes ConfigMap) to {@link ApplicationID}, or vice versa. + */ +public interface ApplicationStoreUtil { + + /** + * Get the name in external storage from application id. + * + * @param applicationId application id + * @return Key name in ConfigMap or child path name in ZooKeeper + */ + String applicationIdToName(ApplicationID applicationId); + + /** + * Get the application id from name. + * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * @return parsed application id. 
+ */ + ApplicationID nameToApplicationId(String name); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java new file mode 100644 index 00000000000..f49956b412e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableApplicationResource; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** Allows to store and remove applications. */ +@Internal +public interface ApplicationWriter extends GloballyCleanableApplicationResource { + + /** + * Adds the {@link ApplicationStoreEntry} instance. + * + * <p>If an application with the same {@link ApplicationID} exists, it is replaced. 
+ */ + void putApplication(ApplicationStoreEntry application) throws Exception; + + @Override + default CompletableFuture<Void> globalCleanupAsync( + ApplicationID applicationId, Executor executor) { + return CompletableFuture.completedFuture(null); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java new file mode 100644 index 00000000000..d332a246ad1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.persistence.ResourceVersion; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Default implementation for {@link ApplicationStore}. Combined with different {@link + * StateHandleStore}, we could persist the applications to various distributed storage. + */ +public class DefaultApplicationStore<R extends ResourceVersion<R>> implements ApplicationStore { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultApplicationStore.class); + + private final Object lock = new Object(); + + /** The set of IDs of all added applications. */ + @GuardedBy("lock") + private final Set<ApplicationID> addedApplications = new HashSet<>(); + + /** Submitted applications handle store. */ + private final StateHandleStore<ApplicationStoreEntry, R> applicationStateHandleStore; + + private final ApplicationStoreUtil applicationStoreUtil; + + /** Flag indicating whether this instance is running. 
*/ + @GuardedBy("lock") + private volatile boolean running; + + public DefaultApplicationStore( + StateHandleStore<ApplicationStoreEntry, R> stateHandleStore, + ApplicationStoreUtil applicationStoreUtil) { + this.applicationStateHandleStore = checkNotNull(stateHandleStore); + this.applicationStoreUtil = checkNotNull(applicationStoreUtil); + + this.running = false; + } + + @Override + public void start() throws Exception { + synchronized (lock) { + if (!running) { + running = true; + } + } + } + + @Override + public void stop() throws Exception { + synchronized (lock) { + if (running) { + running = false; + LOG.info("Stopping DefaultApplicationStore."); + Exception exception = null; + + try { + applicationStateHandleStore.releaseAll(); + } catch (Exception e) { + exception = e; + } + + if (exception != null) { + throw new FlinkException( + "Could not properly stop the DefaultApplicationStore.", exception); + } + } + } + } + + @Override + public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationId) + throws Exception { + checkNotNull(applicationId, "Application ID"); + + LOG.debug("Recovering application {} from {}.", applicationId, applicationStateHandleStore); + + final String name = applicationStoreUtil.applicationIdToName(applicationId); + + synchronized (lock) { + verifyIsRunning(); + + boolean success = false; + + RetrievableStateHandle<ApplicationStoreEntry> applicationRetrievableStateHandle; + + try { + try { + applicationRetrievableStateHandle = + applicationStateHandleStore.getAndLock(name); + } catch (StateHandleStore.NotExistException ignored) { + success = true; + return Optional.empty(); + } catch (Exception e) { + throw new FlinkException( + "Could not retrieve the submitted application state handle " + + "for " + + name + + " from the submitted application store.", + e); + } + + ApplicationStoreEntry application; + try { + application = applicationRetrievableStateHandle.retrieveState(); + } catch (ClassNotFoundException cnfe) 
{ + throw new FlinkException( + "Could not retrieve submitted application from state handle under " + + name + + ". This indicates that you are trying to recover from state written by an " + + "older Flink version which is not compatible. Try cleaning the state handle store.", + cnfe); + } catch (IOException ioe) { + throw new FlinkException( + "Could not retrieve submitted application from state handle under " + + name + + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " + + "store.", + ioe); + } + + addedApplications.add(applicationId); + + LOG.info("Recovered {} ({}).", application.getName(), applicationId); + + success = true; + return Optional.of(application); + } finally { + if (!success) { + applicationStateHandleStore.release(name); + } + } + } + } + + @Override + public void putApplication(ApplicationStoreEntry application) throws Exception { + checkNotNull(application, "Application"); + + final ApplicationID applicationId = application.getApplicationId(); + final String name = applicationStoreUtil.applicationIdToName(applicationId); + + LOG.debug("Adding application {} to {}.", applicationId, applicationStateHandleStore); + + boolean success = false; + + while (!success) { + synchronized (lock) { + verifyIsRunning(); + + final R currentVersion = applicationStateHandleStore.exists(name); + + if (!currentVersion.isExisting()) { + try { + applicationStateHandleStore.addAndLock(name, application); + + addedApplications.add(applicationId); + + success = true; + } catch (StateHandleStore.AlreadyExistException ignored) { + LOG.warn( + "{} already exists in {}.", + application, + applicationStateHandleStore); + } + } else if (addedApplications.contains(applicationId)) { + try { + applicationStateHandleStore.replace(name, currentVersion, application); + LOG.info("Updated {} in {}.", application, getClass().getSimpleName()); + + success = true; + } catch (StateHandleStore.NotExistException ignored) { + LOG.warn( + "{} 
does not exists in {}.", + application, + applicationStateHandleStore); + } + } else { + throw new IllegalStateException( + "Trying to update an application you didn't " + + "#getAllSubmittedApplications() or #putApplication() yourself before."); + } + } + } + + LOG.info("Added {} to {}.", application, applicationStateHandleStore); + } + + @Override + public CompletableFuture<Void> globalCleanupAsync( + ApplicationID applicationId, Executor executor) { + checkNotNull(applicationId, "Application ID"); + + return runAsyncWithLockAssertRunning( + () -> { + LOG.debug( + "Removing application {} from {}.", + applicationId, + applicationStateHandleStore); + + final String name = applicationStoreUtil.applicationIdToName(applicationId); + releaseAndRemoveOrThrowCompletionException(applicationId, name); + + addedApplications.remove(applicationId); + + LOG.info( + "Removed application {} from {}.", + applicationId, + applicationStateHandleStore); + }, + executor); + } + + @GuardedBy("lock") + private void releaseAndRemoveOrThrowCompletionException( + ApplicationID applicationId, String applicationName) { + boolean success; + try { + success = applicationStateHandleStore.releaseAndTryRemove(applicationName); + } catch (Exception e) { + throw new CompletionException(e); + } + + if (!success) { + throw new CompletionException( + new FlinkException( + String.format( + "Could not remove application with application id %s from %s.", + applicationId, applicationStateHandleStore))); + } + } + + private CompletableFuture<Void> runAsyncWithLockAssertRunning( + ThrowingRunnable<Exception> runnable, Executor executor) { + return CompletableFuture.runAsync( + () -> { + synchronized (lock) { + verifyIsRunning(); + try { + runnable.run(); + } catch (Exception e) { + throw new CompletionException(e); + } + } + }, + executor); + } + + @Override + public Collection<ApplicationID> getApplicationIds() throws Exception { + LOG.debug("Retrieving all stored application ids from {}.", 
applicationStateHandleStore); + + final Collection<String> names; + try { + names = applicationStateHandleStore.getAllHandles(); + } catch (Exception e) { + throw new Exception( + "Failed to retrieve all application ids from " + + applicationStateHandleStore + + ".", + e); + } + + final List<ApplicationID> applicationIds = new ArrayList<>(names.size()); + + for (String name : names) { + try { + applicationIds.add(applicationStoreUtil.nameToApplicationId(name)); + } catch (Exception exception) { + LOG.warn( + "Could not parse application id from {}. This indicates a malformed name.", + name, + exception); + } + } + + LOG.info( + "Retrieved application ids {} from {}", + applicationIds, + applicationStateHandleStore); + + return applicationIds; + } + + /** Verifies that the state is running. */ + private void verifyIsRunning() { + checkState(running, "Not running. Forgot to call start()?"); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java new file mode 100644 index 00000000000..d35f3ed7e7e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; + +/** + * {@link ApplicationStore} instances for JobManagers running in {@link HighAvailabilityMode#NONE}. + * + * <p>All operations are NoOps, because {@link ApplicationStoreEntry} instances cannot be recovered + * in this recovery mode. + */ +public class StandaloneApplicationStore implements ApplicationStore { + + @Override + public void start() throws Exception { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public void putApplication(ApplicationStoreEntry application) { + // Nothing to do + } + + @Override + public Collection<ApplicationID> getApplicationIds() { + return Collections.emptyList(); + } + + @Override + public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationID) { + return Optional.empty(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java new file mode 100644 index 00000000000..3682ead2e93 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +/** Singleton {@link ApplicationStoreUtil} implementation for ZooKeeper. */ +public enum ZooKeeperApplicationStoreUtil implements ApplicationStoreUtil { + INSTANCE; + + @Override + public String applicationIdToName(ApplicationID applicationId) { + return ZooKeeperUtils.getPathForApplication(applicationId); + } + + @Override + public ApplicationID nameToApplicationId(String name) { + return ApplicationID.fromHexString(name); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index a1ed8f78f0e..e814c35987e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -35,9 
+36,13 @@ import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener; +import org.apache.flink.runtime.jobmanager.ApplicationStore; +import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry; +import org.apache.flink.runtime.jobmanager.DefaultApplicationStore; import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore; import org.apache.flink.runtime.jobmanager.ExecutionPlanStore; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmanager.ZooKeeperApplicationStoreUtil; import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreUtil; import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreWatcher; import org.apache.flink.runtime.leaderelection.LeaderInformation; @@ -102,6 +107,9 @@ public class ZooKeeperUtils { public static final String HA_STORAGE_SUBMITTED_EXECUTION_PLAN_PREFIX = "submittedExecutionPlan"; + /** The prefix of the submitted application file. */ + public static final String HA_STORAGE_SUBMITTED_APPLICATION_PREFIX = "submittedApplication"; + /** The prefix of the completed checkpoint file. */ public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint"; @@ -561,6 +569,44 @@ public class ZooKeeperUtils { ZooKeeperExecutionPlanStoreUtil.INSTANCE); } + /** + * Creates a {@link DefaultApplicationStore} instance with {@link ZooKeeperStateHandleStore}, + * and {@link ZooKeeperApplicationStoreUtil}. 
+ * + * @param client The {@link CuratorFramework} ZooKeeper client to use + * @param configuration {@link Configuration} object + * @return {@link DefaultApplicationStore} instance + * @throws Exception if the submitted application store cannot be created + */ + public static ApplicationStore createApplicationStore( + CuratorFramework client, Configuration configuration) throws Exception { + + checkNotNull(configuration, "Configuration"); + + RetrievableStateStorageHelper<ApplicationStoreEntry> stateStorage = + createFileSystemStateStorage( + configuration, HA_STORAGE_SUBMITTED_APPLICATION_PREFIX); + + // ZooKeeper submitted applications root dir + String zooKeeperApplicationsPath = + configuration.get(HighAvailabilityOptions.HA_ZOOKEEPER_APPLICATIONS_PATH); + + // Ensure that the applications path exists + client.newNamespaceAwareEnsurePath(zooKeeperApplicationsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + CuratorFramework facade = + client.usingNamespace(client.getNamespace() + zooKeeperApplicationsPath); + + final ZooKeeperStateHandleStore<ApplicationStoreEntry> + zooKeeperApplicationStateHandleStore = + new ZooKeeperStateHandleStore<>(facade, stateStorage); + + return new DefaultApplicationStore<>( + zooKeeperApplicationStateHandleStore, ZooKeeperApplicationStoreUtil.INSTANCE); + } + /** * Creates a {@link DefaultCompletedCheckpointStore} instance with {@link * ZooKeeperStateHandleStore}. @@ -616,6 +662,12 @@ public class ZooKeeperUtils { return String.format("/%s", jobId); } + /** Returns the ApplicationID as a String (with leading slash). */ + public static String getPathForApplication(ApplicationID applicationId) { + checkNotNull(applicationId, "Application ID"); + return String.format("/%s", applicationId); + } + /** * Creates an instance of {@link ZooKeeperStateHandleStore}. 
* diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java new file mode 100644 index 00000000000..e37b4ca91ab --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper; +import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.persistence.TestingStateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link DefaultApplicationStore} with {@link TestingStateHandleStore} and {@link + * TestingApplicationStoreEntry}. 
+ */ +public class DefaultApplicationStoreTest extends TestLogger { + + private final ApplicationStoreEntry testingApplication = + TestingApplicationStoreEntry.newBuilder().build(); + private final long timeout = 100L; + + private TestingStateHandleStore.Builder<ApplicationStoreEntry> builder; + private TestingRetrievableStateStorageHelper<ApplicationStoreEntry> storageHelper; + + @Before + public void setup() { + builder = TestingStateHandleStore.newBuilder(); + storageHelper = new TestingRetrievableStateStorageHelper<>(); + } + + @Test + public void testRecoverApplication() throws Exception { + final RetrievableStateHandle<ApplicationStoreEntry> stateHandle = + storageHelper.store(testingApplication); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setGetFunction(ignore -> stateHandle).build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + + final Optional<ApplicationStoreEntry> recoveredApplication = + applicationStore.recoverApplication(testingApplication.getApplicationId()); + assertThat(recoveredApplication).isPresent(); + assertThat(recoveredApplication.get().getApplicationId()) + .isEqualTo(testingApplication.getApplicationId()); + } + + @Test + public void testRecoverApplicationWhenNotExist() throws Exception { + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setGetFunction( + ignore -> { + throw new StateHandleStore.NotExistException("Not exist."); + }) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + + final Optional<ApplicationStoreEntry> recoveredApplication = + applicationStore.recoverApplication(testingApplication.getApplicationId()); + assertThat(recoveredApplication).isEmpty(); + } + + @Test + public void testRecoverApplicationFailedShouldReleaseHandle() throws Exception { + final CompletableFuture<String> releaseFuture = new CompletableFuture<>(); + final 
FlinkException testException = new FlinkException("Test exception."); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setGetFunction( + ignore -> { + throw testException; + }) + .setReleaseConsumer(releaseFuture::complete) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + + assertThatThrownBy( + () -> + applicationStore.recoverApplication( + testingApplication.getApplicationId())) + .hasCause(testException); + String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(testingApplication.getApplicationId()).hasToString(actual); + } + + @Test + public void testPutApplicationWhenNotExist() throws Exception { + final CompletableFuture<ApplicationStoreEntry> addFuture = new CompletableFuture<>(); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setExistsFunction(ignore -> IntegerResourceVersion.notExisting()) + .setAddFunction( + (ignore, state) -> { + addFuture.complete(state); + return storageHelper.store(state); + }) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + applicationStore.putApplication(testingApplication); + + final ApplicationStoreEntry actual = addFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual.getApplicationId()).isEqualTo(testingApplication.getApplicationId()); + } + + @Test + public void testPutApplicationWhenAlreadyExist() throws Exception { + final CompletableFuture<Tuple3<String, IntegerResourceVersion, ApplicationStoreEntry>> + replaceFuture = new CompletableFuture<>(); + final int resourceVersion = 100; + final AtomicBoolean alreadyExist = new AtomicBoolean(false); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setExistsFunction( + ignore -> { + if (alreadyExist.get()) { + return IntegerResourceVersion.valueOf(resourceVersion); + } else { + alreadyExist.set(true); + return 
IntegerResourceVersion.notExisting(); + } + }) + .setAddFunction((ignore, state) -> storageHelper.store(state)) + .setReplaceConsumer(replaceFuture::complete) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + applicationStore.putApplication(testingApplication); + // Replace + applicationStore.putApplication(testingApplication); + + final Tuple3<String, IntegerResourceVersion, ApplicationStoreEntry> actual = + replaceFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual.f0).isEqualTo(testingApplication.getApplicationId().toString()); + assertThat(actual.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion)); + assertThat(actual.f2.getApplicationId()).isEqualTo(testingApplication.getApplicationId()); + } + + @Test + public void testGlobalCleanup() throws Exception { + final CompletableFuture<ApplicationID> removeFuture = new CompletableFuture<>(); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setAddFunction((ignore, state) -> storageHelper.store(state)) + .setRemoveFunction( + name -> { + removeFuture.complete(ApplicationID.fromHexString(name)); + return true; + }) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + + applicationStore.putApplication(testingApplication); + applicationStore + .globalCleanupAsync( + testingApplication.getApplicationId(), Executors.directExecutor()) + .join(); + final ApplicationID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual).isEqualTo(testingApplication.getApplicationId()); + } + + @Test + public void testGlobalCleanupWithNonExistName() throws Exception { + final CompletableFuture<ApplicationID> removeFuture = new CompletableFuture<>(); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setRemoveFunction( + name -> { + removeFuture.complete(ApplicationID.fromHexString(name)); + return true; + }) + 
.build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + applicationStore + .globalCleanupAsync( + testingApplication.getApplicationId(), Executors.directExecutor()) + .join(); + + assertThat(removeFuture).isDone(); + } + + @Test + public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception { + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setRemoveFunction(name -> false).build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + assertThatThrownBy( + () -> + applicationStore + .globalCleanupAsync( + testingApplication.getApplicationId(), + Executors.directExecutor()) + .get()) + .isInstanceOf(ExecutionException.class); + } + + @Test + public void testGetApplicationIds() throws Exception { + final Collection<ApplicationID> existingApplicationIds = + Arrays.asList(new ApplicationID(), new ApplicationID()); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setGetAllHandlesSupplier( + () -> + existingApplicationIds.stream() + .map(ApplicationID::toString) + .collect(Collectors.toList())) + .build(); + + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + final Collection<ApplicationID> applicationIds = applicationStore.getApplicationIds(); + assertThat(applicationIds).containsExactlyInAnyOrderElementsOf(existingApplicationIds); + } + + @Test + public void testStoppingApplicationStoreShouldReleaseAllHandles() throws Exception { + final CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + final TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore = + builder.setReleaseAllHandlesRunnable(() -> completableFuture.complete(null)) + .build(); + final ApplicationStore applicationStore = createAndStartApplicationStore(stateHandleStore); + applicationStore.stop(); + + assertThat(completableFuture).isDone(); + } + + private 
ApplicationStore createAndStartApplicationStore( + TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore) throws Exception { + final ApplicationStore applicationStore = + new DefaultApplicationStore<>( + stateHandleStore, + new ApplicationStoreUtil() { + @Override + public String applicationIdToName(ApplicationID applicationId) { + return applicationId.toString(); + } + + @Override + public ApplicationID nameToApplicationId(String name) { + return ApplicationID.fromHexString(name); + } + }); + applicationStore.start(); + return applicationStore; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java new file mode 100644 index 00000000000..984554db8e9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** Tests for the {@link StandaloneApplicationStore}. */ +public class StandaloneApplicationStoreTest { + + /** Tests that all operations work and don't change the state. */ + @Test + public void testNoOps() throws Exception { + StandaloneApplicationStore applicationStore = new StandaloneApplicationStore(); + applicationStore.start(); + + ApplicationID applicationId = new ApplicationID(); + ApplicationStoreEntry applicationStoreEntry = + TestingApplicationStoreEntry.newBuilder().setApplicationId(applicationId).build(); + + assertEquals(0, applicationStore.getApplicationIds().size()); + + applicationStore.putApplication(applicationStoreEntry); + assertEquals(0, applicationStore.getApplicationIds().size()); + + applicationStore.globalCleanupAsync(applicationId, Executors.directExecutor()).join(); + assertEquals(0, applicationStore.getApplicationIds().size()); + + assertFalse(applicationStore.recoverApplication(applicationId).isPresent()); + + applicationStore.stop(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java new file mode 100644 index 00000000000..7148c9e8e47 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.runtime.application.AbstractApplication; +import org.apache.flink.runtime.blob.PermanentBlobService; + +import java.util.Collection; + +/** {@link ApplicationStoreEntry} implementation for testing purposes. */ +public class TestingApplicationStoreEntry implements ApplicationStoreEntry { + + private final ApplicationID applicationId; + private final String name; + + public TestingApplicationStoreEntry(ApplicationID applicationId, String name) { + this.applicationId = applicationId; + this.name = name; + } + + @Override + public AbstractApplication getApplication( + PermanentBlobService blobService, + Collection<JobInfo> recoveredJobInfos, + Collection<JobInfo> recoveredTerminalJobInfos) { + throw new UnsupportedOperationException(); + } + + @Override + public ApplicationID getApplicationId() { + return applicationId; + } + + @Override + public String getName() { + return name; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** Builder for creating {@link TestingApplicationStoreEntry} instances. 
*/ + public static class Builder { + private ApplicationID applicationId = new ApplicationID(); + private String name = "TestingApplication"; + + public Builder setApplicationId(ApplicationID applicationId) { + this.applicationId = applicationId; + return this; + } + + public Builder setName(String name) { + this.name = name; + return this; + } + + public TestingApplicationStoreEntry build() { + return new TestingApplicationStoreEntry(applicationId, name); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java new file mode 100644 index 00000000000..2cd4b89bb3b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; +import org.apache.flink.runtime.state.RetrievableStreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; + +import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * IT tests for {@link DefaultApplicationStore} with all ZooKeeper components(e.g. {@link + * ZooKeeperStateHandleStore}, {@link ZooKeeperApplicationStoreUtil}). 
+ */ +public class ZooKeeperApplicationStoreITCase extends TestLogger { + + private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension(); + + @RegisterExtension + final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = + new EachCallbackWrapper<>(zooKeeperExtension); + + @RegisterExtension + final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = + new TestingFatalErrorHandlerExtension(); + + private static final RetrievableStateStorageHelper<ApplicationStoreEntry> localStateStorage = + applicationStoreEntry -> { + ByteStreamStateHandle byteStreamStateHandle = + new ByteStreamStateHandle( + String.valueOf(java.util.UUID.randomUUID()), + InstantiationUtil.serializeObject(applicationStoreEntry)); + return new RetrievableStreamStateHandle<>(byteStreamStateHandle); + }; + + @Test + public void testPutAndRemoveApplication() throws Exception { + ApplicationStore applicationStore = + createZooKeeperApplicationStore("/testPutAndRemoveApplication"); + + try { + applicationStore.start(); + + ApplicationStoreEntry applicationEntry = + createApplicationStoreEntry(new ApplicationID(), "AppName"); + + // Empty state + assertThat(applicationStore.getApplicationIds()).isEmpty(); + + // Add initial + applicationStore.putApplication(applicationEntry); + + // Verify initial application + Collection<ApplicationID> applicationIds = applicationStore.getApplicationIds(); + assertThat(applicationIds).hasSize(1); + + ApplicationID applicationId = applicationIds.iterator().next(); + + Optional<ApplicationStoreEntry> recoveredEntry = + applicationStore.recoverApplication(applicationId); + assertThat(recoveredEntry).isPresent(); + verifyApplicationStoreEntries(applicationEntry, recoveredEntry.get()); + + // Update (same ID) + applicationEntry = + createApplicationStoreEntry( + applicationEntry.getApplicationId(), "Updated AppName"); + applicationStore.putApplication(applicationEntry); + + // Verify updated + applicationIds = 
applicationStore.getApplicationIds(); + assertThat(applicationIds).hasSize(1); + + applicationId = applicationIds.iterator().next(); + + recoveredEntry = applicationStore.recoverApplication(applicationId); + assertThat(recoveredEntry).isPresent(); + verifyApplicationStoreEntries(applicationEntry, recoveredEntry.get()); + + // Remove + applicationStore + .globalCleanupAsync( + applicationEntry.getApplicationId(), Executors.directExecutor()) + .join(); + + // Empty state + assertThat(applicationStore.getApplicationIds()).isEmpty(); + + // Don't fail if called again + applicationStore + .globalCleanupAsync( + applicationEntry.getApplicationId(), Executors.directExecutor()) + .join(); + } finally { + applicationStore.stop(); + } + } + + @Nonnull + private ApplicationStore createZooKeeperApplicationStore(String fullPath) throws Exception { + final CuratorFramework client = + zooKeeperExtension.getZooKeeperClient( + testingFatalErrorHandlerResource.getTestingFatalErrorHandler()); + // Ensure that the applications path exists + client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient()); + + // All operations will have the path as root + CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath); + final ZooKeeperStateHandleStore<ApplicationStoreEntry> zooKeeperStateHandleStore = + new ZooKeeperStateHandleStore<>(facade, localStateStorage); + return new DefaultApplicationStore<>( + zooKeeperStateHandleStore, ZooKeeperApplicationStoreUtil.INSTANCE); + } + + @Test + public void testRecoverApplications() throws Exception { + ApplicationStore applicationStore = + createZooKeeperApplicationStore("/testRecoverApplications"); + + try { + applicationStore.start(); + + HashMap<ApplicationID, ApplicationStoreEntry> expected = new HashMap<>(); + ApplicationID[] applicationIds = + new ApplicationID[] { + new ApplicationID(), new ApplicationID(), new ApplicationID() + }; + + expected.put(applicationIds[0], 
createApplicationStoreEntry(applicationIds[0])); + expected.put(applicationIds[1], createApplicationStoreEntry(applicationIds[1])); + expected.put(applicationIds[2], createApplicationStoreEntry(applicationIds[2])); + + // Add all + for (ApplicationStoreEntry applicationStoreEntry : expected.values()) { + applicationStore.putApplication(applicationStoreEntry); + } + + Collection<ApplicationID> actual = applicationStore.getApplicationIds(); + + assertThat(actual).hasSameSizeAs(expected.entrySet()); + + for (ApplicationID applicationId : actual) { + Optional<ApplicationStoreEntry> applicationStoreEntry = + applicationStore.recoverApplication(applicationId); + assertThat(applicationStoreEntry).isPresent(); + assertThat(expected).containsKey(applicationStoreEntry.get().getApplicationId()); + + verifyApplicationStoreEntries( + expected.get(applicationStoreEntry.get().getApplicationId()), + applicationStoreEntry.get()); + + applicationStore + .globalCleanupAsync( + applicationStoreEntry.get().getApplicationId(), + Executors.directExecutor()) + .join(); + } + + // Empty state + assertThat(applicationStore.getApplicationIds()).isEmpty(); + } finally { + applicationStore.stop(); + } + } + + @Test + public void testUpdateApplicationYouDidNotGetOrAdd() throws Exception { + ApplicationStore applicationStore = + createZooKeeperApplicationStore("/testUpdateApplicationYouDidNotGetOrAdd"); + + ApplicationStore otherApplicationStore = + createZooKeeperApplicationStore("/testUpdateApplicationYouDidNotGetOrAdd"); + + applicationStore.start(); + otherApplicationStore.start(); + + ApplicationStoreEntry applicationEntry = createApplicationStoreEntry(new ApplicationID()); + + applicationStore.putApplication(applicationEntry); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> otherApplicationStore.putApplication(applicationEntry)); + + applicationStore.stop(); + otherApplicationStore.stop(); + } + + /** + * Tests that we fail with an exception if the 
application cannot be removed from the + * ZooKeeperApplicationStore. + * + * <p>Tests that a closed ZooKeeperApplicationStore no longer holds any locks. + */ + @Test + public void testApplicationRemovalFailureAndLockRelease() throws Exception { + final ApplicationStore submittedApplicationStore = + createZooKeeperApplicationStore("/testApplicationRemovalFailureAndLockRelease"); + final ApplicationStore otherSubmittedApplicationStore = + createZooKeeperApplicationStore("/testApplicationRemovalFailureAndLockRelease"); + + submittedApplicationStore.start(); + otherSubmittedApplicationStore.start(); + + final ApplicationStoreEntry applicationEntry = + createApplicationStoreEntry(new ApplicationID()); + submittedApplicationStore.putApplication(applicationEntry); + + final Optional<ApplicationStoreEntry> recoveredApplicationEntry = + otherSubmittedApplicationStore.recoverApplication( + applicationEntry.getApplicationId()); + + assertThat(recoveredApplicationEntry).isPresent(); + + assertThatExceptionOfType(Exception.class) + .as( + "It should not be possible to remove the ApplicationStoreEntry since the first store still has a lock on it.") + .isThrownBy( + () -> + otherSubmittedApplicationStore + .globalCleanupAsync( + recoveredApplicationEntry.get().getApplicationId(), + Executors.directExecutor()) + .join()); + + submittedApplicationStore.stop(); + + // now we should be able to delete the application entry + otherSubmittedApplicationStore + .globalCleanupAsync( + recoveredApplicationEntry.get().getApplicationId(), + Executors.directExecutor()) + .join(); + + assertThat( + otherSubmittedApplicationStore.recoverApplication( + recoveredApplicationEntry.get().getApplicationId())) + .isEmpty(); + + otherSubmittedApplicationStore.stop(); + } + + // --------------------------------------------------------------------------------------------- + + private ApplicationStoreEntry createApplicationStoreEntry(ApplicationID applicationId) { + return 
createApplicationStoreEntry(applicationId, "Test Application"); + } + + /** Builds a {@link TestingApplicationStoreEntry} with the given application id and name. */ private ApplicationStoreEntry createApplicationStoreEntry( + ApplicationID applicationId, String name) { + return TestingApplicationStoreEntry.newBuilder() + .setApplicationId(applicationId) + .setName(name) + .build(); + } + + /** Asserts that {@code actual} carries the same name and application id as {@code expected}. */ private void verifyApplicationStoreEntries( + ApplicationStoreEntry expected, ApplicationStoreEntry actual) { + assertThat(actual.getName()).isEqualTo(expected.getName()); + assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId()); + } +}
