gaoyunhaii commented on a change in pull request #18805:
URL: https://github.com/apache/flink/pull/18805#discussion_r820063655



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
##########
@@ -185,12 +219,36 @@ private void emitCommittables(Long checkpointId) throws 
IOException, Interrupted
         Collection<CommT> committables =
                 ((PrecommittingSinkWriter<?, CommT>) 
sinkWriter).prepareCommit();
         StreamingRuntimeContext runtimeContext = getRuntimeContext();
-        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
+        final int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
+        final int numberOfParallelSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
+
+        // Emit only committable summary if there are legacy committables
+        if (!legacyCommittables.isEmpty()) {
+            emit(

Review comment:
       Since it is called from `prepareSnapshotPreBarrier`, thus if the job is 
restored from savepoint, the next checkpoint should start from at least the 
`restored checkpoint id + 1`, thus it should always larger than 
`InitContext.INITIAL_CHECKPOINT_ID` ?
   
   The main issues of `checkpointId == InitContext.INITIAL_CHECKPOINT_ID` is 
that in this case there will be two summary messages of the same checkpoint id, 
which would cause exceptions in `CommitterOperator`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to