StephanEwen commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r516139098
##
File path:
flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
Review comment:
Suggested improvement for the JavaDocs:
```
This interface lets functions/operators receive commit or abort notifications for checkpoints.

Important: Just using state (like {@link ValueState}, {@link ListState}, etc.) in
functions/operators does NOT require implementing this interface. This interface is typically
only needed for transactional interaction with the "outside world", like committing external
side effects on checkpoints. An example is committing external transactions once a checkpoint
completes.

Invocation Guarantees
It is NOT guaranteed that the implementation will receive a notification for each completed or
aborted checkpoint. While these notifications come in most cases, notifications might not
happen, for example, when a failure/restore happens directly after a checkpoint completed.
To handle this correctly, implementations should follow the "Checkpoint Subsuming Contract"
described below.

Exceptions
The notifications from this interface come "after the fact", meaning after the checkpoint has
been aborted or completed. Throwing an exception will not change the completion/abortion of
the checkpoint.

Exceptions thrown from the notification methods result in task or job failure and recovery.

Checkpoint Subsuming Contract
Checkpoint IDs are strictly increasing. A checkpoint with a higher ID always subsumes a
checkpoint with a lower ID. For example, when checkpoint T is confirmed complete, the code can
assume that no checkpoints with lower IDs (T-1, T-2, etc.) are pending anymore. No checkpoint
with a lower ID will ever be committed after a checkpoint with a higher ID.

This does not necessarily mean that all of the previous checkpoints actually completed
successfully. Some checkpoints may also have timed out or not been fully acknowledged by all
tasks. Implementations must then behave as if those checkpoints did not happen. The recommended
way to do this is to let the completion of a new checkpoint (higher ID) subsume the completion
of all earlier checkpoints (lower IDs).
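
// Sketch (not part of Flink) of one way to honor the subsuming contract. It assumes a
// hypothetical operator that pre-commits one external "transaction" (modeled as a String)
// per checkpoint. SubsumingCommitter, onCheckpoint and pendingCount are illustrative names.
// The notify* methods only mirror the names of the CheckpointListener notifications.
// When checkpoint N completes, every pending transaction with an ID <= N is committed, so an
// aborted checkpoint or a lost notification is covered by the next completed checkpoint.
// In a real operator the pending map would be part of checkpointed state. Here it is in memory.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class SubsumingCommitter {
    // Transactions pre-committed at a checkpoint, keyed by checkpoint ID (ascending order).
    private final NavigableMap<Long, String> pending = new TreeMap<>();
    // Stand-in for the external system: records committed transactions in commit order.
    final List<String> committed = new ArrayList<>();

    // Called when checkpoint 'checkpointId' is taken: its transaction becomes pending.
    void onCheckpoint(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
    }

    // Commits all pending transactions up to and including 'checkpointId'.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, String> subsumed = pending.headMap(checkpointId, true);
        for (Map.Entry<Long, String> e : subsumed.entrySet()) {
            committed.add(e.getValue());
        }
        subsumed.clear(); // clearing the view also removes the entries from 'pending'
    }

    // An abort leaves pending transactions in place; a later completion commits them.
    void notifyCheckpointAborted(long checkpointId) {
        // intentionally empty
    }

    int pendingCount() {
        return pending.size();
    }
}

class SubsumingCommitterDemo {
    public static void main(String[] args) {
        SubsumingCommitter c = new SubsumingCommitter();
        c.onCheckpoint(1, "txn-1");
        c.onCheckpoint(2, "txn-2");
        c.notifyCheckpointAborted(1);
        c.onCheckpoint(3, "txn-3");
        // The notification for checkpoint 2 is lost; checkpoint 3 completes.
        c.notifyCheckpointComplete(3);
        System.out.println(c.committed); // prints [txn-1, txn-2, txn-3]
    }
}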

This property is easy to achieve for cases where increasing "offsets", "watermarks", or other
progress indicators are communicated on checkpoint completion. A newer checkpoint will have a
higher "offset" (more progress) than the previous checkpoint, so it automatically subsumes the
previous one. Remember the "offset to commit" for a checkpoint ID and commit it when that
specific checkpoint (by ID) gets the notification that it is complete.
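
// Sketch (not part of Flink) of the "remember the offset to commit per checkpoint ID" pattern.
// OffsetCommitTracker, snapshotOffset, committedOffset and trackedCheckpoints are hypothetical
// names. Committing is modeled by storing the offset in a field instead of calling a real
// external system. Offsets recorded for checkpoints that never get a notification are dropped
// once a later checkpoint completes, because that checkpoint's higher offset subsumes them.
import java.util.NavigableMap;
import java.util.TreeMap;

class OffsetCommitTracker {
    // Offset to commit for each checkpoint ID, recorded when the checkpoint is taken.
    private final NavigableMap<Long, Long> offsetsToCommit = new TreeMap<>();
    // Last offset handed to the external system; -1 means nothing was committed yet.
    private long committedOffset = -1L;

    void snapshotOffset(long checkpointId, long currentOffset) {
        offsetsToCommit.put(checkpointId, currentOffset);
    }

    void notifyCheckpointComplete(long checkpointId) {
        Long offset = offsetsToCommit.get(checkpointId);
        if (offset != null && offset > committedOffset) {
            committedOffset = offset; // stand-in for the real external commit call
        }
        // Drop this entry and all older ones: they are subsumed by this checkpoint.
        offsetsToCommit.headMap(checkpointId, true).clear();
    }

    long committedOffset() {
        return committedOffset;
    }

    int trackedCheckpoints() {
        return offsetsToCommit.size();
    }
}

class OffsetCommitDemo {
    public static void main(String[] args) {
        OffsetCommitTracker t = new OffsetCommitTracker();
        t.snapshotOffset(10, 100);
        t.snapshotOffset(11, 250);
        t.snapshotOffset(12, 400);
        // The notification for checkpoint 10 is lost; checkpoint 11 completes.
        t.notifyCheckpointComplete(11);
        System.out.println(t.committedOffset());    // prints 250
        System.out.println(t.trackedCheckpoints()); // prints 1
    }
}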

If you need to publish some specific artifacts (like files) or acknowledge some specific IDs
after a checkpoint, you can follow a pattern like the one below.

Implementing Checkpoint Subsuming for Committing Artifacts
The following is a sample pattern for how applications can publish specific artifacts on
checkpoint. Examples would be operators that acknowledge specific IDs or publish specific files
on checkpoint.

During processing, have two sets of artifacts.
A "ready set": Artifacts that are ready to be published as part of the next checkpoint.
Artifacts are added to this set as soon as they are ready to be committed.
This set is "transient": it is not stored in Flink's state, so it is not persisted anywhere.
A "pending set": Artifacts being committed with a checkpoint. The actual publishing happens when
the checkpoint is complete. This is a map of "{@code long => List}", mapping from the id of the