ruanhang1993 commented on code in PR #3619:
URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853188260
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java:
##########
@@ -55,14 +55,14 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException
new PendingSplitsStateSerializer(constructSourceSplitSerializer());
PendingSplitsState streamSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
- 6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
+ 7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter);
SnapshotPendingSplitsState snapshotPendingSplitsStateBefore =
constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING);
PendingSplitsState snapshotPendingSplitsStateAfter =
pendingSplitsStateSerializer.deserializePendingSplitsState(
- 6,
Review Comment:
Please add a test for version 6 in
`testPendingSplitsStateSerializerCompatibility`.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) {
}
LOG.info("Snapshot split assigner is turn into finished status.");
}
+
+ if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
+ Iterator<Map.Entry<String, Long>> iterator =
+ splitFinishedCheckpointIds.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next();
+ String splitId = splitFinishedCheckpointId.getKey();
+ Long splitCheckpointId = splitFinishedCheckpointId.getValue();
+ if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID
+ && checkpointId >= splitCheckpointId) {
+ // record table-level splits metrics
+ TableId tableId = SnapshotSplit.parseTableId(splitId);
+ enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
+ iterator.remove();
+ }
+ }
+ LOG.info(
Review Comment:
Maybe we do not need to log this when `splitFinishedCheckpointIds` is
empty (size 0).
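
As a rough sketch of this suggestion (not code from the PR), the pruning step could return early when the map is empty, so no log line is written in that case. The class name `FinishedSplitPruner`, the sentinel value `-1L` for `UNDEFINED_CHECKPOINT_ID`, the log wording, and the use of `System.out` in place of `LOG.info` are all assumptions made to keep the example self-contained; the metrics call is left as a comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class FinishedSplitPruner {
    // Assumed sentinel; the real constant is defined in the PR, its value is not shown there.
    static final long UNDEFINED_CHECKPOINT_ID = -1L;

    /** Removes entries covered by the completed checkpoint and returns the removed split ids. */
    static List<String> pruneFinished(Map<String, Long> splitFinishedCheckpointIds, long checkpointId) {
        List<String> removed = new ArrayList<>();
        if (splitFinishedCheckpointIds == null || splitFinishedCheckpointIds.isEmpty()) {
            return removed; // nothing tracked, so nothing to prune and nothing to log
        }
        Iterator<Map.Entry<String, Long>> iterator = splitFinishedCheckpointIds.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Long> entry = iterator.next();
            long splitCheckpointId = entry.getValue(); // unbox once, then compare as primitives
            if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID && checkpointId >= splitCheckpointId) {
                // The PR records table-level metrics here before removing the entry.
                removed.add(entry.getKey());
                iterator.remove();
            }
        }
        // Stand-in for LOG.info; only reached when the map was non-empty on entry.
        System.out.printf(
                "Checkpoint %d completed, %d split(s) still pending.%n",
                checkpointId, splitFinishedCheckpointIds.size());
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> ids = new HashMap<>();
        ids.put("split-0", 3L);
        ids.put("split-1", 5L);
        System.out.println(pruneFinished(ids, 4L)); // removes only split-0
        System.out.println(pruneFinished(new HashMap<>(), 4L)); // empty map: no log line
    }
}
```

Whether the log should also be skipped when the map becomes empty after pruning is a judgment call that this sketch leaves open.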
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -359,11 +432,31 @@ public void addSplits(Collection<SourceSplitBase> splits) {
// because they are failed
assignedSplits.remove(split.splitId());
splitFinishedOffsets.remove(split.splitId());
+
+ enumeratorMetrics
+ .getTableMetrics(split.asSnapshotSplit().getTableId())
+ .reprocessSplit(split.splitId());
+ TableId tableId = split.asSnapshotSplit().getTableId();
+
+ enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
}
}
@Override
public SnapshotPendingSplitsState snapshotState(long checkpointId) {
+ if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
+ for (Map.Entry<String, Long> splitFinishedCheckpointId :
+ splitFinishedCheckpointIds.entrySet()) {
+ if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) {
+ splitFinishedCheckpointId.setValue(checkpointId);
+ }
+ }
+ }
+ LOG.info(
+ "SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.",
+ checkpointId,
+ splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size());
Review Comment:
Maybe we do not need to log this when `splitFinishedCheckpointIds` is
empty (size 0).
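
One possible shape for this, sketched outside the PR code: keep the log statement inside the existing non-empty guard so it only fires when there is something to report. The class name `CheckpointIdStamper`, the sentinel value `-1L`, and `System.out` standing in for `LOG.info` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointIdStamper {
    // Assumed sentinel; the real constant is defined in the PR, its value is not shown there.
    static final long UNDEFINED_CHECKPOINT_ID = -1L;

    /** Assigns the checkpoint id to entries that have none yet; returns how many were stamped. */
    static int stampUndefined(Map<String, Long> splitFinishedCheckpointIds, long checkpointId) {
        if (splitFinishedCheckpointIds == null || splitFinishedCheckpointIds.isEmpty()) {
            return 0; // nothing tracked: skip both the loop and the log line
        }
        int stamped = 0;
        for (Map.Entry<String, Long> entry : splitFinishedCheckpointIds.entrySet()) {
            // Long is unboxed against the primitive constant, so this is a numeric comparison.
            if (entry.getValue() == UNDEFINED_CHECKPOINT_ID) {
                entry.setValue(checkpointId); // value update only, safe while iterating
                stamped++;
            }
        }
        // Stand-in for LOG.info; reached only when the map is non-empty.
        System.out.printf(
                "SnapshotSplitAssigner snapshotState on checkpoint %d with splitFinishedCheckpointIds size %d.%n",
                checkpointId, splitFinishedCheckpointIds.size());
        return stamped;
    }

    public static void main(String[] args) {
        Map<String, Long> ids = new HashMap<>();
        ids.put("split-0", UNDEFINED_CHECKPOINT_ID);
        ids.put("split-1", 2L);
        System.out.println(stampUndefined(ids, 7L)); // stamps split-0 only
        System.out.println(stampUndefined(new HashMap<>(), 7L)); // empty map: no log line
    }
}
```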