StephanEwen commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r516139098

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive

Review comment:
Suggested improvement for the JavaDocs:
```
This interface lets functions/operators receive commit or abort notifications for checkpoints.

<p><b>Important</b>: Just using state (like {@link ValueState}, {@link ListState}, etc.) in functions/operators does NOT require implementing this interface.

<p>This interface is typically only needed for transactional interaction with the "outside world", like committing external side effects on checkpoints. An example is committing external transactions once a checkpoint completes.

<h3>Invocation Guarantees</h3>

<p>It is NOT guaranteed that the implementation will receive a notification for each completed or aborted checkpoint. While these notifications come in most cases, notifications might not happen, for example, when a failure/restore happens directly after a checkpoint completed.

<p>To handle this correctly, implementations should follow the "Checkpoint Subsuming Contract" described below.

<h3>Exceptions</h3>

<p>The notifications from this interface come "after the fact", meaning after the checkpoint has been aborted or completed. Throwing an exception will not change the completion/abortion of the checkpoint.

<p>Exceptions thrown from this method result in task or job failure and recovery.

<h3>Checkpoint Subsuming Contract</h3>

<p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the code can assume that no checkpoints with lower ID (T-1, T-2, etc.) are pending any more. <b>No checkpoint with lower ID will ever be committed after a checkpoint with a higher ID.</b>

<p>This does not necessarily mean that all of the previous checkpoints actually completed successfully. It is also possible that some checkpoint timed out or was not fully acknowledged by all tasks. Implementations must then behave as if that checkpoint did not happen. The recommended way to do this is to let the completion of a new checkpoint (higher ID) subsume the completion of all earlier checkpoints (lower ID).

<p>This property is easy to achieve for cases where increasing "offsets", "watermarks", or other progress indicators are communicated on checkpoint completion. A newer checkpoint will have a higher "offset" (more progress) than the previous checkpoint, so it automatically subsumes the previous one. Remember the "offset to commit" for a checkpoint ID and commit it when that specific checkpoint (by ID) gets the notification that it is complete.

<p>If you need to publish some specific artifacts (like files) or acknowledge some specific IDs after a checkpoint, you can follow a pattern like below.

<h3>Implementing Checkpoint Subsuming for Committing Artifacts</h3>

<p>The following is a sample pattern for how applications can publish specific artifacts on checkpoint. Examples would be operators that acknowledge specific IDs or publish specific files on checkpoint.

<ul>
  <li>During processing, have two sets of artifacts.
    <ol>
      <li>A "ready set": Artifacts that are ready to be published as part of the next checkpoint. Artifacts are added to this set as soon as they are ready to be committed. This set is "transient": it is not stored in Flink's state or persisted anywhere.</li>
      <li>A "pending set": Artifacts being committed with a checkpoint. The actual publishing happens when the checkpoint is complete. This is a map of "{@code long => List<Artifact>}", mapping from the ID of the checkpoint when the artifact was ready to the artifacts.</li>
    </ol>
  </li>
  <li>On checkpoint, add the set of artifacts from the "ready set" to the "pending set", associated with the checkpoint ID. The whole "pending set" gets stored in the checkpoint state.</li>
  <li>On {@code notifyCheckpointComplete()}, publish all IDs/artifacts from the "pending set" up to the checkpoint with that ID. Remove these from the "pending set".</li>
</ul>

<p>That way, even if some checkpoints did not complete, or if the notification that they completed got lost, the artifacts will be published as part of the next checkpoint that completes.
```

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
##########
@@ -32,7 +33,8 @@
  * @param <SplitT> The type of the the source splits.
  */
 @PublicEvolving
-public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
+public interface SourceReader<T, SplitT extends SourceSplit>
+    extends AutoCloseable, CheckpointListener {

Review comment:
Optional: We could add a default method override for `notifyCheckpointComplete()` to indicate that this is not a "core method" but somewhat optional for some cases.
That would also save us overriding it in each test class (with an empty method). I am fine either way and would leave this up to you.

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -29,7 +30,8 @@
  * 2. assign the splits to the source reader.
  */
 @PublicEvolving
-public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {
+public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>

Review comment:
Similar to the above, we could add this default method override for `notifyCheckpointComplete()`.

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
+ * a commit notification once a checkpoint has been completely acknowledged by all
+ * participants.
+ *
+ * <p>Note that the notifications sent to the implementation of this interfaces is
+ * based on best effort. That means it is possible that some checkpoint completion
+ * or abortion are not sent to the implementation. See the Java doc of each method
+ * for more details.
+ */
+@Public
+public interface CheckpointListener {
+
+    /**
+     * This method is called as a notification once a distributed checkpoint has been completed.
+     *
+     * <p>Note that any exception during this method will not cause the checkpoint to
+     * fail any more.
+     *
+     * <p>It is NOT guaranteed that the implementation will receive the notification of
+     * all completed checkpoints. The only guarantee for this listener is that if the
+     * implementation is notified of a checkpoint completion with a checkpoint ID, then
+     * all the checkpoints whose checkpoint ID is smaller than the completed checkpoint
+     * ID can also be considered as successful.
+     *
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+     * the task. Not that this will NOT lead to the checkpoint being revoked.
+     */
+    void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+    /**
+     * This method is called as a notification once a distributed checkpoint has been aborted.

Review comment:
Suggested improvement for the JavaDocs:
```
This method is called as a notification once a distributed checkpoint has been aborted.

<p><b>Important:</b> The fact that a checkpoint has been aborted does NOT mean that the data and artifacts produced between the previous checkpoint and the aborted checkpoint are to be discarded. The expected behavior is as if this checkpoint was never triggered in the first place, and the next successful checkpoint simply covers a longer time span. See the "Checkpoint Subsuming Contract" in the {@link CheckpointListener class-level JavaDocs} for details.

<p>These notifications are "best effort", meaning they can sometimes be skipped.

<p>This method is very rarely necessary to implement. The "best effort" guarantee, together with the fact that this method should not result in discarding any data (per the "Checkpoint Subsuming Contract"), means it is mainly useful for earlier cleanups of auxiliary resources. One example is to proactively clear a local per-checkpoint state cache upon checkpoint failure.
```

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
+ * a commit notification once a checkpoint has been completely acknowledged by all
+ * participants.
+ *
+ * <p>Note that the notifications sent to the implementation of this interfaces is
+ * based on best effort. That means it is possible that some checkpoint completion
+ * or abortion are not sent to the implementation. See the Java doc of each method
+ * for more details.
+ */
+@Public
+public interface CheckpointListener {
+
+    /**

Review comment:
Suggested improvement for the JavaDocs:
```
Notifies the listener that the checkpoint with the given {@code checkpointId} completed and was committed.

<p>These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the {@link CheckpointListener class-level JavaDocs} for details.

<p>Please note that checkpoints may generally overlap, so you cannot assume that the {@code notifyCheckpointComplete()} call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Properly implementing the "Checkpoint Subsuming Contract" (see above) handles this situation correctly as well.

<p>Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.
```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
##########
@@ -27,15 +28,33 @@
  * This context is the interface through which the {@link CheckpointCoordinator} interacts with an
  * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, CheckpointListener {

     void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;

     void afterSourceBarrierInjection(long checkpointId);

     void abortCurrentTriggering();

-    void checkpointComplete(long checkpointId);
+    /**
+     * This method does not throw exception because the operator coordinator implementation

Review comment:
I would reference the JavaDocs of `CheckpointListener` here, for details.
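
For illustration, the "ready set" / "pending set" pattern from the suggested class-level JavaDocs could be sketched roughly as below. This is only a minimal sketch under stated assumptions: it deliberately does not use any Flink API, and the class `PendingArtifactCommitter`, its methods `addReady`, `snapshot`, `published`, and `pendingCheckpoints`, and the use of plain strings as artifacts are all hypothetical names invented for this example. Only the name `notifyCheckpointComplete` mirrors the method discussed in the review. Publishing is simulated by appending to an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper (not part of Flink) sketching the "ready set" / "pending set" pattern. */
final class PendingArtifactCommitter {

    // "Ready set": transient, would NOT be stored in checkpointed state.
    private final List<String> ready = new ArrayList<>();

    // "Pending set": checkpoint ID -> artifacts. This is what would go into checkpoint state.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Stand-in for the external system: records what was published, in order.
    private final List<String> published = new ArrayList<>();

    void addReady(String artifact) {
        ready.add(artifact);
    }

    /** On checkpoint: move the ready set into the pending set under this checkpoint ID. */
    Map<Long, List<String>> snapshot(long checkpointId) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(ready);
        ready.clear();
        // Return a deep copy, standing in for writing the whole pending set to state.
        Map<Long, List<String>> copy = new TreeMap<>();
        pending.forEach((id, artifacts) -> copy.put(id, new ArrayList<>(artifacts)));
        return copy;
    }

    /** On completion: publish everything pending up to and including this checkpoint ID. */
    void notifyCheckpointComplete(long checkpointId) {
        // headMap(..., true) is a live view of all entries with key <= checkpointId.
        NavigableMap<Long, List<String>> subsumed = pending.headMap(checkpointId, true);
        for (List<String> artifacts : subsumed.values()) {
            published.addAll(artifacts);
        }
        // Clearing the view removes the subsumed entries from the pending set.
        subsumed.clear();
    }

    List<String> published() {
        return new ArrayList<>(published);
    }

    int pendingCheckpoints() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingArtifactCommitter committer = new PendingArtifactCommitter();
        committer.addReady("file-1");
        committer.snapshot(1L);
        committer.addReady("file-2");
        committer.snapshot(2L);
        committer.addReady("file-3");
        committer.snapshot(3L);

        // The notification for checkpoint 1 is assumed lost; checkpoint 2 subsumes it.
        committer.notifyCheckpointComplete(2L);
        System.out.println(committer.published()); // [file-1, file-2]

        committer.notifyCheckpointComplete(3L);
        System.out.println(committer.published()); // [file-1, file-2, file-3]
    }
}
```

Keeping the pending set in a sorted map makes "everything up to this ID" a single range operation, which is what lets a later notification cover earlier checkpoints whose notifications never arrived.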