zentol commented on a change in pull request #17474: URL: https://github.com/apache/flink/pull/17474#discussion_r734546710
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ########## @@ -132,37 +134,49 @@ requestTaskManagerMetricQueryServiceAddresses(@RpcTimeout Time timeout); /** - * Triggers a savepoint with the given savepoint directory as a target. + * Triggers a savepoint with the given savepoint directory as a target, returning a future that + * completes when the operation is started. * - * @param jobId ID of the job for which the savepoint should be triggered. - * @param targetDirectory Target directory for the savepoint. + * @param operationKey the key of the operation, for deduplication purposes + * @param parameters input parameters for taking a savepoint * @param timeout Timeout for the asynchronous operation - * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of - * the savepoint. + * @return Future which is completed once the operation is triggered successfully */ - default CompletableFuture<String> triggerSavepoint( - JobID jobId, String targetDirectory, boolean cancelJob, @RpcTimeout Time timeout) { + default CompletableFuture<Acknowledge> triggerSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, Review comment: The JobID is contained in both the operation key and the parameters, so we're duplicating data a bit. It doesn't seem ergonomic that I need to bundle the JobID with other data twice. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; + +/** Tests for the {@link DispatcherCachedOperationsHandler} component. 
*/ +public class DispatcherCachedOperationsHandlerTest extends TestLogger { + + private static final Time TIMEOUT = Time.minutes(1); + + private CompletedOperationCache<AsynchronousJobOperationKey, String> cache; + private DispatcherCachedOperationsHandler handler; + + private TriggerSavepointSpyFunction triggerSavepointFunction; + private TriggerSavepointSpyFunction stopWithSavepointFunction; + + private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>(); + private final TriggerSavepointParameters triggerSavepointParameters = + TriggerSavepointParameters.builder() + .jobID(new JobID()) + .targetDirectory("dummyDirectory") + .savepointMode(TriggerSavepointMode.SAVEPOINT) + .build(); + private AsynchronousJobOperationKey operationKey; + + @Before + public void setup() { + savepointLocationFuture = new CompletableFuture<>(); + triggerSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + stopWithSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + cache = new CompletedOperationCache<>(); + handler = + new DispatcherCachedOperationsHandler( + triggerSavepointFunction, stopWithSavepointFunction, cache); + operationKey = + AsynchronousJobOperationKey.of( + new TriggerId(), triggerSavepointParameters.getJobID()); + } + + @Test + public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException { + CompletableFuture<Acknowledge> firstAcknowledge = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + CompletableFuture<Acknowledge> secondAcknowledge = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + + assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1)); + assertThat( + triggerSavepointFunction.getInvocationParameters().get(0), + is(triggerSavepointParameters)); + + assertThat(firstAcknowledge.get(), is(Acknowledge.get())); + 
assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + } + + @Test + public void stopWithSavepointRepeatedly() throws ExecutionException, InterruptedException { + TriggerSavepointParameters stopWithSavepointParameters = + TriggerSavepointParameters.builder() + .jobID(new JobID()) + .targetDirectory("dummyDirectory") + .savepointMode(TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT) + .build(); + CompletableFuture<Acknowledge> firstAcknowledge = + handler.stopWithSavepoint(operationKey, stopWithSavepointParameters, TIMEOUT); + CompletableFuture<Acknowledge> secondAcknowledge = + handler.stopWithSavepoint(operationKey, stopWithSavepointParameters, TIMEOUT); + + assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1)); + assertThat( + stopWithSavepointFunction.getInvocationParameters().get(0), + is(stopWithSavepointParameters)); + + assertThat(firstAcknowledge.get(), is(Acknowledge.get())); + assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + } + + @Test + public void returnsFailedFutureIfOperationFails() + throws ExecutionException, InterruptedException { + CompletableFuture<Acknowledge> acknowledgeRegisteredOperation = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + savepointLocationFuture.completeExceptionally(new RuntimeException("Expected failure")); + CompletableFuture<Acknowledge> failedAcknowledgeFuture = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + + assertThat(acknowledgeRegisteredOperation.get(), is(Acknowledge.get())); + assertThrows(ExecutionException.class, failedAcknowledgeFuture::get); + } + + @Test + public void throwsIfCacheIsShuttingDown() { + cache.closeAsync(); + assertThrows( + IllegalStateException.class, + () -> handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT)); + } + + @Test + public void getStatus() throws ExecutionException, InterruptedException { + handler.triggerSavepoint(operationKey, triggerSavepointParameters, 
TIMEOUT); + + String savepointLocation = "location"; + savepointLocationFuture.complete(savepointLocation); + + CompletableFuture<OperationResult<String>> statusFuture = + handler.getSavepointStatus(operationKey); + + assertEquals(statusFuture.get(), OperationResult.success(savepointLocation)); + } + + @Test + public void getStatusFailsIfKeyUnknown() throws InterruptedException { + CompletableFuture<OperationResult<String>> statusFuture = + handler.getSavepointStatus(operationKey); + + assertThat( + statusFuture, + futureWillCompleteExceptionally(UnknownOperationKeyException.class, Duration.ZERO)); Review comment: I don't think `Duration.ZERO` actually works if the future is not complete yet. It should fail immediately with a `TimeoutException` from `CompletableFuture#get`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointParameters.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Bundles parameters required for taking a savepoint.
*/ +public class TriggerSavepointParameters { + private final JobID jobID; + private final String targetDirectory; + private final TriggerSavepointMode savepointMode; + + private TriggerSavepointParameters( + JobID jobID, String targetDirectory, TriggerSavepointMode savepointMode) { + this.jobID = checkNotNull(jobID); + this.targetDirectory = checkNotNull(targetDirectory); + this.savepointMode = savepointMode; + } + + public JobID getJobID() { + return jobID; + } + + public String getTargetDirectory() { + return targetDirectory; + } + + public TriggerSavepointMode getSavepointMode() { + return savepointMode; + } + + public static Builder builder() { + return new Builder(); + } + + /** Builder for the class. */ + public static class Builder { + private JobID jobID; + private String targetDirectory; + private TriggerSavepointMode savepointMode; Review comment: considering that none of these are optional, do we really need a builder? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; + +/** Tests for the {@link DispatcherCachedOperationsHandler} component. 
*/ +public class DispatcherCachedOperationsHandlerTest extends TestLogger { + + private static final Time TIMEOUT = Time.minutes(1); + + private CompletedOperationCache<AsynchronousJobOperationKey, String> cache; + private DispatcherCachedOperationsHandler handler; + + private TriggerSavepointSpyFunction triggerSavepointFunction; + private TriggerSavepointSpyFunction stopWithSavepointFunction; + + private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>(); + private final TriggerSavepointParameters triggerSavepointParameters = + TriggerSavepointParameters.builder() + .jobID(new JobID()) + .targetDirectory("dummyDirectory") + .savepointMode(TriggerSavepointMode.SAVEPOINT) + .build(); + private AsynchronousJobOperationKey operationKey; + + @Before + public void setup() { + savepointLocationFuture = new CompletableFuture<>(); + triggerSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + stopWithSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + cache = new CompletedOperationCache<>(); + handler = + new DispatcherCachedOperationsHandler( + triggerSavepointFunction, stopWithSavepointFunction, cache); + operationKey = + AsynchronousJobOperationKey.of( + new TriggerId(), triggerSavepointParameters.getJobID()); + } + + @Test + public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException { + CompletableFuture<Acknowledge> firstAcknowledge = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + CompletableFuture<Acknowledge> secondAcknowledge = Review comment: In hindsight it is a bit unfortunate that this PR both moves the state and implements idempotent triggering in the Dispatcher. We could've reduced the scope of this PR a bit if we had implemented the idempotent triggering first. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; + +/** Tests for the {@link DispatcherCachedOperationsHandler} component. */ +public class DispatcherCachedOperationsHandlerTest extends TestLogger { + + private static final Time TIMEOUT = Time.minutes(1); Review comment: This should be a bit longer (say, 10 minutes) because we have seen instabilities on CI. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; + +/** Tests for the {@link DispatcherCachedOperationsHandler} component. 
*/ +public class DispatcherCachedOperationsHandlerTest extends TestLogger { + + private static final Time TIMEOUT = Time.minutes(1); + + private CompletedOperationCache<AsynchronousJobOperationKey, String> cache; + private DispatcherCachedOperationsHandler handler; + + private TriggerSavepointSpyFunction triggerSavepointFunction; + private TriggerSavepointSpyFunction stopWithSavepointFunction; + + private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>(); + private final TriggerSavepointParameters triggerSavepointParameters = + TriggerSavepointParameters.builder() + .jobID(new JobID()) + .targetDirectory("dummyDirectory") + .savepointMode(TriggerSavepointMode.SAVEPOINT) + .build(); + private AsynchronousJobOperationKey operationKey; + + @Before + public void setup() { + savepointLocationFuture = new CompletableFuture<>(); + triggerSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + stopWithSavepointFunction = + TriggerSavepointSpyFunction.wrap((parameters, timeout) -> savepointLocationFuture); + cache = new CompletedOperationCache<>(); + handler = + new DispatcherCachedOperationsHandler( + triggerSavepointFunction, stopWithSavepointFunction, cache); + operationKey = + AsynchronousJobOperationKey.of( + new TriggerId(), triggerSavepointParameters.getJobID()); + } + + @Test + public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException { + CompletableFuture<Acknowledge> firstAcknowledge = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + CompletableFuture<Acknowledge> secondAcknowledge = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + + assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1)); + assertThat( + triggerSavepointFunction.getInvocationParameters().get(0), + is(triggerSavepointParameters)); + + assertThat(firstAcknowledge.get(), is(Acknowledge.get())); + 
assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + } + + @Test + public void stopWithSavepointRepeatedly() throws ExecutionException, InterruptedException { + TriggerSavepointParameters stopWithSavepointParameters = + TriggerSavepointParameters.builder() + .jobID(new JobID()) + .targetDirectory("dummyDirectory") + .savepointMode(TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT) + .build(); + CompletableFuture<Acknowledge> firstAcknowledge = + handler.stopWithSavepoint(operationKey, stopWithSavepointParameters, TIMEOUT); + CompletableFuture<Acknowledge> secondAcknowledge = + handler.stopWithSavepoint(operationKey, stopWithSavepointParameters, TIMEOUT); + + assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1)); + assertThat( + stopWithSavepointFunction.getInvocationParameters().get(0), + is(stopWithSavepointParameters)); + + assertThat(firstAcknowledge.get(), is(Acknowledge.get())); + assertThat(secondAcknowledge.get(), is(Acknowledge.get())); + } + + @Test + public void returnsFailedFutureIfOperationFails() + throws ExecutionException, InterruptedException { + CompletableFuture<Acknowledge> acknowledgeRegisteredOperation = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + savepointLocationFuture.completeExceptionally(new RuntimeException("Expected failure")); + CompletableFuture<Acknowledge> failedAcknowledgeFuture = + handler.triggerSavepoint(operationKey, triggerSavepointParameters, TIMEOUT); + + assertThat(acknowledgeRegisteredOperation.get(), is(Acknowledge.get())); + assertThrows(ExecutionException.class, failedAcknowledgeFuture::get); Review comment: Use FlinkMatchers to check for the actual cause (which should be RuntimeException above?) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.OperationResultStatus; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +/** + * A handler for async operations triggered by the {@link Dispatcher} whose keys and results are + * cached. 
+ */ +public class DispatcherCachedOperationsHandler { + + private final CompletedOperationCache<AsynchronousJobOperationKey, String> + savepointTriggerCache; + + private final TriggerSavepointFunction triggerSavepointFunction; + + private final TriggerSavepointFunction stopWithSavepointFunction; + + DispatcherCachedOperationsHandler( + DispatcherOperationCaches operationCaches, + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction) { + this( + triggerSavepointFunction, + stopWithSavepointFunction, + operationCaches.getSavepointTriggerCache()); + } + + @VisibleForTesting + DispatcherCachedOperationsHandler( + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction, + CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) { + this.triggerSavepointFunction = triggerSavepointFunction; + this.stopWithSavepointFunction = stopWithSavepointFunction; + this.savepointTriggerCache = savepointTriggerCache; + } + + public CompletableFuture<Acknowledge> triggerSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> triggerSavepointFunction.apply(parameters, timeout)); + } + + public CompletableFuture<Acknowledge> stopWithSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> stopWithSavepointFunction.apply(parameters, timeout)); + } + + public CompletableFuture<OperationResult<String>> getSavepointStatus( + AsynchronousJobOperationKey operationKey) { + return savepointTriggerCache + .get(operationKey) + .map(CompletableFuture::completedFuture) + .orElse( + CompletableFuture.failedFuture( + new UnknownOperationKeyException(operationKey))); + } + + private <P> CompletableFuture<Acknowledge> 
registerOperationIdempotently( + AsynchronousJobOperationKey operationKey, + Supplier<CompletableFuture<String>> operation) { + Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey); + if (resultOptional.isPresent()) { + return convertToFuture(resultOptional.get()); + } + + savepointTriggerCache.registerOngoingOperation(operationKey, operation.get()); + + return savepointTriggerCache + .get(operationKey) + .map(this::convertToFuture) Review comment: We could streamline this a bit by having `registerOngoingOperation` return the current `OperationResult`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; Review comment: New tests should be implemented against JUnit 5. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.OperationResultStatus; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +/** + * A handler for async operations triggered by the {@link Dispatcher} whose keys and results are + * cached. + */ +public class DispatcherCachedOperationsHandler { + + private final CompletedOperationCache<AsynchronousJobOperationKey, String> + savepointTriggerCache; + + private final TriggerSavepointFunction triggerSavepointFunction; + + private final TriggerSavepointFunction stopWithSavepointFunction; + + DispatcherCachedOperationsHandler( + DispatcherOperationCaches operationCaches, + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction) { + this( + triggerSavepointFunction, + stopWithSavepointFunction, + operationCaches.getSavepointTriggerCache()); + } + + @VisibleForTesting + DispatcherCachedOperationsHandler( + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction, + CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) { + this.triggerSavepointFunction = triggerSavepointFunction; + this.stopWithSavepointFunction = stopWithSavepointFunction; + this.savepointTriggerCache = savepointTriggerCache; + } + + public 
CompletableFuture<Acknowledge> triggerSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> triggerSavepointFunction.apply(parameters, timeout)); + } + + public CompletableFuture<Acknowledge> stopWithSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> stopWithSavepointFunction.apply(parameters, timeout)); + } + + public CompletableFuture<OperationResult<String>> getSavepointStatus( + AsynchronousJobOperationKey operationKey) { + return savepointTriggerCache + .get(operationKey) + .map(CompletableFuture::completedFuture) + .orElse( + CompletableFuture.failedFuture( + new UnknownOperationKeyException(operationKey))); + } + + private <P> CompletableFuture<Acknowledge> registerOperationIdempotently( + AsynchronousJobOperationKey operationKey, + Supplier<CompletableFuture<String>> operation) { + Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey); + if (resultOptional.isPresent()) { + return convertToFuture(resultOptional.get()); + } + + savepointTriggerCache.registerOngoingOperation(operationKey, operation.get()); + + return savepointTriggerCache + .get(operationKey) + .map(this::convertToFuture) + // This shouldn't happen as we just registered the operation. 
We assume it is a + // temporary issue with the cache + .orElse(CompletableFuture.completedFuture(Acknowledge.get())); + } + + private CompletableFuture<Acknowledge> convertToFuture(OperationResult<String> result) { Review comment: ```suggestion private static CompletableFuture<Acknowledge> convertToFuture(OperationResult<String> result) { ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.async.OperationResultStatus; +import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +/** + * A handler for async operations triggered by the {@link Dispatcher} whose keys and results are + * cached. + */ +public class DispatcherCachedOperationsHandler { + + private final CompletedOperationCache<AsynchronousJobOperationKey, String> + savepointTriggerCache; + + private final TriggerSavepointFunction triggerSavepointFunction; + + private final TriggerSavepointFunction stopWithSavepointFunction; + + DispatcherCachedOperationsHandler( + DispatcherOperationCaches operationCaches, + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction) { + this( + triggerSavepointFunction, + stopWithSavepointFunction, + operationCaches.getSavepointTriggerCache()); + } + + @VisibleForTesting + DispatcherCachedOperationsHandler( + TriggerSavepointFunction triggerSavepointFunction, + TriggerSavepointFunction stopWithSavepointFunction, + CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) { + this.triggerSavepointFunction = triggerSavepointFunction; + this.stopWithSavepointFunction = stopWithSavepointFunction; + this.savepointTriggerCache = savepointTriggerCache; + } + + public CompletableFuture<Acknowledge> triggerSavepoint( + AsynchronousJobOperationKey operationKey, + 
TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> triggerSavepointFunction.apply(parameters, timeout)); + } + + public CompletableFuture<Acknowledge> stopWithSavepoint( + AsynchronousJobOperationKey operationKey, + TriggerSavepointParameters parameters, + Time timeout) { + return registerOperationIdempotently( + operationKey, () -> stopWithSavepointFunction.apply(parameters, timeout)); + } Review comment: Potential follow-up: Merge these two methods, along with the ones in the RestfulGateway, because the `TriggerSavepointParameters` already differentiates between the two via the `TriggerSavepointMode`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org