This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c12def92b76fe3bbe6029a5b1b03b9c59c41b366 Author: Yi Zhang <[email protected]> AuthorDate: Thu Mar 5 19:48:45 2026 +0800 [FLINK-38975][runtime] Add ApplicationResultStore --- .../application_result_store_configuration.html | 24 ++ .../common_high_availability_ars_section.html | 24 ++ .../common_high_availability_jrs_section.html | 6 - .../generated/job_result_store_configuration.html | 6 - .../flink/annotation/docs/Documentation.java | 2 + .../HAApplicationRunOnMinioS3StoreITCase.java | 150 +++++++++ ...HAApplicationRunOnHadoopS3FileSystemITCase.java | 24 ++ ...HAApplicationRunOnPrestoS3FileSystemITCase.java | 24 ++ .../AbstractThreadsafeApplicationResultStore.java | 161 ++++++++++ .../highavailability/ApplicationResult.java | 151 +++++++++ .../highavailability/ApplicationResultEntry.java | 43 +++ .../highavailability/ApplicationResultStore.java | 112 +++++++ ...ons.java => ApplicationResultStoreOptions.java} | 25 +- .../EmbeddedApplicationResultStore.java | 93 ++++++ .../FileSystemApplicationResultStore.java | 269 ++++++++++++++++ .../highavailability/JobResultStoreOptions.java | 6 +- .../flink/runtime/minicluster/MiniCluster.java | 15 + .../json/ApplicationResultDeserializer.java | 128 ++++++++ .../messages/json/ApplicationResultSerializer.java | 77 +++++ .../AbstractHAApplicationRunITCase.java | 115 +++++++ .../ApplicationResultStoreContractTest.java | 211 +++++++++++++ ...leSystemApplicationResultStoreContractTest.java | 43 +++ ...emApplicationResultStoreFileOperationsTest.java | 346 +++++++++++++++++++++ ...EmbeddedApplicationResultStoreContractTest.java | 37 +++ .../testutils/TestingApplicationResultStore.java | 198 ++++++++++++ 25 files changed, 2265 insertions(+), 25 deletions(-) diff --git a/docs/layouts/shortcodes/generated/application_result_store_configuration.html b/docs/layouts/shortcodes/generated/application_result_store_configuration.html new file mode 100644 index 00000000000..f910e569bde --- /dev/null +++ 
b/docs/layouts/shortcodes/generated/application_result_store_configuration.html @@ -0,0 +1,24 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>application-result-store.delete-on-commit</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Determines whether application results should be automatically removed from the underlying application result store when the corresponding entity transitions into a clean state. If false, the cleaned application results are, instead, marked as clean to indicate their state. In this case, Flink no longer has ownership and the resources need to be cleaned up by the user.</td> + </tr> + <tr> + <td><h5>application-result-store.storage-path</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Defines where application results should be stored. This should be an underlying file-system that provides read-after-write consistency. 
By default, this is <code class="highlighter-rouge">{high-availability.storageDir}/application-result-store/{high-availability.cluster-id}</code>.</td> + </tr> + </tbody> +</table> diff --git a/docs/layouts/shortcodes/generated/common_high_availability_ars_section.html b/docs/layouts/shortcodes/generated/common_high_availability_ars_section.html new file mode 100644 index 00000000000..f910e569bde --- /dev/null +++ b/docs/layouts/shortcodes/generated/common_high_availability_ars_section.html @@ -0,0 +1,24 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>application-result-store.delete-on-commit</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Determines whether application results should be automatically removed from the underlying application result store when the corresponding entity transitions into a clean state. If false, the cleaned application results are, instead, marked as clean to indicate their state. In this case, Flink no longer has ownership and the resources need to be cleaned up by the user.</td> + </tr> + <tr> + <td><h5>application-result-store.storage-path</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Defines where application results should be stored. This should be an underlying file-system that provides read-after-write consistency. 
By default, this is <code class="highlighter-rouge">{high-availability.storageDir}/application-result-store/{high-availability.cluster-id}</code>.</td> + </tr> + </tbody> +</table> diff --git a/docs/layouts/shortcodes/generated/common_high_availability_jrs_section.html b/docs/layouts/shortcodes/generated/common_high_availability_jrs_section.html index e6d58ee3ffb..1d677d17de1 100644 --- a/docs/layouts/shortcodes/generated/common_high_availability_jrs_section.html +++ b/docs/layouts/shortcodes/generated/common_high_availability_jrs_section.html @@ -8,12 +8,6 @@ </tr> </thead> <tbody> - <tr> - <td><h5>job-result-store.delete-on-commit</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Determines whether job results should be automatically removed from the underlying job result store when the corresponding entity transitions into a clean state. If false, the cleaned job results are, instead, marked as clean to indicate their state. In this case, Flink no longer has ownership and the resources need to be cleaned up by the user.</td> - </tr> <tr> <td><h5>job-result-store.storage-path</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/docs/layouts/shortcodes/generated/job_result_store_configuration.html b/docs/layouts/shortcodes/generated/job_result_store_configuration.html index e6d58ee3ffb..1d677d17de1 100644 --- a/docs/layouts/shortcodes/generated/job_result_store_configuration.html +++ b/docs/layouts/shortcodes/generated/job_result_store_configuration.html @@ -8,12 +8,6 @@ </tr> </thead> <tbody> - <tr> - <td><h5>job-result-store.delete-on-commit</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Determines whether job results should be automatically removed from the underlying job result store when the corresponding entity transitions into a clean state. If false, the cleaned job results are, instead, marked as clean to indicate their state. 
In this case, Flink no longer has ownership and the resources need to be cleaned up by the user.</td> - </tr> <tr> <td><h5>job-result-store.storage-path</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java index ef44de3411b..8a3c81458e1 100644 --- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java +++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java @@ -68,6 +68,8 @@ public final class Documentation { "common_high_availability_zk"; public static final String COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE = "common_high_availability_jrs"; + public static final String COMMON_HIGH_AVAILABILITY_APPLICATION_RESULT_STORE = + "common_high_availability_ars"; public static final String COMMON_MEMORY = "common_memory"; public static final String COMMON_MISCELLANEOUS = "common_miscellaneous"; diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java new file mode 100644 index 00000000000..b80a952fb2e --- /dev/null +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3.common; + +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.TestContainerExtension; +import org.apache.flink.runtime.highavailability.AbstractHAApplicationRunITCase; +import org.apache.flink.runtime.highavailability.ApplicationResultStoreOptions; +import org.apache.flink.runtime.highavailability.FileSystemApplicationResultStore; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables; + +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; + +import static org.apache.flink.shaded.guava33.com.google.common.base.Predicates.not; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code HAApplicationRunOnMinioS3StoreITCase} covers an application run where the HA data is + * stored in Minio. The implementation verifies whether the {@code ApplicationResult} was written + * into the FileSystem-backed {@code ApplicationResultStore}. 
+ */ +public abstract class HAApplicationRunOnMinioS3StoreITCase extends AbstractHAApplicationRunITCase { + + private static final String CLUSTER_ID = "test-cluster"; + private static final String APPLICATION_RESULT_STORE_FOLDER = "ars"; + + @RegisterExtension + @Order(2) + private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>> + MINIO_EXTENSION = + new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + + @RegisterExtension + @Order(3) + private static final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + () -> { + final Configuration configuration = createConfiguration(); + FileSystem.initialize(configuration, null); + return new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .build(); + }); + + private static MinioTestContainer getMinioContainer() { + return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + } + + private static String createS3URIWithSubPath(String... subfolders) { + return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); + } + + private static List<S3ObjectSummary> getObjectsFromApplicationResultStore() { + return getMinioContainer() + .getClient() + .listObjects( + getMinioContainer().getDefaultBucketName(), + createSubPath(CLUSTER_ID, APPLICATION_RESULT_STORE_FOLDER)) + .getObjectSummaries(); + } + + private static String createSubPath(String... 
subfolders) { + final String pathSeparator = "/"; + return pathSeparator + StringUtils.join(subfolders, pathSeparator); + } + + private static Configuration createConfiguration() { + final Configuration config = new Configuration(); + + getMinioContainer().setS3ConfigOptions(config); + + // ApplicationResultStore configuration + config.set(ApplicationResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE); + config.set( + ApplicationResultStoreOptions.STORAGE_PATH, + createS3URIWithSubPath(CLUSTER_ID, APPLICATION_RESULT_STORE_FOLDER)); + + return addHaConfiguration(config, createS3URIWithSubPath(CLUSTER_ID)); + } + + @AfterAll + static void unsetFileSystem() { + FileSystem.initialize(new Configuration(), null); + } + + @Override + protected void runAfterApplicationTermination() throws Exception { + CommonTestUtils.waitUntilCondition( + () -> { + final List<S3ObjectSummary> objects = getObjectsFromApplicationResultStore(); + return objects.stream() + .map(S3ObjectSummary::getKey) + .anyMatch( + FileSystemApplicationResultStore + ::hasValidApplicationResultStoreEntryExtension) + && objects.stream() + .map(S3ObjectSummary::getKey) + .noneMatch( + FileSystemApplicationResultStore + ::hasValidDirtyApplicationResultStoreEntryExtension); + }, + 2000L); + + final S3ObjectSummary objRef = + Iterables.getOnlyElement(getObjectsFromApplicationResultStore()); + assertThat(objRef.getKey()) + .matches( + FileSystemApplicationResultStore + ::hasValidApplicationResultStoreEntryExtension) + .matches( + not( + FileSystemApplicationResultStore + ::hasValidDirtyApplicationResultStoreEntryExtension)); + + final String objContent = + getMinioContainer() + .getClient() + .getObjectAsString(objRef.getBucketName(), objRef.getKey()); + assertThat(objContent).contains(ApplicationState.FINISHED.name()); + } +} diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java new file mode 100644 index 00000000000..cd4bb089ff2 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3hadoop; + +import org.apache.flink.fs.s3.common.HAApplicationRunOnMinioS3StoreITCase; + +/** Runs the {@link HAApplicationRunOnMinioS3StoreITCase} on the Hadoop S3 file system. 
*/ +class HAApplicationRunOnHadoopS3FileSystemITCase extends HAApplicationRunOnMinioS3StoreITCase {} diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/HAApplicationRunOnPrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/HAApplicationRunOnPrestoS3FileSystemITCase.java new file mode 100644 index 00000000000..444c6c6a4d3 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/HAApplicationRunOnPrestoS3FileSystemITCase.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.fs.s3.common.HAApplicationRunOnMinioS3StoreITCase; + +/** Runs the {@link HAApplicationRunOnMinioS3StoreITCase} on the Presto S3 file system. 
*/ +class HAApplicationRunOnPrestoS3FileSystemITCase extends HAApplicationRunOnMinioS3StoreITCase {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java new file mode 100644 index 00000000000..5fbdacff807 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** An abstract class for threadsafe implementations of the {@link ApplicationResultStore}. */ +public abstract class AbstractThreadsafeApplicationResultStore implements ApplicationResultStore { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractThreadsafeApplicationResultStore.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + private final Executor ioExecutor; + + protected AbstractThreadsafeApplicationResultStore(Executor ioExecutor) { + this.ioExecutor = ioExecutor; + } + + @Override + public CompletableFuture<Void> createDirtyResultAsync( + ApplicationResultEntry applicationResultEntry) { + return hasApplicationResultEntryAsync(applicationResultEntry.getApplicationId()) + .thenAccept( + hasApplicationResultEntry -> + Preconditions.checkState( + !hasApplicationResultEntry, + "Application result store already contains an entry for application %s", + applicationResultEntry.getApplicationId())) + .thenCompose( + ignoredVoid -> + withWriteLockAsync( + () -> createDirtyResultInternal(applicationResultEntry))); + } + + @GuardedBy("readWriteLock") + protected abstract void createDirtyResultInternal(ApplicationResultEntry applicationResultEntry) + throws IOException; + + @Override + public 
CompletableFuture<Void> markResultAsCleanAsync(ApplicationID applicationId) { + return hasCleanApplicationResultEntryAsync(applicationId) + .thenCompose( + hasCleanApplicationResultEntry -> { + if (hasCleanApplicationResultEntry) { + LOG.debug( + "The application {} is already marked as clean. No action required.", + applicationId); + return FutureUtils.completedVoidFuture(); + } + + return withWriteLockAsync( + () -> markResultAsCleanInternal(applicationId)); + }); + } + + @GuardedBy("readWriteLock") + protected abstract void markResultAsCleanInternal(ApplicationID applicationId) + throws IOException, NoSuchElementException; + + @Override + public CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID applicationId) { + return withReadLockAsync( + () -> + hasDirtyApplicationResultEntryInternal(applicationId) + || hasCleanApplicationResultEntryInternal(applicationId)); + } + + @Override + public CompletableFuture<Boolean> hasDirtyApplicationResultEntryAsync( + ApplicationID applicationId) { + return withReadLockAsync(() -> hasDirtyApplicationResultEntryInternal(applicationId)); + } + + @GuardedBy("readWriteLock") + protected abstract boolean hasDirtyApplicationResultEntryInternal(ApplicationID applicationId) + throws IOException; + + @Override + public CompletableFuture<Boolean> hasCleanApplicationResultEntryAsync( + ApplicationID applicationId) { + return withReadLockAsync(() -> hasCleanApplicationResultEntryInternal(applicationId)); + } + + @GuardedBy("readWriteLock") + protected abstract boolean hasCleanApplicationResultEntryInternal(ApplicationID applicationId) + throws IOException; + + @Override + public Set<ApplicationResult> getDirtyResults() throws IOException { + return withReadLock(this::getDirtyResultsInternal); + } + + @GuardedBy("readWriteLock") + protected abstract Set<ApplicationResult> getDirtyResultsInternal() throws IOException; + + private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) { + 
return FutureUtils.runAsync( + () -> { + withWriteLock(runnable); + }, + ioExecutor); + } + + private void withWriteLock(ThrowingRunnable<IOException> runnable) throws IOException { + readWriteLock.writeLock().lock(); + try { + runnable.run(); + } finally { + readWriteLock.writeLock().unlock(); + } + } + + private <T> CompletableFuture<T> withReadLockAsync( + SupplierWithException<T, IOException> runnable) { + return FutureUtils.supplyAsync(() -> withReadLock(runnable), ioExecutor); + } + + private <T> T withReadLock(SupplierWithException<T, IOException> supplier) throws IOException { + readWriteLock.readLock().lock(); + try { + return supplier.get(); + } finally { + readWriteLock.readLock().unlock(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResult.java new file mode 100644 index 00000000000..273cddf5f6f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResult.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * The result of an application execution. This class collects information about a globally + * terminated application. + */ +public class ApplicationResult implements Serializable { + + private static final long serialVersionUID = 1L; + + private final ApplicationID applicationId; + private final ApplicationState applicationState; + private final String applicationName; + private final long startTime; + private final long endTime; + + private ApplicationResult( + ApplicationID applicationId, + ApplicationState applicationState, + String applicationName, + long startTime, + long endTime) { + this.applicationId = Preconditions.checkNotNull(applicationId); + this.applicationState = Preconditions.checkNotNull(applicationState); + this.applicationName = Preconditions.checkNotNull(applicationName); + this.startTime = startTime; + this.endTime = endTime; + } + + public ApplicationID getApplicationId() { + return applicationId; + } + + public ApplicationState getApplicationState() { + return applicationState; + } + + public String getApplicationName() { + return applicationName; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + /** + * Creates the {@link ApplicationResult} from the given {@link ArchivedApplication} which must + * be in a globally terminal state. 
+ * + * @param archivedApplication to create the ApplicationResult from + * @return ApplicationResult of the given ArchivedApplication + */ + public static ApplicationResult createFrom(ArchivedApplication archivedApplication) { + final ApplicationID applicationId = archivedApplication.getApplicationId(); + final ApplicationState applicationState = archivedApplication.getApplicationStatus(); + + Preconditions.checkArgument( + applicationState.isTerminalState(), + "The application " + + archivedApplication.getApplicationName() + + '(' + + applicationId + + ") is not in a terminal state. It is in state " + + applicationState + + '.'); + + final ApplicationResult.Builder builder = new ApplicationResult.Builder(); + builder.applicationId(applicationId); + builder.applicationState(applicationState); + builder.applicationName(archivedApplication.getApplicationName()); + + final long startTime = archivedApplication.getStatusTimestamp(ApplicationState.CREATED); + final long endTime = archivedApplication.getStatusTimestamp(applicationState); + builder.startTime(startTime).endTime(endTime); + + return builder.build(); + } + + /** Builder for {@link ApplicationResult}. 
*/ + public static class Builder { + + private ApplicationID applicationId; + + private ApplicationState applicationState; + + private String applicationName = "unknown"; + + private long startTime = -1; + + private long endTime = -1; + + public Builder applicationId(final ApplicationID applicationId) { + this.applicationId = applicationId; + return this; + } + + public Builder applicationState(final ApplicationState applicationState) { + this.applicationState = applicationState; + return this; + } + + public Builder applicationName(final String applicationName) { + this.applicationName = applicationName; + return this; + } + + public Builder startTime(final long startTime) { + this.startTime = startTime; + return this; + } + + public Builder endTime(final long endTime) { + this.endTime = endTime; + return this; + } + + public ApplicationResult build() { + return new ApplicationResult( + applicationId, applicationState, applicationName, startTime, endTime); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultEntry.java new file mode 100644 index 00000000000..ed0db1562f2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultEntry.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.util.Preconditions; + +/** + * {@code ApplicationResultEntry} is the entity managed by the {@link ApplicationResultStore}. It + * collects information about a globally terminated application (e.g. {@link ApplicationResult}). + */ +public class ApplicationResultEntry { + + private final ApplicationResult applicationResult; + + public ApplicationResultEntry(ApplicationResult applicationResult) { + this.applicationResult = Preconditions.checkNotNull(applicationResult); + } + + public ApplicationResult getApplicationResult() { + return applicationResult; + } + + public ApplicationID getApplicationId() { + return applicationResult.getApplicationId(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java new file mode 100644 index 00000000000..5606e8e8169 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStore.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * A storage for the results of globally terminated applications. These results can have the + * following states: + * + * <ul> + * <li>{@code dirty} - indicating that the corresponding application is not properly cleaned up, + * yet. + * <li>{@code clean} - indicating that the cleanup of the corresponding application is performed + * and no further actions need to be applied. + * </ul> + */ +public interface ApplicationResultStore { + + /** + * Registers the passed {@link ApplicationResultEntry} instance as {@code dirty} which indicates + * that clean-up operations still need to be performed. Once the application resource cleanup + * has been finalized, we can mark the {@code ApplicationResultEntry} as {@code clean} result + * using {@link #markResultAsCleanAsync(ApplicationID)}. + * + * @param applicationResultEntry The application result we wish to persist. + * @return a successfully completed future if the dirty result is created successfully. 
The + * future will be completed with {@link IllegalStateException} if the passed {@code + * applicationEntry} has an {@code ApplicationID} attached that is already registered in + * this {@code ApplicationResultStore}. + */ + CompletableFuture<Void> createDirtyResultAsync(ApplicationResultEntry applicationResultEntry); + + /** + * Marks an existing {@link ApplicationResultEntry} as {@code clean}. This indicates that no + * more resource cleanup steps need to be performed. No actions should be triggered if the + * passed {@code ApplicationID} belongs to an application that was already marked as clean. + * + * @param applicationId Ident of the application we wish to mark as clean. + * @return a successfully completed future if the result is marked successfully. The future can + * complete exceptionally with a {@link NoSuchElementException}. i.e. there is no + * corresponding {@code dirty} application present in the store for the given {@code + * ApplicationID}. + */ + CompletableFuture<Void> markResultAsCleanAsync(ApplicationID applicationId); + + /** + * Returns the future of whether the store already contains an entry for an application. + * + * @param applicationId Ident of the application we wish to check the store for. + * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean} + * {@link ApplicationResultEntry} exists for the given {@code ApplicationID}; otherwise + * {@code false}. + */ + default CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID applicationId) { + return hasDirtyApplicationResultEntryAsync(applicationId) + .thenCombine( + hasCleanApplicationResultEntryAsync(applicationId), + (result1, result2) -> result1 || result2); + } + + /** + * Returns the future of whether the store contains a {@code dirty} entry for the given {@code + * ApplicationID}. + * + * @param applicationId Ident of the application we wish to check the store for. 
+ * @return a successfully completed future with {@code true}, if a {@code dirty} entry exists + * for the given {@code ApplicationID}; otherwise {@code false}. + */ + CompletableFuture<Boolean> hasDirtyApplicationResultEntryAsync(ApplicationID applicationId); + + /** + * Returns the future of whether the store contains a {@code clean} entry for the given {@code + * ApplicationID}. + * + * @param applicationId Ident of the application we wish to check the store for. + * @return a successfully completed future with {@code true}, if a {@code clean} entry exists + * for the given {@code ApplicationID}; otherwise a successfully completed future with + * {@code false}. + */ + CompletableFuture<Boolean> hasCleanApplicationResultEntryAsync(ApplicationID applicationId); + + /** + * Get the persisted {@link ApplicationResult} instances that are marked as {@code dirty}. This + * is useful for recovery of finalization steps. + * + * @return A set of dirty {@code ApplicationResults} from the store. + * @throws IOException if collecting the set of dirty results failed for IO reasons. 
+ */ + Set<ApplicationResult> getDirtyResults() throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreOptions.java similarity index 80% copy from flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreOptions.java index c8341df1116..d992fd6bc5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreOptions.java @@ -26,23 +26,24 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; -/** The set of configuration options relating to the Job Result Store. */ +/** The set of configuration options relating to the Application Result Store. */ @PublicEvolving -public class JobResultStoreOptions { - @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE) +public class ApplicationResultStoreOptions { + + @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_APPLICATION_RESULT_STORE) public static final ConfigOption<String> STORAGE_PATH = - ConfigOptions.key("job-result-store.storage-path") + ConfigOptions.key("application-result-store.storage-path") .stringType() .noDefaultValue() .withDescription( Description.builder() .text( - "Defines where job results should be stored. This should be an " + "Defines where application results should be stored. This should be an " + "underlying file-system that provides read-after-write consistency. 
By " + "default, this is %s.", TextElement.code( - FileSystemJobResultStore - .createDefaultJobResultStorePath( + FileSystemApplicationResultStore + .createDefaultApplicationResultStorePath( String.format( "{%s}", HighAvailabilityOptions @@ -55,15 +56,15 @@ public class JobResultStoreOptions { .key())))) .build()); - @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE) + @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_APPLICATION_RESULT_STORE) public static final ConfigOption<Boolean> DELETE_ON_COMMIT = - ConfigOptions.key("job-result-store.delete-on-commit") + ConfigOptions.key("application-result-store.delete-on-commit") .booleanType() .defaultValue(Boolean.TRUE) .withDescription( - "Determines whether job results should be automatically removed " - + "from the underlying job result store when the corresponding entity " - + "transitions into a clean state. If false, the cleaned job results " + "Determines whether application results should be automatically removed " + + "from the underlying application result store when the corresponding entity " + + "transitions into a clean state. If false, the cleaned application results " + "are, instead, marked as clean to indicate their state. In this " + "case, Flink no longer has ownership and the resources need to " + "be cleaned up by the user."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java new file mode 100644 index 00000000000..af987d6dba3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedApplicationResultStore.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.util.concurrent.Executors; + +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** An embedded implementation of {@link ApplicationResultStore} for testing purposes. 
*/ +public class EmbeddedApplicationResultStore extends AbstractThreadsafeApplicationResultStore { + + private final Map<ApplicationID, ApplicationResultEntry> dirtyResults = + new ConcurrentHashMap<>(); + private final Map<ApplicationID, ApplicationResultEntry> cleanResults = + new ConcurrentHashMap<>(); + + public EmbeddedApplicationResultStore() { + super(Executors.directExecutor()); + } + + @Override + protected void createDirtyResultInternal(ApplicationResultEntry applicationResultEntry) { + dirtyResults.put(applicationResultEntry.getApplicationId(), applicationResultEntry); + } + + @Override + protected void markResultAsCleanInternal(ApplicationID applicationId) + throws NoSuchElementException { + final ApplicationResultEntry entry = dirtyResults.remove(applicationId); + if (entry != null) { + cleanResults.put(applicationId, entry); + } else { + throw new NoSuchElementException( + String.format( + "Could not mark application %s as clean as it is not present in the application result store.", + applicationId)); + } + } + + @Override + protected boolean hasDirtyApplicationResultEntryInternal(ApplicationID applicationId) { + return dirtyResults.containsKey(applicationId); + } + + @Override + protected boolean hasCleanApplicationResultEntryInternal(ApplicationID applicationId) { + return cleanResults.containsKey(applicationId); + } + + @Override + protected Set<ApplicationResult> getDirtyResultsInternal() { + return dirtyResults.values().stream() + .map(ApplicationResultEntry::getApplicationResult) + .collect(Collectors.toSet()); + } + + /** Clears all stored results, dirty as well as clean. */ + public void clear() { + dirtyResults.clear(); + cleanResults.clear(); + } + + /** Gets the number of entries currently held as dirty, i.e. created but not yet marked as clean. */ + public int getDirtyResultCount() { + return dirtyResults.size(); + } + + /** Gets the number of entries that have been marked as clean.
*/ + public int getCleanResultCount() { + return cleanResults.size(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java new file mode 100644 index 00000000000..fd938fede9b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStore.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.rest.messages.json.ApplicationResultDeserializer; +import org.apache.flink.runtime.rest.messages.json.ApplicationResultSerializer; +import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** + * An implementation of the {@link ApplicationResultStore} which persists application result data to + * an underlying distributed filesystem. 
+ */ +public class FileSystemApplicationResultStore extends AbstractThreadsafeApplicationResultStore { + + private static final Logger LOG = + LoggerFactory.getLogger(FileSystemApplicationResultStore.class); + + @VisibleForTesting static final String FILE_EXTENSION = ".json"; + @VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION; + + @VisibleForTesting + public static boolean hasValidDirtyApplicationResultStoreEntryExtension(String filename) { + return filename.endsWith(DIRTY_FILE_EXTENSION); + } + + @VisibleForTesting + public static boolean hasValidApplicationResultStoreEntryExtension(String filename) { + return filename.endsWith(FILE_EXTENSION); + } + + private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + + private final FileSystem fileSystem; + + private volatile boolean basePathCreated; + + private final Path basePath; + + private final boolean deleteOnCommit; + + @VisibleForTesting + FileSystemApplicationResultStore( + FileSystem fileSystem, Path basePath, boolean deleteOnCommit, Executor ioExecutor) { + super(ioExecutor); + this.fileSystem = fileSystem; + this.basePath = basePath; + this.deleteOnCommit = deleteOnCommit; + } + + public static FileSystemApplicationResultStore fromConfiguration( + Configuration config, Executor ioExecutor) throws IOException { + Preconditions.checkNotNull(config); + final String arsStoragePath = config.get(ApplicationResultStoreOptions.STORAGE_PATH); + final Path basePath; + + if (isNullOrWhitespaceOnly(arsStoragePath)) { + final String haStoragePath = config.get(HighAvailabilityOptions.HA_STORAGE_PATH); + final String haClusterId = config.get(HighAvailabilityOptions.HA_CLUSTER_ID); + basePath = + new Path(createDefaultApplicationResultStorePath(haStoragePath, haClusterId)); + } else { + basePath = new Path(arsStoragePath); + } + + boolean deleteOnCommit = config.get(ApplicationResultStoreOptions.DELETE_ON_COMMIT); + + return new FileSystemApplicationResultStore( +
basePath.getFileSystem(), basePath, deleteOnCommit, ioExecutor); + } + + private void createBasePathIfNeeded() throws IOException { + if (!basePathCreated) { + LOG.info( + "Creating highly available application result storage directory at {}", + basePath); + fileSystem.mkdirs(basePath); + LOG.info( + "Created highly available application result storage directory at {}", + basePath); + basePathCreated = true; + } + } + + public static String createDefaultApplicationResultStorePath(String baseDir, String clusterId) { + return baseDir + "/application-result-store/" + clusterId; + } + + /** + * Given an application ID, construct the path for a dirty entry corresponding to it in the + * application result store. The file name is the application ID followed by {@code _DIRTY.json}. + * + * @param applicationId The application ID to construct a dirty entry path from. + * @return A path for a dirty entry for the given application ID. + */ + private Path constructDirtyPath(ApplicationID applicationId) { + return constructEntryPath(applicationId.toString() + DIRTY_FILE_EXTENSION); + } + + /** + * Given an application ID, construct the path for a clean entry corresponding to it in the + * application result store. The file name is the application ID followed by {@code .json}. + * + * @param applicationId The application ID to construct a clean entry path from. + * @return A path for a clean entry for the given application ID.
+ */ + private Path constructCleanPath(ApplicationID applicationId) { + return constructEntryPath(applicationId.toString() + FILE_EXTENSION); + } + + @VisibleForTesting + Path constructEntryPath(String fileName) { + return new Path(this.basePath, fileName); + } + + @Override + public void createDirtyResultInternal(ApplicationResultEntry applicationResultEntry) + throws IOException { + createBasePathIfNeeded(); + + final Path path = constructDirtyPath(applicationResultEntry.getApplicationId()); + try (OutputStream os = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) { + mapper.writeValue( + // working around the internally used _writeAndClose method to ensure that close + // is only called once + new NonClosingOutputStreamDecorator(os), + new JsonApplicationResultEntry(applicationResultEntry.getApplicationResult())); + } + } + + @Override + public void markResultAsCleanInternal(ApplicationID applicationId) + throws IOException, NoSuchElementException { + Path dirtyPath = constructDirtyPath(applicationId); + + if (!fileSystem.exists(dirtyPath)) { + throw new NoSuchElementException( + String.format( + "Could not mark application %s as clean as it is not present in the application result store.", + applicationId)); + } + + if (deleteOnCommit) { + fileSystem.delete(dirtyPath, false); + } else { + fileSystem.rename(dirtyPath, constructCleanPath(applicationId)); + } + } + + @Override + public boolean hasDirtyApplicationResultEntryInternal(ApplicationID applicationId) + throws IOException { + return fileSystem.exists(constructDirtyPath(applicationId)); + } + + @Override + public boolean hasCleanApplicationResultEntryInternal(ApplicationID applicationId) + throws IOException { + return fileSystem.exists(constructCleanPath(applicationId)); + } + + @Override + public Set<ApplicationResult> getDirtyResultsInternal() throws IOException { + createBasePathIfNeeded(); + + final FileStatus[] statuses = fileSystem.listStatus(this.basePath); + + Preconditions.checkState(
+ statuses != null, + "The base directory of the ApplicationResultStore isn't accessible. No dirty ApplicationResults can be restored."); + + final Set<ApplicationResult> dirtyResults = new HashSet<>(); + for (FileStatus s : statuses) { + if (!s.isDir()) { + if (hasValidDirtyApplicationResultStoreEntryExtension(s.getPath().getName())) { + JsonApplicationResultEntry jre = + mapper.readValue( + fileSystem.open(s.getPath()), JsonApplicationResultEntry.class); + ApplicationResult applicationResult = jre.getApplicationResult(); + if (applicationResult != null) { + dirtyResults.add(applicationResult); + } + } + } + } + return dirtyResults; + } + + /** + * Wrapper class around {@link ApplicationResultEntry} to allow for serialization of a schema + * version, so that future schema changes can be handled in a backwards compatible manner. + * + * <p>The {@code version} property is emitted through {@link #getVersion()}, which currently + * returns 1; it is declared as ignored with {@code allowGetters = true}, i.e. it is written + * but not read back. The application ID is excluded from the JSON; only the {@code result} + * property carries data. + */ + @JsonIgnoreProperties( + value = {JsonApplicationResultEntry.FIELD_NAME_VERSION}, + allowGetters = true) + @VisibleForTesting + static class JsonApplicationResultEntry extends ApplicationResultEntry { + private static final String FIELD_NAME_RESULT = "result"; + static final String FIELD_NAME_VERSION = "version"; + + @JsonCreator + private JsonApplicationResultEntry( + @JsonProperty(FIELD_NAME_RESULT) ApplicationResult applicationResult) { + super(applicationResult); + } + + @Override + @JsonProperty(FIELD_NAME_RESULT) + @JsonSerialize(using = ApplicationResultSerializer.class) + @JsonDeserialize(using = ApplicationResultDeserializer.class) + public ApplicationResult getApplicationResult() { + return super.getApplicationResult(); + } + + @JsonIgnore + @Override + public ApplicationID getApplicationId() { + return super.getApplicationId(); + } + + public int getVersion() { + return 1; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java index c8341df1116..c9493b0abc6
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java @@ -55,7 +55,11 @@ public class JobResultStoreOptions { .key())))) .build()); - @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE) + /** + * @deprecated Use {@link ApplicationResultStoreOptions#DELETE_ON_COMMIT} + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption<Boolean> DELETE_ON_COMMIT = ConfigOptions.key("job-result-store.delete-on-commit") .booleanType() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 7d2defb29fb..0b834ca5032 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.minicluster; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; @@ -35,6 +37,7 @@ import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.application.AbstractApplication; +import org.apache.flink.runtime.application.ArchivedApplication; import org.apache.flink.runtime.application.SingleJobApplication; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.BlobClient; @@ -1182,6 +1185,18 @@ 
public class MiniCluster implements AutoCloseableAsync { .thenCompose(Function.identity()); } + // ------------------------------------------------------------------------ + // Accessing applications + // ------------------------------------------------------------------------ + + /** + * Asks the dispatcher for the {@link ArchivedApplication} with the given ID, using the + * cluster's RPC timeout, and maps it to its application status. + * + * @param applicationId ID of the application to look up + * @return future of the {@link ApplicationState} reported for that application + */ + public CompletableFuture<ApplicationState> getApplicationStatus(ApplicationID applicationId) { + return runDispatcherCommand( + dispatcherGateway -> + dispatcherGateway + .requestApplication(applicationId, rpcTimeout) + .thenApply(ArchivedApplication::getApplicationStatus)); + } + // ------------------------------------------------------------------------ // factories - can be overridden by subclasses to alter behavior // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java new file mode 100644 index 00000000000..223a83b54c8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.runtime.highavailability.ApplicationResult; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link ApplicationResult}. + * + * <p>Reads the fields written by {@link ApplicationResultSerializer}. Fields that are not known + * to this deserializer are skipped together with their values. + * + * @see ApplicationResultSerializer + */ +public class ApplicationResultDeserializer extends StdDeserializer<ApplicationResult> { + + private static final long serialVersionUID = 1L; + + private final ApplicationIDDeserializer applicationIdDeserializer = + new ApplicationIDDeserializer(); + + public ApplicationResultDeserializer() { + super(ApplicationResult.class); + } + + /** + * Reads an {@link ApplicationResult} from a JSON object, consuming tokens up to and including + * the closing {@code END_OBJECT}. + * + * <p>The application name defaults to {@code "unknown"} and both timestamps default to + * {@code -1} when the corresponding field is absent. + * + * @throws JsonMappingException if a known field carries a value of an unexpected token type, + * or if the {@link ApplicationResult} cannot be built from the parsed values + * @throws IllegalStateException if the input ends before the object is closed + */ + @Override + public ApplicationResult deserialize(final JsonParser p, final DeserializationContext ctxt) + throws IOException { + ApplicationID applicationId = null; + ApplicationState applicationState = null; + String applicationName = "unknown"; + long startTime = -1; + long endTime = -1; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); +
switch (fieldName) { + case ApplicationResultSerializer.FIELD_NAME_APPLICATION_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + applicationId = applicationIdDeserializer.deserialize(p, ctxt); + break; + case ApplicationResultSerializer.FIELD_NAME_APPLICATION_STATE: + assertNextToken(p, JsonToken.VALUE_STRING); + applicationState = ApplicationState.valueOf(p.getValueAsString()); + break; + case ApplicationResultSerializer.FIELD_NAME_APPLICATION_NAME: + assertNextToken(p, JsonToken.VALUE_STRING); + applicationName = p.getValueAsString(); + break; + case ApplicationResultSerializer.FIELD_NAME_START_TIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + startTime = p.getLongValue(); + break; + case ApplicationResultSerializer.FIELD_NAME_END_TIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + endTime = p.getLongValue(); + break; + default: + // ignore unknown fields: move on to the field's value and skip it (including any + // nested object or array) so that the next iteration starts on a field name again + p.nextToken(); + p.skipChildren(); + } + } + + try { + return new ApplicationResult.Builder() + .applicationId(applicationId) + .applicationState(applicationState) + .applicationName(applicationName) + .startTime(startTime) + .endTime(endTime) + .build(); + } catch (final RuntimeException e) { + // pass the parser so that the exception carries the current input location + throw new JsonMappingException( + p, "Could not deserialize " + ApplicationResult.class.getSimpleName(), e); + } + } + + /** Asserts that the provided JsonToken is not null, i.e., not at the end of the input. */ + private static void assertNotEndOfInput( + final JsonParser p, @Nullable final JsonToken jsonToken) { + checkState(jsonToken != null, "Unexpected end of input at %s", p.getCurrentLocation()); + } + + /** Advances the token and asserts that it matches the required {@link JsonToken}.
*/ + private static void assertNextToken(final JsonParser p, final JsonToken requiredJsonToken) + throws IOException { + final JsonToken jsonToken = p.nextToken(); + if (jsonToken != requiredJsonToken) { + throw new JsonMappingException( + p, String.format("Expected token %s (was %s)", requiredJsonToken, jsonToken)); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java new file mode 100644 index 00000000000..dc73a18f491 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.runtime.highavailability.ApplicationResult; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +/** + * JSON serializer for {@link ApplicationResult}. + * + * <p>Writes a single JSON object with the fields {@code application-id}, {@code + * application-state} (the name of the state constant), {@code application-name}, {@code + * start-time} and {@code end-time}, in that order. + * + * @see ApplicationResultDeserializer + */ +public class ApplicationResultSerializer extends StdSerializer<ApplicationResult> { + + private static final long serialVersionUID = 1L; + + static final String FIELD_NAME_APPLICATION_ID = "application-id"; + + static final String FIELD_NAME_APPLICATION_STATE = "application-state"; + + static final String FIELD_NAME_APPLICATION_NAME = "application-name"; + + static final String FIELD_NAME_START_TIME = "start-time"; + + static final String FIELD_NAME_END_TIME = "end-time"; + + private final ApplicationIDSerializer applicationIdSerializer = new ApplicationIDSerializer(); + + public ApplicationResultSerializer() { + super(ApplicationResult.class); + } + + @Override + public void serialize( + final ApplicationResult result, + final JsonGenerator gen, + final SerializerProvider provider) + throws IOException { + + gen.writeStartObject(); + + gen.writeFieldName(FIELD_NAME_APPLICATION_ID); + applicationIdSerializer.serialize(result.getApplicationId(), gen, provider); + + gen.writeFieldName(FIELD_NAME_APPLICATION_STATE); + gen.writeString(result.getApplicationState().name()); + + gen.writeFieldName(FIELD_NAME_APPLICATION_NAME); + gen.writeString(result.getApplicationName()); + + gen.writeNumberField(FIELD_NAME_START_TIME, result.getStartTime()); + gen.writeNumberField(FIELD_NAME_END_TIME, result.getEndTime()); + + gen.writeEndObject(); + } +} diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAApplicationRunITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAApplicationRunITCase.java new file mode 100644 index 00000000000..203a0dbfb56 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAApplicationRunITCase.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.runtime.application.SingleJobApplication; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.test.junit5.InjectMiniCluster; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHAApplicationRunITCase} runs a single application in HA mode and provides {@code + * abstract} methods for initializing a specific {@link FileSystem}. + * + * <p>Sub-classes must use a {@link + * org.apache.flink.runtime.testutils.InternalMiniClusterExtension}.
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHAApplicationRunITCase { + + @RegisterExtension + static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @RegisterExtension + @Order(1) + private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + protected static Configuration addHaConfiguration( + final Configuration config, final String haStoragePath) { + config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.set( + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, + ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + config.set(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath); + + // getFlinkConfiguration() is called on each new instantiation of the MiniCluster which is + // happening before each test run + FileSystem.initialize(config, null); + + return config; + } + + protected void runAfterApplicationTermination() throws Exception {} + + @Test + void testApplicationExecutionInHaMode(@InjectMiniCluster MiniCluster flinkCluster) + throws Exception { + final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); + final SingleJobApplication application = new SingleJobApplication(jobGraph); + final ApplicationID applicationId = application.getApplicationId(); + + // providing a timeout helps making the test fail in case some issue occurred while + // initializing the cluster + flinkCluster.submitApplication(application).get(30, TimeUnit.SECONDS); + + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); + final ApplicationState applicationStatus = + FutureUtils.retrySuccessfulWithDelay( + () -> flinkCluster.getApplicationStatus(applicationId), + Duration.ofMillis(10), + deadline, + status -> + flinkCluster.isRunning() + && status == ApplicationState.FINISHED, + new ScheduledExecutorServiceAdapter( + EXECUTOR_RESOURCE.getExecutor())) + 
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + assertThat(applicationStatus).isEqualTo(ApplicationState.FINISHED); + + runAfterApplicationTermination(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java new file mode 100644 index 00000000000..803f69c7920 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ApplicationResultStoreContractTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.testutils.TestingApplicationResultStore; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** + * This interface defines a series of tests for any implementation of the {@link + * ApplicationResultStore} to determine whether they correctly implement the contracts defined by + * the interface. + */ +public interface ApplicationResultStoreContractTest { + + ApplicationResultEntry DUMMY_APPLICATION_RESULT_ENTRY = + new ApplicationResultEntry(TestingApplicationResultStore.DUMMY_APPLICATION_RESULT); + + ApplicationResultStore createApplicationResultStore() throws IOException; + + @Test + default void testStoreApplicationResultsWithDuplicateIDsThrowsException() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + final ApplicationResultEntry otherEntryWithDuplicateId = + new ApplicationResultEntry( + TestingApplicationResultStore.createSuccessfulApplicationResult( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId())); + assertThatThrownBy( + () -> + applicationResultStore + .createDirtyResultAsync(otherEntryWithDuplicateId) + .join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IllegalStateException.class); + } + + @Test + default void testStoreDirtyEntryForAlreadyCleanedApplicationResultThrowsException() + throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + 
applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThatThrownBy( + () -> + applicationResultStore + .createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY) + .join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(IllegalStateException.class); + } + + @Test + default void testCleaningDuplicateEntryThrowsNoException() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThatNoException() + .isThrownBy( + () -> + applicationResultStore + .markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()); + } + + @Test + default void testCleaningNonExistentEntryThrowsException() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + assertThatThrownBy( + () -> + applicationResultStore + .markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .hasCauseInstanceOf(NoSuchElementException.class); + } + + @Test + default void testHasApplicationResultEntryWithDirtyEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + assertThat( + applicationResultStore + .hasDirtyApplicationResultEntryAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isTrue(); + assertThat( + applicationResultStore + .hasCleanApplicationResultEntryAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isFalse(); + assertThat( + applicationResultStore + .hasApplicationResultEntryAsync( + 
DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isTrue(); + } + + @Test + default void testHasApplicationResultEntryWithCleanEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + assertThat( + applicationResultStore + .hasDirtyApplicationResultEntryAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isFalse(); + assertThat( + applicationResultStore + .hasCleanApplicationResultEntryAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isTrue(); + assertThat( + applicationResultStore + .hasApplicationResultEntryAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join()) + .isTrue(); + } + + @Test + default void testHasApplicationResultEntryWithEmptyStore() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + ApplicationID applicationId = new ApplicationID(); + assertThat(applicationResultStore.hasDirtyApplicationResultEntryAsync(applicationId).join()) + .isFalse(); + assertThat(applicationResultStore.hasCleanApplicationResultEntryAsync(applicationId).join()) + .isFalse(); + assertThat(applicationResultStore.hasApplicationResultEntryAsync(applicationId).join()) + .isFalse(); + } + + @Test + default void testGetDirtyResultsWithNoEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + assertThat(applicationResultStore.getDirtyResults()).isEmpty(); + } + + @Test + default void testGetDirtyResultsWithDirtyEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + assertThat( + 
applicationResultStore.getDirtyResults().stream() + .map(ApplicationResult::getApplicationId) + .collect(Collectors.toList())) + .singleElement() + .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + } + + @Test + default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException { + ApplicationResultStore applicationResultStore = createApplicationResultStore(); + applicationResultStore.createDirtyResultAsync(DUMMY_APPLICATION_RESULT_ENTRY).join(); + applicationResultStore + .markResultAsCleanAsync(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .join(); + + final ApplicationResultEntry otherDirtyApplicationResultEntry = + new ApplicationResultEntry( + TestingApplicationResultStore.createSuccessfulApplicationResult( + new ApplicationID())); + applicationResultStore.createDirtyResultAsync(otherDirtyApplicationResultEntry).join(); + + assertThat( + applicationResultStore.getDirtyResults().stream() + .map(ApplicationResult::getApplicationId) + .collect(Collectors.toList())) + .singleElement() + .isEqualTo(otherDirtyApplicationResultEntry.getApplicationId()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java new file mode 100644 index 00000000000..fb4206a0892 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreContractTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; + +/** + * Tests for the {@link FileSystemApplicationResultStore} implementation of the {@link + * ApplicationResultStore}'s contracts. + */ +public class FileSystemApplicationResultStoreContractTest + implements ApplicationResultStoreContractTest { + @TempDir File temporaryFolder; + + @Override + public ApplicationResultStore createApplicationResultStore() throws IOException { + Path path = new Path(temporaryFolder.toURI()); + return new FileSystemApplicationResultStore( + path.getFileSystem(), path, false, Executors.directExecutor()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreFileOperationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreFileOperationsTest.java new file mode 100644 index 00000000000..dda4673a7d4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemApplicationResultStoreFileOperationsTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.highavailability.ApplicationResultStoreContractTest.DUMMY_APPLICATION_RESULT_ENTRY; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the internal {@link FileSystemApplicationResultStore} mechanisms. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class FileSystemApplicationResultStoreFileOperationsTest { + + private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper(); + + private final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor = + new ManuallyTriggeredScheduledExecutor(); + + private FileSystemApplicationResultStore fileSystemApplicationResultStore; + + @TempDir File temporaryFolder; + + private Path basePath; + + @BeforeEach + public void setupTest() throws IOException { + basePath = new Path(temporaryFolder.toURI()); + fileSystemApplicationResultStore = + new FileSystemApplicationResultStore( + basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor); + } + + @Test + public void testValidEntryPathCreation() { + final Path entryParent = + fileSystemApplicationResultStore.constructEntryPath("random-name").getParent(); + assertThat(entryParent) + .extracting( + FileSystemApplicationResultStoreFileOperationsTest::stripSucceedingSlash) + .isEqualTo(stripSucceedingSlash(basePath)); + } + + private static String stripSucceedingSlash(Path path) { + final String uriStr = path.toUri().toString(); + if (uriStr.charAt(uriStr.length() - 1) == '/') { + return uriStr.substring(0, uriStr.length() - 1); + } + + return uriStr; + } + + @Test + public void testHasValidApplicationResultStoreEntryExtension() { + assertThat( + FileSystemApplicationResultStore + .hasValidApplicationResultStoreEntryExtension( + "test" + FileSystemApplicationResultStore.FILE_EXTENSION)) + .isTrue(); + } + + @ParameterizedTest + @ValueSource(strings = {"test.txt", "", "test.zip"}) + public void testHasInvalidApplicationResultStoreEntryExtension(String filename) { + assertThat( + FileSystemApplicationResultStore + .hasValidApplicationResultStoreEntryExtension(filename)) + .isFalse(); + } + + @Test + public void testHasValidDirtyApplicationResultStoreEntryExtension() { + assertThat( + FileSystemApplicationResultStore + 
.hasValidDirtyApplicationResultStoreEntryExtension( + "test" + + FileSystemApplicationResultStore + .DIRTY_FILE_EXTENSION)) + .isTrue(); + } + + @ParameterizedTest + @ValueSource(strings = {"test.json", "test.txt", "", "test.zip"}) + public void testHasInvalidDirtyApplicationResultStoreEntryExtension(String filename) { + assertThat( + FileSystemApplicationResultStore + .hasValidDirtyApplicationResultStoreEntryExtension(filename)) + .isFalse(); + } + + @Test + public void testBaseDirectoryCreationOnResultStoreInitialization() throws Exception { + final File emptyBaseDirectory = new File(temporaryFolder.getPath(), "empty-temp-dir"); + final Path basePath = new Path(emptyBaseDirectory.getPath()); + assertThat(emptyBaseDirectory).doesNotExist(); + + fileSystemApplicationResultStore = + new FileSystemApplicationResultStore( + basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor); + // Result store operations are creating the base directory on-the-fly + assertThat(emptyBaseDirectory).doesNotExist(); + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + assertThat(emptyBaseDirectory).doesNotExist(); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + assertThat(emptyBaseDirectory).exists().isDirectory(); + } + + @Test + public void testStoreDirtyApplicationResultCreatesFile() throws Exception { + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + assertThat(getCleanResultIdsFromFileSystem()).isEmpty(); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)) + .exists() + .isFile() + .isNotEmpty(); + } + + @Test + 
public void testStoreCleanApplicationResultCreatesFile() throws Exception { + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + CompletableFuture<Void> markCleanAsync = + fileSystemApplicationResultStore.markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + assertThat(getCleanResultIdsFromFileSystem()) + .doesNotContain(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(markCleanAsync).eventuallySucceeds(); + assertThat(getCleanResultIdsFromFileSystem()) + .containsExactlyInAnyOrder(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + } + + @Test + public void testStoreCleanApplicationResultDeletesDirtyFile() { + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)) + .exists() + .isFile() + .isNotEmpty(); + + CompletableFuture<Void> markResultAsCleanAsync = + fileSystemApplicationResultStore.markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(markResultAsCleanAsync).eventuallySucceeds(); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + } + + @Test + public void testCleanDirtyApplicationResultTwiceIsIdempotent() throws IOException { + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + 
manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + CompletableFuture<Void> cleanResultAsync = + fileSystemApplicationResultStore.markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(cleanResultAsync).eventuallySucceeds(); + final byte[] cleanFileData = + FileUtils.readAllBytes(expectedCleanFile(DUMMY_APPLICATION_RESULT_ENTRY).toPath()); + + CompletableFuture<Void> markResultAsCleanAsync = + fileSystemApplicationResultStore.markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(markResultAsCleanAsync).eventuallySucceeds(); + assertThat(expectedCleanFile(DUMMY_APPLICATION_RESULT_ENTRY)) + .as( + "Marking the same application %s as clean should be idempotent.", + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()) + .hasBinaryContent(cleanFileData); + } + + /** + * Tests that, when the application result store is configured to delete on commit, both the + * clean and the dirty files for an application entry are deleted when the result is marked as + * clean. 
+ */ + @Test + public void testDeleteOnCommit() throws IOException { + Path path = new Path(temporaryFolder.toURI()); + fileSystemApplicationResultStore = + new FileSystemApplicationResultStore( + path.getFileSystem(), path, true, manuallyTriggeredExecutor); + + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)) + .exists() + .isFile() + .isNotEmpty(); + + CompletableFuture<Void> markResultAsCleanAsync = + fileSystemApplicationResultStore.markResultAsCleanAsync( + DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(markResultAsCleanAsync).eventuallySucceeds(); + assertThat(expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + assertThat(expectedCleanFile(DUMMY_APPLICATION_RESULT_ENTRY)).doesNotExist(); + } + + @Test + public void testVersionSerialization() throws IOException { + CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + final File dirtyFile = expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY); + final FileSystemApplicationResultStore.JsonApplicationResultEntry deserializedEntry = + MAPPER.readValue( + dirtyFile, + FileSystemApplicationResultStore.JsonApplicationResultEntry.class); + assertThat(dirtyFile).isFile().content().containsPattern("\"version\":1"); + assertThat(deserializedEntry.getVersion()).isEqualTo(1); + } + + @Test + public void testApplicationResultSerializationDeserialization() throws IOException { + 
CompletableFuture<Void> dirtyResultAsync = + fileSystemApplicationResultStore.createDirtyResultAsync( + DUMMY_APPLICATION_RESULT_ENTRY); + manuallyTriggeredExecutor.triggerAll(); + FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds(); + final File dirtyFile = expectedDirtyFile(DUMMY_APPLICATION_RESULT_ENTRY); + final FileSystemApplicationResultStore.JsonApplicationResultEntry deserializedEntry = + MAPPER.readValue( + dirtyFile, + FileSystemApplicationResultStore.JsonApplicationResultEntry.class); + final ApplicationResult deserializedApplicationResult = + deserializedEntry.getApplicationResult(); + assertThat(deserializedApplicationResult) + .extracting(ApplicationResult::getApplicationId) + .isEqualTo(DUMMY_APPLICATION_RESULT_ENTRY.getApplicationId()); + assertThat(deserializedApplicationResult) + .extracting(ApplicationResult::getApplicationState) + .isEqualTo( + DUMMY_APPLICATION_RESULT_ENTRY + .getApplicationResult() + .getApplicationState()); + } + + private List<ApplicationID> getCleanResultIdsFromFileSystem() throws IOException { + final List<ApplicationID> cleanResults = new ArrayList<>(); + + final File[] cleanFiles = + temporaryFolder.listFiles( + (dir, name) -> + !FileSystemApplicationResultStore + .hasValidDirtyApplicationResultStoreEntryExtension(name)); + assert cleanFiles != null; + for (File cleanFile : cleanFiles) { + final FileSystemApplicationResultStore.JsonApplicationResultEntry entry = + MAPPER.readValue( + cleanFile, + FileSystemApplicationResultStore.JsonApplicationResultEntry.class); + cleanResults.add(entry.getApplicationResult().getApplicationId()); + } + + return cleanResults; + } + + /** + * Generates the expected path for a dirty entry given an application entry. + * + * @param entry The application entry to construct the expected dirty path from. + * @return The expected dirty file.
+ */ + private File expectedDirtyFile(ApplicationResultEntry entry) { + return new File( + temporaryFolder.toURI().getPath(), + entry.getApplicationId().toString() + + FileSystemApplicationResultStore.DIRTY_FILE_EXTENSION); + } + + /** + * Generates the expected path for a clean entry given an application entry. + * + * @param entry The application entry to construct the expected clean path from. + * @return The expected clean file. + */ + private File expectedCleanFile(ApplicationResultEntry entry) { + return new File( + temporaryFolder.toURI().getPath(), + entry.getApplicationId().toString() + + FileSystemApplicationResultStore.FILE_EXTENSION); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedApplicationResultStoreContractTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedApplicationResultStoreContractTest.java new file mode 100644 index 00000000000..893f5c87fba --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedApplicationResultStoreContractTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.highavailability.ApplicationResultStore; +import org.apache.flink.runtime.highavailability.ApplicationResultStoreContractTest; +import org.apache.flink.runtime.highavailability.EmbeddedApplicationResultStore; + +import java.io.IOException; + +/** + * Tests for the {@link EmbeddedApplicationResultStore} implementation of the {@link + * ApplicationResultStore}'s contracts. + */ +public class EmbeddedApplicationResultStoreContractTest + implements ApplicationResultStoreContractTest { + @Override + public ApplicationResultStore createApplicationResultStore() throws IOException { + return new EmbeddedApplicationResultStore(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java new file mode 100644 index 00000000000..cf6dae44f0f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingApplicationResultStore.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.runtime.highavailability.ApplicationResult; +import org.apache.flink.runtime.highavailability.ApplicationResultEntry; +import org.apache.flink.runtime.highavailability.ApplicationResultStore; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * {@code TestingApplicationResultStore} is an {@link ApplicationResultStore} implementation that + * can be used in tests. + */ +public class TestingApplicationResultStore implements ApplicationResultStore { + + /** Shared sample result, created via {@link #createSuccessfulApplicationResult} with a freshly constructed {@link ApplicationID}. */ public static final ApplicationResult DUMMY_APPLICATION_RESULT = + createSuccessfulApplicationResult(new ApplicationID()); + + /** Builds an {@link ApplicationResult} for the given id whose state is {@link ApplicationState#FINISHED}. */ public static ApplicationResult createSuccessfulApplicationResult(ApplicationID applicationId) { + return createApplicationResult(applicationId, ApplicationState.FINISHED); + } + + /** Builds an {@link ApplicationResult} from the given id and state; the application name is always the same hard-coded test value. */ public static ApplicationResult createApplicationResult( + ApplicationID applicationId, ApplicationState applicationState) { + return new ApplicationResult.Builder() + .applicationId(applicationId) + .applicationState(applicationState) + .applicationName("test-application") + .build(); + } + + /* Pluggable handlers: each ApplicationResultStore method implemented further down forwards to exactly one of the following fields. */ private final Function<ApplicationResultEntry, CompletableFuture<Void>> + createDirtyResultConsumer; + private final Function<ApplicationID, CompletableFuture<Void>> markResultAsCleanConsumer; + + private final Function<ApplicationID, CompletableFuture<Boolean>> + hasApplicationResultEntryFunction; + private final Function<ApplicationID, CompletableFuture<Boolean>> + hasDirtyApplicationResultEntryFunction; + private final Function<ApplicationID, CompletableFuture<Boolean>> + 
hasCleanApplicationResultEntryFunction; + private final SupplierWithException<Set<ApplicationResult>, ? extends IOException> + getDirtyResultsSupplier; + + /** Stores the six handlers as given, without validation. Private: instances are created through {@link #builder()} followed by {@link Builder#build()}. */ private TestingApplicationResultStore( + Function<ApplicationResultEntry, CompletableFuture<Void>> createDirtyResultConsumer, + Function<ApplicationID, CompletableFuture<Void>> markResultAsCleanConsumer, + Function<ApplicationID, CompletableFuture<Boolean>> hasApplicationResultEntryFunction, + Function<ApplicationID, CompletableFuture<Boolean>> + hasDirtyApplicationResultEntryFunction, + Function<ApplicationID, CompletableFuture<Boolean>> + hasCleanApplicationResultEntryFunction, + SupplierWithException<Set<ApplicationResult>, ? extends IOException> + getDirtyResultsSupplier) { + this.createDirtyResultConsumer = createDirtyResultConsumer; + this.markResultAsCleanConsumer = markResultAsCleanConsumer; + this.hasApplicationResultEntryFunction = hasApplicationResultEntryFunction; + this.hasDirtyApplicationResultEntryFunction = hasDirtyApplicationResultEntryFunction; + this.hasCleanApplicationResultEntryFunction = hasCleanApplicationResultEntryFunction; + this.getDirtyResultsSupplier = getDirtyResultsSupplier; + } + + /** Forwards the entry to {@code createDirtyResultConsumer} and returns the future it produces. */ @Override + public CompletableFuture<Void> createDirtyResultAsync( + ApplicationResultEntry applicationResultEntry) { + return createDirtyResultConsumer.apply(applicationResultEntry); + } + + /** Forwards the id to {@code markResultAsCleanConsumer} and returns the future it produces. */ @Override + public CompletableFuture<Void> markResultAsCleanAsync(ApplicationID applicationId) { + return markResultAsCleanConsumer.apply(applicationId); + } + + /** Forwards the id to {@code hasApplicationResultEntryFunction} and returns the future it produces. */ @Override + public CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID applicationId) { + return hasApplicationResultEntryFunction.apply(applicationId); + } + + /** Forwards the id to {@code hasDirtyApplicationResultEntryFunction} and returns the future it produces. */ @Override + public CompletableFuture<Boolean> hasDirtyApplicationResultEntryAsync( + ApplicationID applicationId) { + return hasDirtyApplicationResultEntryFunction.apply(applicationId); + } + + /** Forwards the id to {@code hasCleanApplicationResultEntryFunction} and returns the future it produces. */ @Override + public CompletableFuture<Boolean> hasCleanApplicationResultEntryAsync( + 
ApplicationID applicationId) { + return hasCleanApplicationResultEntryFunction.apply(applicationId); + } + + /** Returns whatever {@code getDirtyResultsSupplier} yields; an {@link IOException} thrown by the supplier propagates to the caller. */ @Override + public Set<ApplicationResult> getDirtyResults() throws IOException { + return getDirtyResultsSupplier.get(); + } + + /** Returns a new {@link Builder} carrying its default handlers. */ public static TestingApplicationResultStore.Builder builder() { + return new Builder(); + } + + /** {@code Builder} for instantiating {@code TestingApplicationResultStore} instances. */ + public static class Builder { + + /* Defaults: the two Void handlers return FutureUtils.completedVoidFuture(), the three Boolean checks return a future already completed with false, and the dirty-results supplier returns Collections.emptySet(). */ private Function<ApplicationResultEntry, CompletableFuture<Void>> + createDirtyResultConsumer = + applicationResultEntry -> FutureUtils.completedVoidFuture(); + private Function<ApplicationID, CompletableFuture<Void>> markResultAsCleanConsumer = + applicationID -> FutureUtils.completedVoidFuture(); + + private Function<ApplicationID, CompletableFuture<Boolean>> + hasApplicationResultEntryFunction = + applicationID -> CompletableFuture.completedFuture(false); + private Function<ApplicationID, CompletableFuture<Boolean>> + hasDirtyApplicationResultEntryFunction = + applicationID -> CompletableFuture.completedFuture(false); + private Function<ApplicationID, CompletableFuture<Boolean>> + hasCleanApplicationResultEntryFunction = + applicationID -> CompletableFuture.completedFuture(false); + + private SupplierWithException<Set<ApplicationResult>, ? 
extends IOException> + getDirtyResultsSupplier = Collections::emptySet; + + /** Replaces the handler used by {@code createDirtyResultAsync}; returns this builder for chaining. */ public Builder withCreateDirtyResultConsumer( + Function<ApplicationResultEntry, CompletableFuture<Void>> + createDirtyResultConsumer) { + this.createDirtyResultConsumer = createDirtyResultConsumer; + return this; + } + + /** Replaces the handler used by {@code markResultAsCleanAsync}; returns this builder for chaining. */ public Builder withMarkResultAsCleanConsumer( + Function<ApplicationID, CompletableFuture<Void>> markResultAsCleanConsumer) { + this.markResultAsCleanConsumer = markResultAsCleanConsumer; + return this; + } + + /** Replaces the handler used by {@code hasApplicationResultEntryAsync}; returns this builder for chaining. */ public Builder withHasApplicationResultEntryFunction( + Function<ApplicationID, CompletableFuture<Boolean>> + hasApplicationResultEntryFunction) { + this.hasApplicationResultEntryFunction = hasApplicationResultEntryFunction; + return this; + } + + /** Replaces the handler used by {@code hasDirtyApplicationResultEntryAsync}; returns this builder for chaining. */ public Builder withHasDirtyApplicationResultEntryFunction( + Function<ApplicationID, CompletableFuture<Boolean>> + hasDirtyApplicationResultEntryFunction) { + this.hasDirtyApplicationResultEntryFunction = hasDirtyApplicationResultEntryFunction; + return this; + } + + /** Replaces the handler used by {@code hasCleanApplicationResultEntryAsync}; returns this builder for chaining. */ public Builder withHasCleanApplicationResultEntryFunction( + Function<ApplicationID, CompletableFuture<Boolean>> + hasCleanApplicationResultEntryFunction) { + this.hasCleanApplicationResultEntryFunction = hasCleanApplicationResultEntryFunction; + return this; + } + + /** Replaces the supplier used by {@code getDirtyResults}; returns this builder for chaining. */ public Builder withGetDirtyResultsSupplier( + SupplierWithException<Set<ApplicationResult>, ? extends IOException> + getDirtyResultsSupplier) { + this.getDirtyResultsSupplier = getDirtyResultsSupplier; + return this; + } + + /** Creates a {@code TestingApplicationResultStore} from the handlers currently set on this builder. */ public TestingApplicationResultStore build() { + return new TestingApplicationResultStore( + createDirtyResultConsumer, + markResultAsCleanConsumer, + hasApplicationResultEntryFunction, + hasDirtyApplicationResultEntryFunction, + hasCleanApplicationResultEntryFunction, + getDirtyResultsSupplier); + } + } +}
