zentol commented on a change in pull request #19102:
URL: https://github.com/apache/flink/pull/19102#discussion_r827129983
##########
File path: docs/content.zh/docs/deployment/overview.md
##########
@@ -158,6 +158,11 @@ Once a job has reached a globally terminal state of either finished, failed or c
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
 [configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same
Review comment:
```suggestion
entry) would need to be cleaned up manually. Restarting the very same
```
Reads a bit weird, and I'm not sure why application mode should be explicitly mentioned.
##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
Review comment:
Do we have any documentation that explains the consequences of being in a dirty state?
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -615,7 +615,16 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture.thenCompose(
-                        cleanupJobState -> removeJob(jobId, cleanupJobState));
+                        cleanupJobState ->
+                                removeJob(jobId, cleanupJobState)
+                                        .exceptionally(
+                                                throwable -> {
+                                                    log.warn(
+                                                            "The cleanup of job {} failed. The job's artifacts and its JobResultStore entry needs to be cleaned manually.",
+                                                            jobId,
Review comment:
would it be possible to list the paths that need to be cleaned up?
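
For illustration, one way such a hint could be assembled (a sketch only, not code from the PR: the helper name is invented, and it assumes the relevant locations are the directories behind `HighAvailabilityOptions.HA_STORAGE_PATH` and `JobResultStoreOptions.STORAGE_PATH`):
```java
// Hypothetical sketch, not part of the PR: build a human-readable hint about the
// locations an operator would have to inspect when a job is left in a dirty state.
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobResultStoreOptions;

final class ManualCleanupHint {

    static String forJob(JobID jobId, Configuration configuration) {
        // high-availability.storageDir holds the HA artifacts (job graphs, blobs,
        // checkpoint metadata); job-result-store.storage-path holds the dirty/clean
        // entries of the file-based JobResultStore.
        return String.format(
                "Job %s may have left artifacts under '%s' and a JobResultStore entry under '%s'.",
                jobId,
                configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH),
                configuration.get(JobResultStoreOptions.STORAGE_PATH));
    }
}
```
The warning in the diff could then include such a hint instead of the generic "needs to be cleaned manually" wording.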
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -288,7 +304,7 @@ public void testCleanupAfterLeadershipChange() throws Exception {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
         waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
-        jobGraphRemovalErrorReceived.await();
+        firstCleanupFailsLatch.trigger();
Review comment:
I don't see where we guarantee that the cleanup isn't re-run immediately and finishes before the leadership has changed.
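
One possible way to pin that ordering down (a sketch only; `secondCleanupMayProceedLatch` and the exact wiring are invented here, not taken from the PR) would be to gate the retried cleanup attempt behind its own latch that the test only triggers after the leadership change:
```java
// Sketch of the test-body fragment: the first cleanup attempt fails on demand,
// the retried attempt is held back until the test explicitly allows it, so it
// cannot finish before the leadership change. OneShotLatch, FutureUtils and
// TestingJobGraphStore are the existing Flink test utilities used elsewhere in
// DispatcherCleanupITCase; the latch names are illustrative.
final OneShotLatch firstCleanupFailsLatch = new OneShotLatch();
final OneShotLatch secondCleanupMayProceedLatch = new OneShotLatch();
final OneShotLatch successfulCleanupLatch = new OneShotLatch();
final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();

final JobGraphStore jobGraphStore =
        TestingJobGraphStore.newBuilder()
                .setGlobalCleanupFunction(
                        (ignoredJobId, ignoredExecutor) -> {
                            try {
                                if (actualGlobalCleanupCallCount.getAndIncrement() < 1) {
                                    // first attempt: fail once the test has released the latch
                                    firstCleanupFailsLatch.await();
                                    return FutureUtils.completedExceptionally(
                                            new RuntimeException("Expected RuntimeException."));
                                }
                                // retried attempt: block until the leadership change is done
                                secondCleanupMayProceedLatch.await();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new CompletionException(e);
                            }
                            successfulCleanupLatch.trigger();
                            return FutureUtils.completedVoidFuture();
                        })
                .build();

// later in the test body, only after revoking/re-granting leadership:
// secondCleanupMayProceedLatch.trigger();
// successfulCleanupLatch.await();
```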
##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same
+job (i.e. using the same job ID) would result in the retryable cleanup being picked up again
Review comment:
```suggestion
job (i.e. using the same job ID) will result in the cleanup being restarted
```
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -249,11 +259,30 @@ public void testCleanupAfterLeadershipChange() throws Exception {
         // Construct job graph store.
         final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+        final OneShotLatch firstCleanupFailsLatch = new OneShotLatch();
         final OneShotLatch successfulCleanupLatch = new OneShotLatch();
-        final RuntimeException temporaryError = new RuntimeException("Unable to remove job graph.");
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        1, temporaryError, actualGlobalCleanupCallCount, successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    try {
+                                        firstCleanupFailsLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new CompletionException(e);
+                                    }
+
+                                    if (actualGlobalCleanupCallCount.getAndIncrement() < 1) {
+                                        return FutureUtils.completedExceptionally(
+                                                new RuntimeException(
+                                                        "Expected RuntimeException: Unable to remove job graph."));
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);
Review comment:
see above
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -133,12 +132,23 @@ public void testCleanupThroughRetries() throws Exception {
         final int numberOfErrors = 5;
         final RuntimeException temporaryError =
                 new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
+        final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        numberOfErrors,
-                        temporaryError,
-                        actualGlobalCleanupCallCount,
-                        successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    actualGlobalCleanupCallCount.incrementAndGet();
+
+                                    if (failureCount.getAndDecrement() > 0) {
+                                        return FutureUtils.completedExceptionally(temporaryError);
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);
Review comment:
```suggestion
jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]