This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 579c7795b03689c8c02859c37900864033df8873
Author: Yi Zhang <[email protected]>
AuthorDate: Thu Mar 5 18:50:13 2026 +0800

    [FLINK-38975][runtime] Add ApplicationStore
---
 .../expert_high_availability_zk_section.html       |   6 +
 .../generated/high_availability_configuration.html |   6 +
 .../configuration/HighAvailabilityOptions.java     |   8 +
 .../KubernetesApplicationStoreUtil.java            |  54 ++++
 .../apache/flink/kubernetes/utils/Constants.java   |   2 +
 .../flink/kubernetes/utils/KubernetesUtils.java    |  62 ++++
 .../flink/runtime/jobmanager/ApplicationStore.java |  49 +++
 .../runtime/jobmanager/ApplicationStoreEntry.java  |  58 ++++
 .../runtime/jobmanager/ApplicationStoreUtil.java   |  44 +++
 .../runtime/jobmanager/ApplicationWriter.java      |  44 +++
 .../jobmanager/DefaultApplicationStore.java        | 328 +++++++++++++++++++++
 .../jobmanager/StandaloneApplicationStore.java     |  59 ++++
 .../jobmanager/ZooKeeperApplicationStoreUtil.java  |  37 +++
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  52 ++++
 .../jobmanager/DefaultApplicationStoreTest.java    | 279 ++++++++++++++++++
 .../jobmanager/StandaloneApplicationStoreTest.java |  54 ++++
 .../jobmanager/TestingApplicationStoreEntry.java   |  80 +++++
 .../ZooKeeperApplicationStoreITCase.java           | 297 +++++++++++++++++++
 18 files changed, 1519 insertions(+)

diff --git 
a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html 
b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
index c7ce745452d..4261b544870 100644
--- a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
+++ b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html
@@ -56,6 +56,12 @@
             <td>Boolean</td>
             <td>Defines whether a suspended ZooKeeper connection will be 
treated as an error that causes the leader information to be invalidated or 
not. In case you set this option to <code 
class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper 
connection is marked as lost before it revokes the leadership of components. 
This has the effect that Flink is more resilient against temporary connection 
instabilities at the cost of running more likely into timing issues with  [...]
         </tr>
+        <tr>
+            <td><h5>high-availability.zookeeper.path.applications</h5></td>
+            <td style="word-wrap: break-word;">"/applications"</td>
+            <td>String</td>
+            <td>ZooKeeper root path (ZNode) for applications</td>
+        </tr>
         <tr>
             <td><h5>high-availability.zookeeper.path.execution-plans</h5></td>
             <td style="word-wrap: break-word;">"/execution-plans"</td>
diff --git 
a/docs/layouts/shortcodes/generated/high_availability_configuration.html 
b/docs/layouts/shortcodes/generated/high_availability_configuration.html
index 26a6b300907..116086bd88d 100644
--- a/docs/layouts/shortcodes/generated/high_availability_configuration.html
+++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html
@@ -98,6 +98,12 @@
             <td>Boolean</td>
             <td>Defines whether a suspended ZooKeeper connection will be 
treated as an error that causes the leader information to be invalidated or 
not. In case you set this option to <code 
class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper 
connection is marked as lost before it revokes the leadership of components. 
This has the effect that Flink is more resilient against temporary connection 
instabilities at the cost of running more likely into timing issues with  [...]
         </tr>
+        <tr>
+            <td><h5>high-availability.zookeeper.path.applications</h5></td>
+            <td style="word-wrap: break-word;">"/applications"</td>
+            <td>String</td>
+            <td>ZooKeeper root path (ZNode) for applications</td>
+        </tr>
         <tr>
             <td><h5>high-availability.zookeeper.path.execution-plans</h5></td>
             <td style="word-wrap: break-word;">"/execution-plans"</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 30480de619e..9e54491902e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -138,6 +138,14 @@ public class HighAvailabilityOptions {
                             "high-availability.zookeeper.path.jobgraphs")
                     .withDescription("ZooKeeper root path (ZNode) for 
execution plans");
 
+    /** ZooKeeper root path (ZNode) for applications. */
+    
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
+    public static final ConfigOption<String> HA_ZOOKEEPER_APPLICATIONS_PATH =
+            key("high-availability.zookeeper.path.applications")
+                    .stringType()
+                    .defaultValue("/applications")
+                    .withDescription("ZooKeeper root path (ZNode) for 
applications");
+
     // ------------------------------------------------------------------------
     //  ZooKeeper Client Settings
     // ------------------------------------------------------------------------
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java
new file mode 100644
index 00000000000..dae6ff5a8c8
--- /dev/null
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesApplicationStoreUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreUtil;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.APPLICATION_STORE_KEY_PREFIX;
+
+/** Singleton {@link ApplicationStoreUtil} implementation for Kubernetes. */
+public enum KubernetesApplicationStoreUtil implements ApplicationStoreUtil {
+    INSTANCE;
+
+    /**
+     * Convert a key in ConfigMap to {@link ApplicationID}. The key is stored 
with prefix {@link
+     * Constants#APPLICATION_STORE_KEY_PREFIX}.
+     *
+     * @param key application key in ConfigMap.
+     * @return the parsed {@link ApplicationID}.
+     */
+    @Override
+    public ApplicationID nameToApplicationId(String key) {
+        return 
ApplicationID.fromHexString(key.substring(APPLICATION_STORE_KEY_PREFIX.length()));
+    }
+
+    /**
+     * Convert an {@link ApplicationID} to a ConfigMap key by prepending the prefix 
{@link
+     * Constants#APPLICATION_STORE_KEY_PREFIX}.
+     *
+     * @param applicationId application id
+     * @return a key to store application in the ConfigMap
+     */
+    @Override
+    public String applicationIdToName(ApplicationID applicationId) {
+        return APPLICATION_STORE_KEY_PREFIX + applicationId;
+    }
+}
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index 21d36b66d11..6359364248c 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -110,6 +110,8 @@ public class Constants {
     public static final String LEADER_SESSION_ID_KEY = "sessionId";
     public static final String EXECUTION_PLAN_STORE_KEY_PREFIX = 
"executionPlan-";
     public static final String SUBMITTED_EXECUTION_PLAN_FILE_PREFIX = 
"submittedExecutionPlan";
+    public static final String APPLICATION_STORE_KEY_PREFIX = "application-";
+    public static final String SUBMITTED_APPLICATION_FILE_PREFIX = 
"submittedApplication";
     public static final String CHECKPOINT_COUNTER_KEY = "counter";
     public static final String CHECKPOINT_ID_KEY_PREFIX = "checkpointID-";
     public static final String COMPLETED_CHECKPOINT_FILE_SUFFIX = 
"completedCheckpoint";
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index f097354fdc3..7786c0add56 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.RecoveryClaimMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.highavailability.KubernetesApplicationStoreUtil;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointStoreUtil;
 import 
org.apache.flink.kubernetes.highavailability.KubernetesExecutionPlanStoreUtil;
 import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore;
@@ -37,6 +38,9 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
 import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+import org.apache.flink.runtime.jobmanager.DefaultApplicationStore;
 import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore;
 import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
 import org.apache.flink.runtime.jobmanager.NoOpExecutionPlanStoreWatcher;
@@ -83,6 +87,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.kubernetes.utils.Constants.APPLICATION_STORE_KEY_PREFIX;
 import static 
org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_ID_KEY_PREFIX;
 import static 
org.apache.flink.kubernetes.utils.Constants.COMPLETED_CHECKPOINT_FILE_SUFFIX;
 import static org.apache.flink.kubernetes.utils.Constants.DNS_POLICY_DEFAULT;
@@ -90,6 +95,7 @@ import static 
org.apache.flink.kubernetes.utils.Constants.DNS_POLICY_HOSTNETWORK
 import static 
org.apache.flink.kubernetes.utils.Constants.EXECUTION_PLAN_STORE_KEY_PREFIX;
 import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
 import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.SUBMITTED_APPLICATION_FILE_PREFIX;
 import static 
org.apache.flink.kubernetes.utils.Constants.SUBMITTED_EXECUTION_PLAN_FILE_PREFIX;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -234,6 +240,62 @@ public class KubernetesUtils {
         return LeaderInformation.known(sessionID, leaderAddress);
     }
 
+    /**
+     * Create a {@link DefaultApplicationStore}.
+     *
+     * @param configuration configuration to build a 
RetrievableStateStorageHelper
+     * @param flinkKubeClient flink kubernetes client
+     * @param configMapName ConfigMap name
+     * @param lockIdentity lock identity to check the leadership
+     * @return a {@link DefaultApplicationStore}
+     * @throws Exception if creating the storage helper fails
+     */
+    public static ApplicationStore createApplicationStore(
+            Configuration configuration,
+            FlinkKubeClient flinkKubeClient,
+            String configMapName,
+            String lockIdentity)
+            throws Exception {
+
+        final KubernetesStateHandleStore<ApplicationStoreEntry> 
stateHandleStore =
+                createApplicationStateHandleStore(
+                        configuration, flinkKubeClient, configMapName, 
lockIdentity);
+        return new DefaultApplicationStore<>(
+                stateHandleStore, KubernetesApplicationStoreUtil.INSTANCE);
+    }
+
+    /**
+     * Create a {@link KubernetesStateHandleStore} that stores {@link 
ApplicationStoreEntry} instances.
+     *
+     * @param configuration configuration to build a 
RetrievableStateStorageHelper
+     * @param flinkKubeClient flink kubernetes client
+     * @param configMapName ConfigMap name
+     * @param lockIdentity lock identity to check the leadership
+     * @return a {@link KubernetesStateHandleStore} that stores {@link 
ApplicationStoreEntry} instances.
+     * @throws Exception if creating the storage helper fails
+     */
+    public static KubernetesStateHandleStore<ApplicationStoreEntry>
+            createApplicationStateHandleStore(
+                    Configuration configuration,
+                    FlinkKubeClient flinkKubeClient,
+                    String configMapName,
+                    String lockIdentity)
+                    throws Exception {
+
+        final RetrievableStateStorageHelper<ApplicationStoreEntry> 
stateStorage =
+                new FileSystemStateStorageHelper<>(
+                        
HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(
+                                configuration),
+                        SUBMITTED_APPLICATION_FILE_PREFIX);
+
+        return new KubernetesStateHandleStore<>(
+                flinkKubeClient,
+                configMapName,
+                stateStorage,
+                k -> k.startsWith(APPLICATION_STORE_KEY_PREFIX),
+                lockIdentity);
+    }
+
     /**
      * Create a {@link DefaultExecutionPlanStore} with {@link 
NoOpExecutionPlanStoreWatcher}.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java
new file mode 100644
index 00000000000..88a6cd8e326
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStore.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/** Stores {@link ApplicationStoreEntry} instances for recovery. */
+public interface ApplicationStore extends ApplicationWriter {
+
+    /** Starts the {@link ApplicationStore} service. */
+    void start() throws Exception;
+
+    /** Stops the {@link ApplicationStore} service. */
+    void stop() throws Exception;
+
+    /**
+     * Returns the {@link ApplicationStoreEntry} with the given {@link 
ApplicationID} or {@link
+     * Optional#empty()} if no application was registered.
+     */
+    Optional<ApplicationStoreEntry> recoverApplication(ApplicationID 
applicationId)
+            throws Exception;
+
+    /**
+     * Gets the ids of all applications that were submitted to the 
application store.
+     *
+     * @return Collection of submitted application ids
+     * @throws Exception if the operation fails
+     */
+    Collection<ApplicationID> getApplicationIds() throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java
new file mode 100644
index 00000000000..10a55473122
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/** Entry of {@code ApplicationStore}. */
+public interface ApplicationStoreEntry extends Serializable {
+
+    /**
+     * Get the application from this entry.
+     *
+     * @param blobService The blob service to retrieve user artifacts
+     * @param recoveredJobInfos The info of jobs recovered from a previous run
+     * @param recoveredTerminalJobInfos The info of terminal jobs from a 
previous run
+     * @return The application
+     */
+    AbstractApplication getApplication(
+            PermanentBlobService blobService,
+            Collection<JobInfo> recoveredJobInfos,
+            Collection<JobInfo> recoveredTerminalJobInfos);
+
+    /**
+     * Gets the unique identifier of the application.
+     *
+     * @return the application id
+     */
+    ApplicationID getApplicationId();
+
+    /**
+     * Gets the name of the application.
+     *
+     * @return the application name
+     */
+    String getName();
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java
new file mode 100644
index 00000000000..0ec9f9d47c5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+
+/**
+ * ApplicationStore utility interface. For example, it converts a name (e.g. 
ZooKeeper path, key name in
+ * Kubernetes ConfigMap) to {@link ApplicationID}, or vice versa.
+ */
+public interface ApplicationStoreUtil {
+
+    /**
+     * Get the name in external storage from application id.
+     *
+     * @param applicationId application id
+     * @return Key name in ConfigMap or child path name in ZooKeeper
+     */
+    String applicationIdToName(ApplicationID applicationId);
+
+    /**
+     * Get the application id from name.
+     *
+     * @param name Key name in ConfigMap or child path name in ZooKeeper
+     * @return parsed application id.
+     */
+    ApplicationID nameToApplicationId(String name);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java
new file mode 100644
index 00000000000..f49956b412e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
+import 
org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableApplicationResource;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Allows storing and removing applications. */
+@Internal
+public interface ApplicationWriter extends 
GloballyCleanableApplicationResource {
+
+    /**
+     * Adds the {@link ApplicationStoreEntry} instance.
+     *
+     * <p>If an application with the same {@link ApplicationID} exists, it is 
replaced.
+     */
+    void putApplication(ApplicationStoreEntry application) throws Exception;
+
+    @Override
+    default CompletableFuture<Void> globalCleanupAsync(
+            ApplicationID applicationId, Executor executor) {
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java
new file mode 100644
index 00000000000..d332a246ad1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.persistence.ResourceVersion;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Default implementation for {@link ApplicationStore}. Combined with 
different {@link
+ * StateHandleStore}, we could persist the applications to various distributed 
storage.
+ */
+public class DefaultApplicationStore<R extends ResourceVersion<R>> implements 
ApplicationStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultApplicationStore.class);
+
+    private final Object lock = new Object();
+
+    /** The set of IDs of all added applications. */
+    @GuardedBy("lock")
+    private final Set<ApplicationID> addedApplications = new HashSet<>();
+
+    /** Submitted applications handle store. */
+    private final StateHandleStore<ApplicationStoreEntry, R> 
applicationStateHandleStore;
+
+    private final ApplicationStoreUtil applicationStoreUtil;
+
+    /** Flag indicating whether this instance is running. */
+    @GuardedBy("lock")
+    private volatile boolean running;
+
+    public DefaultApplicationStore(
+            StateHandleStore<ApplicationStoreEntry, R> stateHandleStore,
+            ApplicationStoreUtil applicationStoreUtil) {
+        this.applicationStateHandleStore = checkNotNull(stateHandleStore);
+        this.applicationStoreUtil = checkNotNull(applicationStoreUtil);
+
+        this.running = false;
+    }
+
+    @Override
+    public void start() throws Exception {
+        synchronized (lock) {
+            if (!running) {
+                running = true;
+            }
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        synchronized (lock) {
+            if (running) {
+                running = false;
+                LOG.info("Stopping DefaultApplicationStore.");
+                Exception exception = null;
+
+                try {
+                    applicationStateHandleStore.releaseAll();
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                if (exception != null) {
+                    throw new FlinkException(
+                            "Could not properly stop the 
DefaultApplicationStore.", exception);
+                }
+            }
+        }
+    }
+
+    @Override
+    public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID 
applicationId)
+            throws Exception {
+        checkNotNull(applicationId, "Application ID");
+
+        LOG.debug("Recovering application {} from {}.", applicationId, 
applicationStateHandleStore);
+
+        final String name = 
applicationStoreUtil.applicationIdToName(applicationId);
+
+        synchronized (lock) {
+            verifyIsRunning();
+
+            boolean success = false;
+
+            RetrievableStateHandle<ApplicationStoreEntry> 
applicationRetrievableStateHandle;
+
+            try {
+                try {
+                    applicationRetrievableStateHandle =
+                            applicationStateHandleStore.getAndLock(name);
+                } catch (StateHandleStore.NotExistException ignored) {
+                    success = true;
+                    return Optional.empty();
+                } catch (Exception e) {
+                    throw new FlinkException(
+                            "Could not retrieve the submitted application 
state handle "
+                                    + "for "
+                                    + name
+                                    + " from the submitted application store.",
+                            e);
+                }
+
+                ApplicationStoreEntry application;
+                try {
+                    application = 
applicationRetrievableStateHandle.retrieveState();
+                } catch (ClassNotFoundException cnfe) {
+                    throw new FlinkException(
+                            "Could not retrieve submitted application from 
state handle under "
+                                    + name
+                                    + ". This indicates that you are trying to 
recover from state written by an "
+                                    + "older Flink version which is not 
compatible. Try cleaning the state handle store.",
+                            cnfe);
+                } catch (IOException ioe) {
+                    throw new FlinkException(
+                            "Could not retrieve submitted application from 
state handle under "
+                                    + name
+                                    + ". This indicates that the retrieved 
state handle is broken. Try cleaning the state handle "
+                                    + "store.",
+                            ioe);
+                }
+
+                addedApplications.add(applicationId);
+
+                LOG.info("Recovered {} ({}).", application.getName(), 
applicationId);
+
+                success = true;
+                return Optional.of(application);
+            } finally {
+                if (!success) {
+                    applicationStateHandleStore.release(name);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void putApplication(ApplicationStoreEntry application) throws 
Exception {
+        checkNotNull(application, "Application");
+
+        final ApplicationID applicationId = application.getApplicationId();
+        final String name = 
applicationStoreUtil.applicationIdToName(applicationId);
+
+        LOG.debug("Adding application {} to {}.", applicationId, 
applicationStateHandleStore);
+
+        boolean success = false;
+
+        while (!success) {
+            synchronized (lock) {
+                verifyIsRunning();
+
+                final R currentVersion = 
applicationStateHandleStore.exists(name);
+
+                if (!currentVersion.isExisting()) {
+                    try {
+                        applicationStateHandleStore.addAndLock(name, 
application);
+
+                        addedApplications.add(applicationId);
+
+                        success = true;
+                    } catch (StateHandleStore.AlreadyExistException ignored) {
+                        LOG.warn(
+                                "{} already exists in {}.",
+                                application,
+                                applicationStateHandleStore);
+                    }
+                } else if (addedApplications.contains(applicationId)) {
+                    try {
+                        applicationStateHandleStore.replace(name, 
currentVersion, application);
+                        LOG.info("Updated {} in {}.", application, 
getClass().getSimpleName());
+
+                        success = true;
+                    } catch (StateHandleStore.NotExistException ignored) {
+                        LOG.warn(
+                                "{} does not exists in {}.",
+                                application,
+                                applicationStateHandleStore);
+                    }
+                } else {
+                    throw new IllegalStateException(
+                            "Trying to update an application you didn't "
+                                    + "#getAllSubmittedApplications() or 
#putApplication() yourself before.");
+                }
+            }
+        }
+
+        LOG.info("Added {} to {}.", application, applicationStateHandleStore);
+    }
+
+    /**
+     * Asynchronously releases and removes the entry stored for the given application.
+     *
+     * <p>The removal is handed to {@code runAsyncWithLockAssertRunning}, so it runs on the given
+     * executor while holding the store lock and after the running check. Once the entry has been
+     * removed, the ID is also dropped from the locally tracked added applications.
+     *
+     * @param applicationId ID of the application whose entry should be removed; must not be null
+     * @param executor executor on which the removal is run
+     * @return future that completes when the entry is removed and completes exceptionally when
+     *     the removal fails
+     */
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(
+            ApplicationID applicationId, Executor executor) {
+        checkNotNull(applicationId, "Application ID");
+
+        final ThrowingRunnable<Exception> removal =
+                () -> {
+                    LOG.debug(
+                            "Removing application {} from {}.",
+                            applicationId,
+                            applicationStateHandleStore);
+
+                    releaseAndRemoveOrThrowCompletionException(
+                            applicationId, applicationStoreUtil.applicationIdToName(applicationId));
+
+                    addedApplications.remove(applicationId);
+
+                    LOG.info(
+                            "Removed application {} from {}.",
+                            applicationId,
+                            applicationStateHandleStore);
+                };
+
+        return runAsyncWithLockAssertRunning(removal, executor);
+    }
+
+    /**
+     * Releases and removes the handle stored under the given name.
+     *
+     * <p>Any failure is surfaced as a {@link CompletionException}: either wrapping the exception
+     * thrown by the state handle store, or wrapping a {@link FlinkException} when the store
+     * reports that nothing was removed.
+     *
+     * @param applicationId ID of the application, used only for the error message
+     * @param applicationName name under which the application is stored
+     */
+    @GuardedBy("lock")
+    private void releaseAndRemoveOrThrowCompletionException(
+            ApplicationID applicationId, String applicationName) {
+        final boolean removed;
+        try {
+            removed = applicationStateHandleStore.releaseAndTryRemove(applicationName);
+        } catch (Exception removalFailure) {
+            throw new CompletionException(removalFailure);
+        }
+
+        if (removed) {
+            return;
+        }
+
+        final String message =
+                String.format(
+                        "Could not remove application with application id %s from %s.",
+                        applicationId, applicationStateHandleStore);
+        throw new CompletionException(new FlinkException(message));
+    }
+
+    /**
+     * Runs the given action asynchronously while holding the store lock.
+     *
+     * <p>Before the action runs, the running check is performed inside the lock. Exceptions
+     * thrown by the action are rethrown wrapped in a {@link CompletionException}.
+     *
+     * @param runnable action to execute under the lock
+     * @param executor executor on which the action is run
+     * @return future tracking the execution of the action
+     */
+    private CompletableFuture<Void> runAsyncWithLockAssertRunning(
+            ThrowingRunnable<Exception> runnable, Executor executor) {
+        final Runnable guardedAction =
+                () -> {
+                    synchronized (lock) {
+                        verifyIsRunning();
+                        try {
+                            runnable.run();
+                        } catch (Exception failure) {
+                            throw new CompletionException(failure);
+                        }
+                    }
+                };
+
+        return CompletableFuture.runAsync(guardedAction, executor);
+    }
+
+    /**
+     * Returns the IDs of all applications whose handles are currently listed in the state handle
+     * store.
+     *
+     * <p>Handle names that cannot be converted back into an {@link ApplicationID} are logged at
+     * WARN level and skipped instead of failing the whole call.
+     *
+     * @return the application IDs that could be parsed from the stored handle names
+     * @throws Exception if the handle names cannot be retrieved; the failure is reported as a
+     *     {@link FlinkException} carrying the original exception as its cause
+     */
+    @Override
+    public Collection<ApplicationID> getApplicationIds() throws Exception {
+        LOG.debug("Retrieving all stored application ids from {}.", applicationStateHandleStore);
+
+        final Collection<String> names;
+        try {
+            names = applicationStateHandleStore.getAllHandles();
+        } catch (Exception e) {
+            // Throw a specific checked type instead of a raw Exception. FlinkException is still
+            // an Exception, so existing callers are unaffected.
+            throw new FlinkException(
+                    "Failed to retrieve all application ids from "
+                            + applicationStateHandleStore
+                            + ".",
+                    e);
+        }
+
+        final List<ApplicationID> applicationIds = new ArrayList<>(names.size());
+
+        for (String name : names) {
+            try {
+                applicationIds.add(applicationStoreUtil.nameToApplicationId(name));
+            } catch (Exception exception) {
+                // Best effort: one malformed name must not hide the remaining applications.
+                LOG.warn(
+                        "Could not parse application id from {}. This indicates a malformed name.",
+                        name,
+                        exception);
+            }
+        }
+
+        LOG.info(
+                "Retrieved application ids {} from {}",
+                applicationIds,
+                applicationStateHandleStore);
+
+        return applicationIds;
+    }
+
+    /**
+     * Verifies that this store is currently running.
+     *
+     * <p>Passes the {@code running} flag to {@code checkState}, which is expected to reject the
+     * call with the message "Not running. Forgot to call start()?" when the flag is false.
+     */
+    private void verifyIsRunning() {
+        checkState(running, "Not running. Forgot to call start()?");
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java
new file mode 100644
index 00000000000..d35f3ed7e7e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * {@link ApplicationStore} implementation for JobManagers running in {@link
+ * HighAvailabilityMode#NONE}.
+ *
+ * <p>Nothing is persisted: the lifecycle and write operations do nothing, and the query methods
+ * always report an empty store, so no {@link ApplicationStoreEntry} can be recovered from it.
+ */
+public class StandaloneApplicationStore implements ApplicationStore {
+
+    /** Does nothing; there is no backing storage to set up. */
+    @Override
+    public void start() throws Exception {}
+
+    /** Does nothing; there is no backing storage to release. */
+    @Override
+    public void stop() {}
+
+    /**
+     * Does nothing; the given entry is not stored.
+     *
+     * @param application entry that would be persisted by a highly available store
+     */
+    @Override
+    public void putApplication(ApplicationStoreEntry application) {}
+
+    /**
+     * Returns the IDs of the stored applications, which is always an empty collection here.
+     *
+     * @return an empty collection
+     */
+    @Override
+    public Collection<ApplicationID> getApplicationIds() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Attempts to recover the entry of the given application, which is never available here.
+     *
+     * @param applicationID ID of the application to recover
+     * @return always {@link Optional#empty()}
+     */
+    @Override
+    public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationID) {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java
new file mode 100644
index 00000000000..3682ead2e93
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/** Singleton {@link ApplicationStoreUtil} implementation for ZooKeeper. */
+public enum ZooKeeperApplicationStoreUtil implements ApplicationStoreUtil {
+    INSTANCE;
+
+    /**
+     * Converts the application ID into the name used by the state handle store.
+     *
+     * @param applicationId ID of the application
+     * @return the value produced by {@link ZooKeeperUtils#getPathForApplication(ApplicationID)}
+     */
+    @Override
+    public String applicationIdToName(ApplicationID applicationId) {
+        return ZooKeeperUtils.getPathForApplication(applicationId);
+    }
+
+    /**
+     * Converts a stored name back into an application ID by parsing it as a hex string.
+     *
+     * <p>NOTE(review): the name is parsed as-is, so it is presumably expected without the leading
+     * slash that {@link #applicationIdToName(ApplicationID)} adds; confirm against the caller.
+     *
+     * @param name stored name to parse
+     * @return the parsed application ID
+     */
+    @Override
+    public ApplicationID nameToApplicationId(String name) {
+        return ApplicationID.fromHexString(name);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a1ed8f78f0e..e814c35987e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,9 +36,13 @@ import 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import 
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+import org.apache.flink.runtime.jobmanager.DefaultApplicationStore;
 import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore;
 import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmanager.ZooKeeperApplicationStoreUtil;
 import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreUtil;
 import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreWatcher;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
@@ -102,6 +107,9 @@ public class ZooKeeperUtils {
     public static final String HA_STORAGE_SUBMITTED_EXECUTION_PLAN_PREFIX =
             "submittedExecutionPlan";
 
+    /** The prefix of the submitted application file. */
+    public static final String HA_STORAGE_SUBMITTED_APPLICATION_PREFIX = 
"submittedApplication";
+
     /** The prefix of the completed checkpoint file. */
     public static final String HA_STORAGE_COMPLETED_CHECKPOINT = 
"completedCheckpoint";
 
@@ -561,6 +569,44 @@ public class ZooKeeperUtils {
                 ZooKeeperExecutionPlanStoreUtil.INSTANCE);
     }
 
+    /**
+     * Builds a {@link DefaultApplicationStore} that is backed by a {@link
+     * ZooKeeperStateHandleStore} and uses {@link ZooKeeperApplicationStoreUtil} for naming.
+     *
+     * <p>The applications root path is read from {@link
+     * HighAvailabilityOptions#HA_ZOOKEEPER_APPLICATIONS_PATH}, ensured to exist, and then used as
+     * the namespace of the client handed to the state handle store.
+     *
+     * @param client The {@link CuratorFramework} ZooKeeper client to use
+     * @param configuration {@link Configuration} object
+     * @return {@link DefaultApplicationStore} instance
+     * @throws Exception if the submitted application store cannot be created
+     */
+    public static ApplicationStore createApplicationStore(
+            CuratorFramework client, Configuration configuration) throws Exception {
+
+        checkNotNull(configuration, "Configuration");
+
+        final RetrievableStateStorageHelper<ApplicationStoreEntry> storageHelper =
+                createFileSystemStateStorage(
+                        configuration, HA_STORAGE_SUBMITTED_APPLICATION_PREFIX);
+
+        // Root directory of the submitted applications in ZooKeeper
+        final String applicationsRootPath =
+                configuration.get(HighAvailabilityOptions.HA_ZOOKEEPER_APPLICATIONS_PATH);
+
+        // Create the root directory if it is missing
+        client.newNamespaceAwareEnsurePath(applicationsRootPath)
+                .ensure(client.getZookeeperClient());
+
+        // Every operation of the store is resolved relative to the root directory
+        final CuratorFramework namespacedClient =
+                client.usingNamespace(client.getNamespace() + applicationsRootPath);
+
+        final ZooKeeperStateHandleStore<ApplicationStoreEntry> stateHandleStore =
+                new ZooKeeperStateHandleStore<>(namespacedClient, storageHelper);
+
+        return new DefaultApplicationStore<>(
+                stateHandleStore, ZooKeeperApplicationStoreUtil.INSTANCE);
+    }
+
     /**
      * Creates a {@link DefaultCompletedCheckpointStore} instance with {@link
      * ZooKeeperStateHandleStore}.
@@ -616,6 +662,12 @@ public class ZooKeeperUtils {
         return String.format("/%s", jobId);
     }
 
+    /**
+     * Returns the ZooKeeper path for the given application: its ID as a String with a leading
+     * slash.
+     *
+     * @param applicationId ID of the application; must not be null
+     * @return the application ID prefixed with "/"
+     */
+    public static String getPathForApplication(ApplicationID applicationId) {
+        return String.format("/%s", checkNotNull(applicationId, "Application ID"));
+    }
+
     /**
      * Creates an instance of {@link ZooKeeperStateHandleStore}.
      *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java
new file mode 100644
index 00000000000..e37b4ca91ab
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStoreTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import 
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.persistence.IntegerResourceVersion;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.persistence.TestingStateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DefaultApplicationStore} with {@link 
TestingStateHandleStore} and {@link
+ * TestingApplicationStoreEntry}.
+ */
+public class DefaultApplicationStoreTest extends TestLogger {
+
+    private final ApplicationStoreEntry testingApplication =
+            TestingApplicationStoreEntry.newBuilder().build();
+    private final long timeout = 100L;
+
+    private TestingStateHandleStore.Builder<ApplicationStoreEntry> builder;
+    private TestingRetrievableStateStorageHelper<ApplicationStoreEntry> 
storageHelper;
+
+    /** Creates a fresh storage helper and state handle store builder before every test. */
+    @Before
+    public void setup() {
+        storageHelper = new TestingRetrievableStateStorageHelper<>();
+        builder = TestingStateHandleStore.newBuilder();
+    }
+
+    /** An entry returned by the handle store must be recovered with the same application ID. */
+    @Test
+    public void testRecoverApplication() throws Exception {
+        final RetrievableStateHandle<ApplicationStoreEntry> storedHandle =
+                storageHelper.store(testingApplication);
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setGetFunction(ignore -> storedHandle).build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        final Optional<ApplicationStoreEntry> recovered =
+                store.recoverApplication(testingApplication.getApplicationId());
+
+        assertThat(recovered).isPresent();
+        assertThat(recovered.get().getApplicationId())
+                .isEqualTo(testingApplication.getApplicationId());
+    }
+
+    /** A {@code NotExistException} from the handle store must lead to an empty result. */
+    @Test
+    public void testRecoverApplicationWhenNotExist() throws Exception {
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setGetFunction(
+                                ignore -> {
+                                    throw new StateHandleStore.NotExistException("Not exist.");
+                                })
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        final Optional<ApplicationStoreEntry> recovered =
+                store.recoverApplication(testingApplication.getApplicationId());
+
+        assertThat(recovered).isEmpty();
+    }
+
+    /** A failing recovery must surface the cause and release the handle of the application. */
+    @Test
+    public void testRecoverApplicationFailedShouldReleaseHandle() throws Exception {
+        final FlinkException expectedFailure = new FlinkException("Test exception.");
+        final CompletableFuture<String> releasedName = new CompletableFuture<>();
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setGetFunction(
+                                ignore -> {
+                                    throw expectedFailure;
+                                })
+                        .setReleaseConsumer(releasedName::complete)
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        assertThatThrownBy(
+                        () -> store.recoverApplication(testingApplication.getApplicationId()))
+                .hasCause(expectedFailure);
+
+        final String actual = releasedName.get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(testingApplication.getApplicationId()).hasToString(actual);
+    }
+
+    /** Putting an application that is not stored yet must add it to the handle store. */
+    @Test
+    public void testPutApplicationWhenNotExist() throws Exception {
+        final CompletableFuture<ApplicationStoreEntry> addedEntry = new CompletableFuture<>();
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setExistsFunction(ignore -> IntegerResourceVersion.notExisting())
+                        .setAddFunction(
+                                (ignore, state) -> {
+                                    addedEntry.complete(state);
+                                    return storageHelper.store(state);
+                                })
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        store.putApplication(testingApplication);
+
+        final ApplicationStoreEntry actual = addedEntry.get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(actual.getApplicationId()).isEqualTo(testingApplication.getApplicationId());
+    }
+
+    /**
+     * Putting the same application twice must replace the stored entry on the second call, using
+     * the resource version reported by the handle store.
+     */
+    @Test
+    public void testPutApplicationWhenAlreadyExist() throws Exception {
+        final int resourceVersion = 100;
+        final CompletableFuture<Tuple3<String, IntegerResourceVersion, ApplicationStoreEntry>>
+                replaceCall = new CompletableFuture<>();
+        // Flips to true on the first exists() call, so only the first put sees a missing entry.
+        final AtomicBoolean alreadyExist = new AtomicBoolean(false);
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setExistsFunction(
+                                ignore ->
+                                        alreadyExist.getAndSet(true)
+                                                ? IntegerResourceVersion.valueOf(resourceVersion)
+                                                : IntegerResourceVersion.notExisting())
+                        .setAddFunction((ignore, state) -> storageHelper.store(state))
+                        .setReplaceConsumer(replaceCall::complete)
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        // First call adds the entry, second call replaces it
+        store.putApplication(testingApplication);
+        store.putApplication(testingApplication);
+
+        final Tuple3<String, IntegerResourceVersion, ApplicationStoreEntry> actual =
+                replaceCall.get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(actual.f0).isEqualTo(testingApplication.getApplicationId().toString());
+        assertThat(actual.f1).isEqualTo(IntegerResourceVersion.valueOf(resourceVersion));
+        assertThat(actual.f2.getApplicationId()).isEqualTo(testingApplication.getApplicationId());
+    }
+
+    /** Global cleanup of a previously added application must remove it from the handle store. */
+    @Test
+    public void testGlobalCleanup() throws Exception {
+        final CompletableFuture<ApplicationID> removedId = new CompletableFuture<>();
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setAddFunction((ignore, state) -> storageHelper.store(state))
+                        .setRemoveFunction(
+                                name -> {
+                                    removedId.complete(ApplicationID.fromHexString(name));
+                                    return true;
+                                })
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        store.putApplication(testingApplication);
+        store.globalCleanupAsync(testingApplication.getApplicationId(), Executors.directExecutor())
+                .join();
+
+        final ApplicationID actual = removedId.get(timeout, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(testingApplication.getApplicationId());
+    }
+
+    /** Global cleanup must still call the handle store when the application was never added. */
+    @Test
+    public void testGlobalCleanupWithNonExistName() throws Exception {
+        final CompletableFuture<ApplicationID> removedId = new CompletableFuture<>();
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setRemoveFunction(
+                                name -> {
+                                    removedId.complete(ApplicationID.fromHexString(name));
+                                    return true;
+                                })
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        store.globalCleanupAsync(testingApplication.getApplicationId(), Executors.directExecutor())
+                .join();
+
+        assertThat(removedId).isDone();
+    }
+
+    /** The cleanup future must fail when the handle store reports an unsuccessful removal. */
+    @Test
+    public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setRemoveFunction(name -> false).build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        assertThatThrownBy(
+                        () ->
+                                store.globalCleanupAsync(
+                                                testingApplication.getApplicationId(),
+                                                Executors.directExecutor())
+                                        .get())
+                .isInstanceOf(ExecutionException.class);
+    }
+
+    /** All handle names reported by the handle store must be returned as application IDs. */
+    @Test
+    public void testGetApplicationIds() throws Exception {
+        final Collection<ApplicationID> expectedIds =
+                Arrays.asList(new ApplicationID(), new ApplicationID());
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setGetAllHandlesSupplier(
+                                () ->
+                                        expectedIds.stream()
+                                                .map(ApplicationID::toString)
+                                                .collect(Collectors.toList()))
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        final Collection<ApplicationID> actualIds = store.getApplicationIds();
+
+        assertThat(actualIds).containsExactlyInAnyOrderElementsOf(expectedIds);
+    }
+
+    /** Stopping the store must trigger the release of all handles in the handle store. */
+    @Test
+    public void testStoppingApplicationStoreShouldReleaseAllHandles() throws Exception {
+        final CompletableFuture<Void> releaseAllCalled = new CompletableFuture<>();
+        final TestingStateHandleStore<ApplicationStoreEntry> handleStore =
+                builder.setReleaseAllHandlesRunnable(() -> releaseAllCalled.complete(null))
+                        .build();
+        final ApplicationStore store = createAndStartApplicationStore(handleStore);
+
+        store.stop();
+
+        assertThat(releaseAllCalled).isDone();
+    }
+
+    /**
+     * Creates a {@link DefaultApplicationStore} on top of the given handle store and starts it.
+     *
+     * <p>The store uses the plain string form of the application ID as the handle name and parses
+     * names back with {@code ApplicationID.fromHexString}.
+     *
+     * @param stateHandleStore handle store backing the application store
+     * @return the started application store
+     */
+    private ApplicationStore createAndStartApplicationStore(
+            TestingStateHandleStore<ApplicationStoreEntry> stateHandleStore) throws Exception {
+        final ApplicationStoreUtil plainNaming =
+                new ApplicationStoreUtil() {
+                    @Override
+                    public String applicationIdToName(ApplicationID applicationId) {
+                        return applicationId.toString();
+                    }
+
+                    @Override
+                    public ApplicationID nameToApplicationId(String name) {
+                        return ApplicationID.fromHexString(name);
+                    }
+                };
+
+        final ApplicationStore store = new DefaultApplicationStore<>(stateHandleStore, plainNaming);
+        store.start();
+        return store;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java
new file mode 100644
index 00000000000..984554db8e9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStoreTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** Tests for the {@link StandaloneApplicationStore}. */
+public class StandaloneApplicationStoreTest {
+
+    /**
+     * Runs every operation once and checks that the store stays empty throughout and never
+     * recovers an entry.
+     */
+    @Test
+    public void testNoOps() throws Exception {
+        final StandaloneApplicationStore store = new StandaloneApplicationStore();
+        store.start();
+
+        final ApplicationID id = new ApplicationID();
+        final ApplicationStoreEntry entry =
+                TestingApplicationStoreEntry.newBuilder().setApplicationId(id).build();
+
+        // Empty right after start
+        assertEquals(0, store.getApplicationIds().size());
+
+        // Still empty after a put
+        store.putApplication(entry);
+        assertEquals(0, store.getApplicationIds().size());
+
+        // Still empty after a cleanup
+        store.globalCleanupAsync(id, Executors.directExecutor()).join();
+        assertEquals(0, store.getApplicationIds().size());
+
+        assertFalse(store.recoverApplication(id).isPresent());
+
+        store.stop();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java
new file mode 100644
index 00000000000..7148c9e8e47
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TestingApplicationStoreEntry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+
+import java.util.Collection;
+
+/**
+ * {@link ApplicationStoreEntry} implementation for testing purposes.
+ *
+ * <p>It only carries an application ID and a name; materializing an application from it is not
+ * supported.
+ */
+public class TestingApplicationStoreEntry implements ApplicationStoreEntry {
+
+    private final ApplicationID applicationId;
+    private final String name;
+
+    /**
+     * Creates an entry with the given identity.
+     *
+     * @param applicationId ID reported by {@link #getApplicationId()}
+     * @param name name reported by {@link #getName()}
+     */
+    public TestingApplicationStoreEntry(ApplicationID applicationId, String name) {
+        this.applicationId = applicationId;
+        this.name = name;
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public AbstractApplication getApplication(
+            PermanentBlobService blobService,
+            Collection<JobInfo> recoveredJobInfos,
+            Collection<JobInfo> recoveredTerminalJobInfos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ApplicationID getApplicationId() {
+        return applicationId;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /** Returns a builder preset with a new application ID and a default name. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder for creating {@link TestingApplicationStoreEntry} instances. */
+    public static class Builder {
+        // Defaults used when the corresponding setter is never called
+        private ApplicationID applicationId = new ApplicationID();
+        private String name = "TestingApplication";
+
+        /** Overrides the application ID of the entry to build. */
+        public Builder setApplicationId(ApplicationID applicationId) {
+            this.applicationId = applicationId;
+            return this;
+        }
+
+        /** Overrides the name of the entry to build. */
+        public Builder setName(String name) {
+            this.name = name;
+            return this;
+        }
+
+        /** Creates the entry from the values currently held by this builder. */
+        public TestingApplicationStoreEntry build() {
+            return new TestingApplicationStoreEntry(applicationId, name);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java
new file mode 100644
index 00000000000..2cd4b89bb3b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * IT tests for {@link DefaultApplicationStore} with all ZooKeeper components (e.g. {@link
+ * ZooKeeperStateHandleStore}, {@link ZooKeeperApplicationStoreUtil}).
+ */
+public class ZooKeeperApplicationStoreITCase extends TestLogger {
+
+    // Kept as a plain field so the tests can ask it for a client; it is wrapped below in an
+    // EachCallbackWrapper that is registered as a JUnit extension.
+    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
+
+    @RegisterExtension
+    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource =
+            new EachCallbackWrapper<>(zooKeeperExtension);
+
+    @RegisterExtension
+    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
+            new TestingFatalErrorHandlerExtension();
+
+    /** Serializes each entry into an in-memory byte handle that is named by a random UUID. */
+    private static final RetrievableStateStorageHelper<ApplicationStoreEntry> localStateStorage =
+            entry -> {
+                final String handleName = java.util.UUID.randomUUID().toString();
+                final byte[] serializedEntry = InstantiationUtil.serializeObject(entry);
+                final ByteStreamStateHandle inMemoryHandle =
+                        new ByteStreamStateHandle(handleName, serializedEntry);
+                return new RetrievableStreamStateHandle<>(inMemoryHandle);
+            };
+
+    /**
+     * Walks a single entry through put, recover, overwrite under the same id, global cleanup, and a
+     * repeated cleanup of the already removed entry.
+     */
+    @Test
+    public void testPutAndRemoveApplication() throws Exception {
+        final ApplicationStore store =
+                createZooKeeperApplicationStore("/testPutAndRemoveApplication");
+
+        try {
+            store.start();
+
+            final ApplicationStoreEntry firstEntry =
+                    createApplicationStoreEntry(new ApplicationID(), "AppName");
+
+            // Nothing has been stored yet.
+            assertThat(store.getApplicationIds()).isEmpty();
+
+            // Initial insertion.
+            store.putApplication(firstEntry);
+
+            // The initial entry must be recoverable.
+            Collection<ApplicationID> storedIds = store.getApplicationIds();
+            assertThat(storedIds).hasSize(1);
+
+            Optional<ApplicationStoreEntry> recovered =
+                    store.recoverApplication(storedIds.iterator().next());
+            assertThat(recovered).isPresent();
+            verifyApplicationStoreEntries(firstEntry, recovered.get());
+
+            // Overwrite the entry under the same id.
+            final ApplicationStoreEntry updatedEntry =
+                    createApplicationStoreEntry(firstEntry.getApplicationId(), "Updated AppName");
+            store.putApplication(updatedEntry);
+
+            // The updated entry must be the one that is recovered now.
+            storedIds = store.getApplicationIds();
+            assertThat(storedIds).hasSize(1);
+
+            recovered = store.recoverApplication(storedIds.iterator().next());
+            assertThat(recovered).isPresent();
+            verifyApplicationStoreEntries(updatedEntry, recovered.get());
+
+            // Remove the entry.
+            final ApplicationID removedId = updatedEntry.getApplicationId();
+            store.globalCleanupAsync(removedId, Executors.directExecutor()).join();
+
+            // The store is empty again.
+            assertThat(store.getApplicationIds()).isEmpty();
+
+            // Cleaning up the same id a second time must not fail.
+            store.globalCleanupAsync(removedId, Executors.directExecutor()).join();
+        } finally {
+            store.stop();
+        }
+    }
+
+    /**
+     * Builds a {@link DefaultApplicationStore} on top of a {@link ZooKeeperStateHandleStore} whose
+     * Curator client is namespaced to {@code fullPath}.
+     *
+     * @param fullPath path that is appended to the client's namespace and used as the store root
+     * @return a new store; this method does not start it
+     */
+    @Nonnull
+    private ApplicationStore createZooKeeperApplicationStore(String fullPath) throws Exception {
+        final CuratorFramework curator =
+                zooKeeperExtension.getZooKeeperClient(
+                        testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
+
+        // The applications path has to exist before it is used as the namespace root.
+        curator.newNamespaceAwareEnsurePath(fullPath).ensure(curator.getZookeeperClient());
+
+        // From here on every operation is rooted at the given path.
+        final String storeNamespace = curator.getNamespace() + fullPath;
+        final CuratorFramework namespacedCurator = curator.usingNamespace(storeNamespace);
+
+        final ZooKeeperStateHandleStore<ApplicationStoreEntry> stateHandleStore =
+                new ZooKeeperStateHandleStore<>(namespacedCurator, localStateStorage);
+        return new DefaultApplicationStore<>(
+                stateHandleStore, ZooKeeperApplicationStoreUtil.INSTANCE);
+    }
+
+    /**
+     * Stores three entries, then recovers, verifies and cleans up every stored id, and finally
+     * expects the store to be empty.
+     */
+    @Test
+    public void testRecoverApplications() throws Exception {
+        final ApplicationStore store = createZooKeeperApplicationStore("/testRecoverApplications");
+
+        try {
+            store.start();
+
+            final int numberOfApplications = 3;
+            final HashMap<ApplicationID, ApplicationStoreEntry> expectedEntries = new HashMap<>();
+            for (int i = 0; i < numberOfApplications; i++) {
+                final ApplicationID id = new ApplicationID();
+                expectedEntries.put(id, createApplicationStoreEntry(id));
+            }
+
+            // Persist every entry.
+            for (ApplicationStoreEntry entry : expectedEntries.values()) {
+                store.putApplication(entry);
+            }
+
+            final Collection<ApplicationID> storedIds = store.getApplicationIds();
+
+            assertThat(storedIds).hasSameSizeAs(expectedEntries.entrySet());
+
+            // Recover, verify and clean up each stored application in turn.
+            for (ApplicationID storedId : storedIds) {
+                final Optional<ApplicationStoreEntry> recovered =
+                        store.recoverApplication(storedId);
+                assertThat(recovered).isPresent();
+
+                final ApplicationStoreEntry recoveredEntry = recovered.get();
+                final ApplicationID recoveredId = recoveredEntry.getApplicationId();
+                assertThat(expectedEntries).containsKey(recoveredId);
+
+                verifyApplicationStoreEntries(expectedEntries.get(recoveredId), recoveredEntry);
+
+                store.globalCleanupAsync(recoveredId, Executors.directExecutor()).join();
+            }
+
+            // Everything has been removed again.
+            assertThat(store.getApplicationIds()).isEmpty();
+        } finally {
+            store.stop();
+        }
+    }
+
+    /**
+     * Verifies that a store cannot overwrite an entry that was added through a different store
+     * instance on the same path: the second {@code putApplication} for the same id is expected to
+     * fail with an {@link IllegalStateException}.
+     */
+    @Test
+    public void testUpdateApplicationYouDidNotGetOrAdd() throws Exception {
+        final ApplicationStore applicationStore =
+                createZooKeeperApplicationStore("/testUpdateApplicationYouDidNotGetOrAdd");
+
+        final ApplicationStore otherApplicationStore =
+                createZooKeeperApplicationStore("/testUpdateApplicationYouDidNotGetOrAdd");
+
+        // Stop both stores even when an assertion fails, as the other tests in this class do;
+        // otherwise a failing run would leave the stores running.
+        try {
+            applicationStore.start();
+            otherApplicationStore.start();
+
+            final ApplicationStoreEntry applicationEntry =
+                    createApplicationStoreEntry(new ApplicationID());
+
+            applicationStore.putApplication(applicationEntry);
+
+            assertThatExceptionOfType(IllegalStateException.class)
+                    .isThrownBy(() -> otherApplicationStore.putApplication(applicationEntry));
+        } finally {
+            // Nested so that the second store is stopped even if stopping the first one throws.
+            try {
+                applicationStore.stop();
+            } finally {
+                otherApplicationStore.stop();
+            }
+        }
+    }
+
+    /**
+     * Tests that we fail with an exception if the application cannot be 
removed from the
+     * ZooKeeperApplicationStore.
+     *
+     * <p>Tests that a close ZooKeeperApplicationStore no longer holds any 
locks.
+     */
+    @Test
+    public void testApplicationRemovalFailureAndLockRelease() throws Exception 
{
+        final ApplicationStore submittedApplicationStore =
+                
createZooKeeperApplicationStore("/testApplicationRemovalFailureAndLockRelease");
+        final ApplicationStore otherSubmittedApplicationStore =
+                
createZooKeeperApplicationStore("/testApplicationRemovalFailureAndLockRelease");
+
+        submittedApplicationStore.start();
+        otherSubmittedApplicationStore.start();
+
+        final ApplicationStoreEntry applicationEntry =
+                createApplicationStoreEntry(new ApplicationID());
+        submittedApplicationStore.putApplication(applicationEntry);
+
+        final Optional<ApplicationStoreEntry> recoveredApplicationEntry =
+                otherSubmittedApplicationStore.recoverApplication(
+                        applicationEntry.getApplicationId());
+
+        assertThat(recoveredApplicationEntry).isPresent();
+
+        assertThatExceptionOfType(Exception.class)
+                .as(
+                        "It should not be possible to remove the 
ApplicationStoreEntry since the first store still has a lock on it.")
+                .isThrownBy(
+                        () ->
+                                otherSubmittedApplicationStore
+                                        .globalCleanupAsync(
+                                                
recoveredApplicationEntry.get().getApplicationId(),
+                                                Executors.directExecutor())
+                                        .join());
+
+        submittedApplicationStore.stop();
+
+        // now we should be able to delete the application entry
+        otherSubmittedApplicationStore
+                .globalCleanupAsync(
+                        recoveredApplicationEntry.get().getApplicationId(),
+                        Executors.directExecutor())
+                .join();
+
+        assertThat(
+                        otherSubmittedApplicationStore.recoverApplication(
+                                
recoveredApplicationEntry.get().getApplicationId()))
+                .isEmpty();
+
+        otherSubmittedApplicationStore.stop();
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    private ApplicationStoreEntry createApplicationStoreEntry(ApplicationID 
applicationId) {
+        return createApplicationStoreEntry(applicationId, "Test Application");
+    }
+
+    private ApplicationStoreEntry createApplicationStoreEntry(
+            ApplicationID applicationId, String name) {
+        return TestingApplicationStoreEntry.newBuilder()
+                .setApplicationId(applicationId)
+                .setName(name)
+                .build();
+    }
+
+    private void verifyApplicationStoreEntries(
+            ApplicationStoreEntry expected, ApplicationStoreEntry actual) {
+        assertThat(actual.getName()).isEqualTo(expected.getName());
+        
assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId());
+    }
+}

Reply via email to