JingGe commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r791758548



##########
File path: 
flink-architecture-tests/violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
##########
@@ -93,7 +93,6 @@ org.apache.flink.connector.file.src.util.Pool.recycler(): 
Returned leaf type org
 
org.apache.flink.connector.file.src.util.Utils.forEachRemaining(org.apache.flink.connector.file.src.reader.BulkFormat$Reader,
 java.util.function.Consumer): Argument leaf type 
org.apache.flink.connector.file.src.reader.BulkFormat$Reader does not satisfy: 
reside outside of package 'org.apache.flink..' or annotated with @Public or 
annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions.builder(): Returned 
leaf type 
org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions$JDBCExactlyOnceOptionsBuilder
 does not satisfy: reside outside of package 'org.apache.flink..' or annotated 
with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder(): Returned leaf 
type org.apache.flink.connector.jdbc.JdbcExecutionOptions$Builder does not 
satisfy: reside outside of package 'org.apache.flink..' or annotated with 
@Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.connector.jdbc.JdbcSink.exactlyOnceSink(java.lang.String, 
org.apache.flink.connector.jdbc.JdbcStatementBuilder, 
org.apache.flink.connector.jdbc.JdbcExecutionOptions, 
org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions, 
org.apache.flink.util.function.SerializableSupplier): Argument leaf type 
org.apache.flink.util.function.SerializableSupplier does not satisfy: reside 
outside of package 'org.apache.flink..' or annotated with @Public or annotated 
with @PublicEvolving or annotated with @Deprecated

Review comment:
       I think these changes should not be mixed into this PR.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/Committables.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Internal wrapper to handle the committing of committables.
+ *
+ * @param <CommT> type of the committable
+ */
+@Internal
+public interface Committables<CommT> {
+    /** Returns a summary of the current commit progress. */
+    CommittableSummary<CommT> getSummary();
+
+    /**
+     * Commits all due committables.
+     *
+     * @param fullyReceived only commit committables if all committables of 
this checkpoint for a

Review comment:
       This description reads like implementation logic. Instead, we should 
describe the purpose of this parameter at the interface level.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {

Review comment:
       It is difficult to understand the difference between `getNumPending` and 
`getPendingRequests`, since both of them deal with committables: one via 
`CommittableSummary` and the other via `CommitRequestImpl`. I couldn't find a 
suitable name for either, so I would suggest adding Javadoc to make the 
distinction clear.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;

Review comment:
       ```suggestion
       private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittablesMap;
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements 
CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long 
checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, 
long checkpointId) {

Review comment:
       `checkpointId` could be obtained from `SubtaskCommittables` instead of being passed in separately.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements 
CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long 
checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, 
long checkpointId) {
+        this.subtasksCommittables = checkNotNull(subtasksCommittables);
+        // dummy collector, never used
+        this.collector = new CommittableCollector<>(0, 1);
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    Collection<SubtaskCommittables<CommT>> getSubtasks() {
+        return subtasksCommittables.values();
+    }
+
+    void addSummary(CommittableSummary<CommT> summary) {

Review comment:
       This should actually be named `upsertSummary(...)`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new 
TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime 
information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext 
context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), 
context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This 
method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new 
CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged 
into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), 
r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until 
emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link 
CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {

Review comment:
       Nit

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittables.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface adds checkpoint meta information to the committable.
+ *
+ * @param <CommT> type of the committable
+ */
+@Internal
+public interface CheckpointCommittables<CommT> extends Committables<CommT> {

Review comment:
       From an OOP point of view, this interface introduces the checkpoint concept, 
which implies that `Committables` sits at a higher level of abstraction and has 
nothing to do with checkpoints. In fact, however, `Committables` already deals 
with `CommittableSummary` and `CommittableWithLineage`, both of which can provide 
a checkpointId. If the intent is to distinguish between the two interfaces, more 
information is needed to describe when to use `Committables` and when to use 
`CheckpointCommittables`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {

Review comment:
       Please add Javadoc for this class.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */

Review comment:
       Javadoc is required here to explain why `subtaskId` and `numberOfSubtasks` 
are not needed for deserialization. Please also add a reference to the class that 
handles the custom serialization/deserialization, i.e. 
`CommittableCollectorSerializer`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {
+        List<CommittableWithLineage<CommT>> committed = new 
ArrayList<>(requests.size());
+        for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
+                iterator.hasNext(); ) {
+            CommitRequestImpl<CommT> request = iterator.next();
+            if (!request.isFinished()) {
+                continue;
+            }
+            if (request.getState() == CommitRequestState.FAILED) {
+                numFailed += 1;

Review comment:
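       Should this call `iterator.remove()`? Otherwise, failed committables will be 
counted twice: they stay in `requests` (counted via `requests.size()` in 
`getNumCommittables()`) and are also added to `numFailed`.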

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/Committables.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Internal wrapper to handle the committing of committables.

Review comment:
       It seems that the word "committables" now has two meanings, since the new 
interface `Committables` has been introduced. AFAIK, "committables" has 
previously been used very often as a conceptual name. There is no corresponding 
domain entity for it; it is left open for users to define via the generic type 
`CommT`.
   
   Since all methods in this interface work with subclasses of 
`CommittableMessage`, this interface is actually responsible for managing all 
committables and all kinds of `CommittableMessage`, i.e. `CommittableSummary` and 
`CommittableWithLineage`. Naming this interface `CommittablesManager` would 
therefore be another option, one that makes the underlying concept easier to 
understand.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;

Review comment:
       Add a single-line comment stating that the key is the checkpointId.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements 
CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittables;

Review comment:
       ```suggestion
       private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittablesMap;
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {

Review comment:
       I think Javadoc is required for this method. Will this method be called 
multiple times? How are the numbers kept in sync with the `summary`, e.g. for 
`hasReceivedAll()`? Will failed requests be retried?
   
   There are many numbers defined in this class whose relationships to each other 
need a more detailed description:
   1. total number of committables in `summary`
   2. total number of `CommitRequestImpl` in `requests`
      2.1 number of unfinished requests
      2.2 number of failed requests
      2.3 number of requests that will be drained and removed
   3. numFailed, accumulated if this method is called multiple times.
   4. numDrained, accumulated if this method is called multiple times.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;
+    private final int subtaskId;

Review comment:
       What is the purpose of `subtaskId`? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {

Review comment:
       As additional info to the other comments: this method allows the total 
number of requests to become greater than the total number of committables in 
the `summary`, since it does not check against `summary.getNumberOfCommittables()`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {
+        List<CommittableWithLineage<CommT>> committed = new 
ArrayList<>(requests.size());
+        for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
+                iterator.hasNext(); ) {
+            CommitRequestImpl<CommT> request = iterator.next();
+            if (!request.isFinished()) {
+                continue;
+            }
+            if (request.getState() == CommitRequestState.FAILED) {
+                numFailed += 1;
+                continue;
+            } else {
+                committed.add(
+                        new CommittableWithLineage<>(
+                                request.getCommittable(),
+                                summary.getCheckpointId().isPresent()
+                                        ? summary.getCheckpointId().getAsLong()
+                                        : null,
+                                summary.getSubtaskId()));
+            }
+            iterator.remove();
+        }
+
+        numDrained += committed.size();
+        return committed;
+    }
+
+    CommittableSummary<CommT> getSummary() {
+        return summary;
+    }
+
+    void setSummary(CommittableSummary<CommT> summary) {
+        this.summary = summary;
+    }
+
+    int getNumDrained() {
+        return numDrained;
+    }
+
+    Deque<CommitRequestImpl<CommT>> getRequests() {
+        return requests;
+    }
+
+    SubtaskCommittables<CommT> merge(SubtaskCommittables<CommT> other) {
+        CommittableSummary<CommT> otherSummary = other.getSummary();
+        checkArgument(otherSummary.getSubtaskId() == 
this.summary.getSubtaskId());
+        OptionalLong checkpointId = this.summary.getCheckpointId();
+        this.summary =
+                new CommittableSummary<>(
+                        this.summary.getSubtaskId(),
+                        this.summary.getNumberOfCommittables(),

Review comment:
       Should this be `numberOfSubtasks`? How does the merge work if the 
other summary has a different numberOfSubtasks? Or should we check that both 
numberOfSubtasks values are equal?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements 
CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long 
checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, 
long checkpointId) {
+        this.subtasksCommittables = checkNotNull(subtasksCommittables);
+        // dummy collector, never used
+        this.collector = new CommittableCollector<>(0, 1);
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    Collection<SubtaskCommittables<CommT>> getSubtasks() {

Review comment:
       It would be better to use a more suitable name, since "subtask" already has its own 
meaning as a domain entity. 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements 
CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> 
subtasksCommittables;

Review comment:
       // key -> subtaskId

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across 
checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> 
checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new 
TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime 
information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext 
context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), 
context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This 
method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new 
CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged 
into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), 
r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until 
emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link 
CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {

Review comment:
       ```suggestion
       public void addCommittableMessage(CommittableMessage<CommT> message) {
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {
+        if (message instanceof CommittableSummary) {
+            addSummary((CommittableSummary<CommT>) message);
+        } else if (message instanceof CommittableWithLineage) {
+            addCommittable((CommittableWithLineage<CommT>) message);
+        }
+    }
+
+    /**
+     * Returns all {@link CheckpointCommittables} until the requested checkpoint id.
+     *
+     * @param checkpointId counter
+     * @return collection of {@link CheckpointCommittables}
+     */
+    public Collection<? extends CheckpointCommittables<CommT>> getCheckpointCommittablesUpTo(
+            long checkpointId) {
+        // clean up fully committed previous checkpoints
+        // this wouldn't work with concurrent unaligned checkpoints
+        Collection<CheckpointCommittablesImpl<CommT>> checkpoints =
+                checkpointCommittables.headMap(checkpointId, true).values();
+        checkpoints.removeIf(CheckpointCommittablesImpl::isFinished);
+        return checkpoints;
+    }
+
+    /**
+     * Returns all {@link CheckpointCommittables} that are currently hold by the collector.
+     *
+     * @return collection of {@link CheckpointCommittables}
+     */
+    @Nullable
+    public Collection<? extends CheckpointCommittables<CommT>> getEndOfInputCommittables() {
+        return getCheckpointCommittablesUpTo(EOI);
+    }
+
+    /**
+     * Returns whether all {@link CheckpointCommittables} currently hold by the collector are either
+     * committed or failed.
+     *
+     * @return state of the {@link CheckpointCommittables}
+     */
+    public boolean isFinished() {
+        return checkpointCommittables.values().stream()
+                .allMatch(CheckpointCommittablesImpl::isFinished);
+    }
+
+    /**
+     * Merges all information from an external collector into this collector.
+     *
+     * <p>This method is important during recovery from existing state.
+     *
+     * @param cc other {@link CommittableCollector}
+     */
+    public void merge(CommittableCollector<CommT> cc) {
+        for (Entry<Long, CheckpointCommittablesImpl<CommT>> checkpointEntry :
+                cc.checkpointCommittables.entrySet()) {
+            checkpointCommittables.merge(
+                    checkpointEntry.getKey(),
+                    checkpointEntry.getValue(),
+                    CheckpointCommittablesImpl::merge);
+        }
+    }
+
+    /**
+     * Returns number of subtasks.
+     *
+     * @return number of subtasks
+     */
+    public int getNumberOfSubtasks() {
+        return numberOfSubtasks;
+    }
+
+    /**
+     * Returns subtask id.
+     *
+     * @return subtask id.
+     */
+    public int getSubtaskId() {
+        return subtaskId;
+    }
+
+    /**
+     * Returns a new committable collector that deep copies all internals.
+     *
+     * @return {@link CommittableCollector}
+     */
+    public CommittableCollector<CommT> copy() {
+        return new CommittableCollector<>(
+                checkpointCommittables.entrySet().stream()
+                        .map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
+                        .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
+                subtaskId,
+                numberOfSubtasks);
+    }
+
+    Collection<CheckpointCommittablesImpl<CommT>> getCheckpointCommittables() {
+        return checkpointCommittables.values();
+    }
+
+    private void addSummary(CommittableSummary<CommT> summary) {
+        checkpointCommittables
+                .computeIfAbsent(
+                        summary.getCheckpointId().orElse(EOI),
+                        key ->
+                                new CheckpointCommittablesImpl<>(
+                                        this, summary.getCheckpointId().orElse(EOI)))
+                .addSummary(summary);
+    }
+
+    private void addCommittable(CommittableWithLineage<CommT> committable) {
+        getCheckpointCommittables(committable).addCommittable(committable);
+    }
+
+    private CheckpointCommittablesImpl<CommT> getCheckpointCommittables(
+            CommittableMessage<CommT> committable) {

Review comment:
       ```suggestion
               OptionalLong checkpointId) {
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * Internal implementation to commit a specific committable and handle the response.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT> {
+
+    private CommT committable;
+    private int numRetries;
+    private CommitRequestState state;
+
+    protected CommitRequestImpl(CommT committable) {
+        this.committable = committable;
+        state = CommitRequestState.RECEIVED;
+    }
+
+    protected CommitRequestImpl(CommT committable, int numRetries, CommitRequestState state) {
+        this.committable = committable;
+        this.numRetries = numRetries;
+        this.state = state;
+    }
+
+    boolean isFinished() {
+        return state.isFinalState();
+    }
+
+    CommitRequestState getState() {
+        return state;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return numRetries;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable t) {
+        state = CommitRequestState.FAILED;
+        // add metric later

Review comment:
       ```suggestion
           // TODO(...) add metric later
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to