StephanEwen commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r516139098



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to 
receive

Review comment:
       Suggested improvement for the JavaDocs:
   
   ```
   This interface let's functions/operators receive commit- or abort 
notifications for
   checkpoints.
   
   <p><b>Important</b>: Just using state (like {@link ValueState}, {@link 
ListState}, etc.) in
   functions/operators does NOT require implementing this interface.
   
   <p>This interface is typically only needed for transactional interaction 
with the
   "outside world", like committing external side effects on checkpoints.
   An example is committing external transactions once a checkpoint completes.
   
   <h3>Invocation Guarantees</h3>
   
   <p>It is NOT guaranteed that the implementation will receive a notification 
for each
   completed or aborted checkpoint. While these notifications come in most 
cases, notifications
   might not happen, for example, when a failure/restore happens directly after 
a checkpoint 
   completed.
   
   <p>To handle this correctly, implementation should follow the "Checkpoint 
Subsuming Contract"
   described below.
   
   <h3>Exceptions</h3>
   
   <p>The notifications from this interface come "after the fact", meaning 
after the checkpoint has
   been aborted or completed. Throwing an exception will not change the 
completion/abortion of
   the checkpoint.
   
   <p>Exceptions thrown from this method result in task- or job failure and 
recovery.
   
   <h3>Checkpoint Subsuming Contract</h3>
   
   <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID 
always subsumes
   a checkpoint with lower ID. For example, when checkpoint T is confirmed 
complete, the
   code can assume that no checkpoints with lower ID (T-1, T-2, etc.) are 
pending any more.
   <b>No checkpoint with lower ID will ever be committed after a checkpoint 
with a higher ID.</b>
   
   <p>This does not necessarily mean that all of the previous checkpoints 
actually completed
   successfully. It is also possible that some checkpoint timed out or was not 
fully acknowledged
   by all tasks. Implementations must then behave as if that checkpoint did not 
happen.
   The recommended way to do this is to let the completion of a new checkpoint 
(higher ID)
   subsume the completion of all earlier checkpoints (lower ID).
   
   <p>This property is easy to achieve for cases where increasing "offsets", 
"watermarks", or
   other progress indicators are communicated on checkpoint completion. A newer 
checkpoint
   will have a higher "offset" (more progress) than the previous checkpoint, so 
it automatically subsumes the previous one. Remember the "offset to commit" for 
a checkpoint ID
   and commit it when that specific checkpoint (by ID) gets the notification 
that it is complete.
   
   <p>If you need to publish some specific artifacts (like files) or 
acknowledge some specific IDs
   after a checkpoint, you can follow a pattern like below.
   
   <h3>Implementing Checkpoint Subsuming for Committing Artifacts</h3>
   
   <p>The following is a sample pattern how applications can publish specific 
artifacts on checkpoint.
   Examples would be operators that acknowledge specific IDs or publish 
specific files on checkpoint.
   
   <ul>
       <li>During processing, have two sets of artifacts.
           <ol>
               <li>A "ready set": Artifacts that are ready to be published as 
part of the next checkpoint.
                   Artifacts are added to this set as soon as they are ready to 
be committed.
                   This set is "transient", it is not stored in Flink's state 
persisted anywhere.</li>
               <li>A "pending set": Artifacts being committed with a checkpoint.
                   The actual publishing happens when the checkpoint is 
complete.
                   This is a map of "{@code long => List<Artifact>}", mapping 
from the id of the
                   checkpoint when the artifact was ready to the artifacts. /li>
           </ol>
       <li>On checkpoint, add that set of artifacts from the "ready set" to the 
"pending set",
           associated with the checkpoint ID. The whole "pending set" gets 
stored in the
           checkpoint state.</li>
       <li>On {@code notifyCheckpointComplete()} publish all IDs/artifacts from 
the "pending set"
           up to the checkpoint with that ID. Remove these from the "pending 
set".<li/>
   </ul>
   
   <p>That way, even if some checkpoints did not complete, or if the 
notification that they
   completed got lost, the artifacts will be published as part of the next 
checkpoint that completes.
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##########
@@ -32,7 +33,8 @@
  * @param <SplitT> The type of the the source splits.
  */
 @PublicEvolving
-public interface SourceReader<T, SplitT extends SourceSplit> extends 
AutoCloseable {
+public interface SourceReader<T, SplitT extends SourceSplit>
+               extends AutoCloseable, CheckpointListener {

Review comment:
       Optional: We could add a default method override for 
`notifyCheckpointComplete()` to indicate that this is not a "core method" but 
somewhat optional for some cases.
   That would also save us overriding it in each test class (with an empty 
method).
   
   I am fine either way, would leave this up to you.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -29,7 +30,8 @@
  * 2. assign the splits to the source reader.
  */
 @PublicEvolving
-public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> 
extends AutoCloseable {
+public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>

Review comment:
       Similar as above, could add this default method override for 
`notifyCheckpointComplete()`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to 
receive
+ * a commit notification once a checkpoint has been completely acknowledged by 
all
+ * participants.
+ *
+ * <p>Note that the notifications sent to the implementation of this 
interfaces is
+ * based on best effort. That means it is possible that some checkpoint 
completion
+ * or abortion are not sent to the implementation. See the Java doc of each 
method
+ * for more details.
+ */
+@Public
+public interface CheckpointListener {
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been completed.
+        *
+        * <p>Note that any exception during this method will not cause the 
checkpoint to
+        * fail any more.
+        *
+        * <p>It is NOT guaranteed that the implementation will receive the 
notification of
+        * all completed checkpoints. The only guarantee for this listener is 
that if the
+        * implementation is notified of a checkpoint completion with a 
checkpoint ID, then
+        * all the checkpoints whose checkpoint ID is smaller than the 
completed checkpoint
+        * ID can also be considered as successful.
+        *
+        * @param checkpointId The ID of the checkpoint that has been completed.
+        * @throws Exception This method can propagate exceptions, which leads 
to a failure/recovery for
+        *                   the task. Not that this will NOT lead to the 
checkpoint being revoked.
+        */
+       void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been aborted.

Review comment:
       Suggested improvement for the JavaDocs:
   
   ```
   This method is called as a notification once a distributed checkpoint has 
been aborted.
   
   <p><b>Important:</b> The fact that a checkpoint has been aborted does NOT 
mean that the
   data and artifacts produced between the previous checkpoint and the aborted 
checkpoint are
   to be discarded. The expected behavior is as if this checkpoint was never 
triggered in the first
   place, and the next successful checkpoint simply covers a longer time span. 
See the 
   "Checkpoint Subsuming Contract" in the {@link CheckpointListener class-level 
JavaDocs}
   for details.
   
   <p>These notifications are "best effort", meaning they can sometimes be 
skipped.
   
   <p>This method is very rarely necessary to implement. The "best effort" 
guarantee, together
   with the fact that this method should not result in discarding any data (per 
the "Checkpoint
   Subsuming Contract") means it is mainly useful for earlier cleanups of 
auxiliary resources.
   One example is to pro-actively clear a local per-checkpoint state cache upon 
checkpoint failure.
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to 
receive
+ * a commit notification once a checkpoint has been completely acknowledged by 
all
+ * participants.
+ *
+ * <p>Note that the notifications sent to the implementation of this 
interfaces is
+ * based on best effort. That means it is possible that some checkpoint 
completion
+ * or abortion are not sent to the implementation. See the Java doc of each 
method
+ * for more details.
+ */
+@Public
+public interface CheckpointListener {
+
+       /**

Review comment:
       Suggested improvement for the JavaDocs:
   
   ```
   Notifies the listener that the checkpoint with the given {@code 
checkpointId} completed
   and was committed.
   
   <p>These notifications are "best effort", meaning they can sometimes be 
skipped.
   To behave properly, implementers need to follow the "Checkpoint Subsuming 
Contract".
   Please see the {@link CheckpointListener class-level JavaDocs} for details.
   
   <p>Please note that checkpoints may generally overlap, so you cannot assume 
that the
   {@code notifyCheckpointComplete()} call is always for the latest prior 
checkpoint (or snapshot)
   that was taken on the function/operator implementing this interface.
   It might be for a checkpoint that was triggered earlier. Implementing the 
"Checkpoint
   Subsuming Contract" (see above) properly handles this situation correctly as 
well.
   
   <p>Please note that throwing exceptions from this method will not cause the 
completed
   checkpoint to be revoked. Throwing exceptions will typically cause task/job 
failure and
   trigger recovery.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
##########
@@ -27,15 +28,33 @@
  * This context is the interface through which the {@link 
CheckpointCoordinator} interacts with an
  * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, 
CheckpointListener {
 
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> 
result) throws Exception;
 
        void afterSourceBarrierInjection(long checkpointId);
 
        void abortCurrentTriggering();
 
-       void checkpointComplete(long checkpointId);
+       /**
+        * This method does not throw exception because the operator 
coordinator implementation

Review comment:
       I would reference the JavaDocs of `CheckpointListener` here, for details.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to