gaoyunhaii commented on a change in pull request #18805: URL: https://github.com/apache/flink/pull/18805#discussion_r820063655
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java ########## @@ -185,12 +219,36 @@ private void emitCommittables(Long checkpointId) throws IOException, Interrupted Collection<CommT> committables = ((PrecommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit(); StreamingRuntimeContext runtimeContext = getRuntimeContext(); - int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); + final int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); + final int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); + + // Emit only committable summary if there are legacy committables + if (!legacyCommittables.isEmpty()) { + emit( Review comment: Since it is called from `prepareSnapshotPreBarrier`, thus if the job is restored from savepoint, the next checkpoint should start from at least the `restored checkpoint id + 1`, thus it should always larger than `InitContext.INITIAL_CHECKPOINT_ID` ? The main issues of `checkpointId == InitContext.INITIAL_CHECKPOINT_ID` is that in this case there will be two summary messages of the same checkpoint id, which would cause exceptions in `CommitterOperator`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org