morozov commented on code in PR #4087:
URL: https://github.com/apache/flink-cdc/pull/4087#discussion_r2957366409
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java:
##########
@@ -208,9 +206,7 @@ public void close() {
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
- snapshotSplitAssigner.getAssignedSplits().values().stream()
- .sorted(Comparator.comparing(MySqlSplit::splitId))
- .collect(Collectors.toList());
+ new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
Review Comment:
> requesting the third shard according to the latest implementation method will request `split3` again. Will there be compatibility issues with this?

It will not request `split3` again. The reader knows that there are 3 split infos in total and that it already has 2, so it only needs to request the remaining one.
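The reader-side bookkeeping reduces to two integer operations on `receivedMetaNum` and the group size. Below is a minimal sketch. It assumes that the division and modulo used in the walk-throughs mirror the linked `ChunkUtils` and `MySqlSourceReader` code. `MetaGroupMath` and its methods are hypothetical names, not Flink CDC API:

```java
/** Hypothetical helper illustrating the arithmetic; not the Flink CDC API. */
public final class MetaGroupMath {

    private MetaGroupMath() {}

    private static void checkGroupSize(int metaGroupSize) {
        if (metaGroupSize <= 0) {
            throw new IllegalArgumentException("metaGroupSize must be positive");
        }
    }

    /** ID of the group holding the first split info the reader does not have yet. */
    public static int nextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
        checkGroupSize(metaGroupSize);
        return receivedMetaNum / metaGroupSize;
    }

    /** Number of leading entries in that group the reader already holds and must discard. */
    public static int alreadyReceivedInGroup(int receivedMetaNum, int metaGroupSize) {
        checkGroupSize(metaGroupSize);
        return receivedMetaNum % metaGroupSize;
    }

    public static void main(String[] args) {
        // receivedMetaNum = 2 in both scenarios discussed in this comment.
        System.out.println(nextMetaGroupId(2, 2) + " " + alreadyReceivedInGroup(2, 2)); // 1 0
        System.out.println(nextMetaGroupId(2, 3) + " " + alreadyReceivedInGroup(2, 3)); // 0 2
    }
}
```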

**Let's assume that `chunk-meta.group.size = 2`.**
1. The source reader state is `[split3, split2]` (`receivedMetaNum` = 2)
2. The next meta group ID is 1 (`2 / 2 = 1`):
https://github.com/apache/flink-cdc/blob/441eec81a1629ee101edd3ed3ab38bcefd65db9a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java#L110-L113
3. The splits on the enumerator are partitioned by 2:
https://github.com/apache/flink-cdc/blob/4bab36707b781d2221d4d5c6b515b074b72789ec/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java#L304-L306
```js
[
0: [split3, split2]
1: [split1]
]
```
4. The source reader requests group 1, and the enumerator returns `[split1]`.
5. The expected number of already retrieved elements in the last group on
the reader is 0 (`2 % 2 = 0`):
https://github.com/apache/flink-cdc/blob/2e82317810f1fe62bc75d4bc22247668d9613f20/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L424-L426
6. The source reader discards 0 first elements from the response, so
`[split1]` remains `[split1]`.
7. The source reader appends `[split1]` to `[split3, split2]`, so the
resulting source reader state becomes `[split3, split2, split1]` – correct.
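The seven steps above can be sketched as a small simulation. This is a hypothetical model, not Flink CDC code. `partition` stands in for the enumerator's grouping of split infos, and `resume` stands in for one request/response round on the reader. Split infos are modeled as plain strings. The sketch assumes, as the walk-through does, that the reader's state is a prefix of the enumerator's list:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the meta-group exchange; names are illustrative, not Flink CDC API. */
public final class MetaGroupResume {

    private MetaGroupResume() {}

    /** Enumerator side (step 3): cut the ordered split infos into consecutive groups. */
    public static List<List<String>> partition(List<String> splits, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<String>> groups = new ArrayList<>();
        for (int start = 0; start < splits.size(); start += groupSize) {
            int end = Math.min(start + groupSize, splits.size());
            groups.add(new ArrayList<>(splits.subList(start, end)));
        }
        return groups;
    }

    /** Reader side (steps 2 and 4-7): fetch the next group, drop known entries, append the rest. */
    public static List<String> resume(List<String> readerState, List<String> enumeratorSplits, int groupSize) {
        int received = readerState.size();
        List<String> merged = new ArrayList<>(readerState);
        if (received >= enumeratorSplits.size()) {
            return merged; // nothing left to request
        }
        int groupId = received / groupSize; // step 2
        List<String> response = partition(enumeratorSplits, groupSize).get(groupId); // step 4
        int alreadyReceived = received % groupSize; // step 5
        merged.addAll(response.subList(alreadyReceived, response.size())); // steps 6-7
        return merged;
    }

    public static void main(String[] args) {
        List<String> enumerator = List.of("split3", "split2", "split1");
        List<String> reader = List.of("split3", "split2"); // step 1
        System.out.println(resume(reader, enumerator, 2)); // [split3, split2, split1]
    }
}
```

Calling `resume(reader, enumerator, 3)` follows the `chunk-meta.group.size = 3` path instead (request group 0, discard 2 entries) and yields the same state.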

**Let's assume that `chunk-meta.group.size = 3`.**
1. The source reader state is `[split3, split2]` (`receivedMetaNum` = 2)
2. The next meta group ID is 0 (`2 / 3 = 0`)
3. The splits on the enumerator are partitioned by 3:
```js
[
0: [split3, split2, split1]
]
```
4. The source reader requests group 0, and the enumerator returns `[split3, split2, split1]`.
5. The expected number of already retrieved elements in the last group on the reader is 2 (`2 % 3 = 2`).
6. The source reader discards 2 first elements from the response, so
`[split3, split2, split1]` becomes `[split1]`.
7. The source reader appends `[split1]` to `[split3, split2]`, so the
resulting source reader state becomes `[split3, split2, split1]` – correct.
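Both walk-throughs are instances of one invariant. If the split infos the reader already holds are a prefix of the enumerator's list, requesting group `received / size` and discarding the first `received % size` entries never duplicates or skips a split. Below is a hypothetical brute-force check over small group sizes and every prefix length. It assumes the reader keeps requesting until it holds every split info. The names are illustrative, not Flink CDC API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical exhaustive check of the resume scheme; not part of Flink CDC. */
public final class MetaGroupSweep {

    private MetaGroupSweep() {}

    /** One request round: fetch group received / size, skip received % size entries, append. */
    public static List<String> resume(List<String> state, List<String> all, int groupSize) {
        int received = state.size();
        int from = (received / groupSize) * groupSize; // first index of the requested group
        List<String> group = all.subList(from, Math.min(from + groupSize, all.size()));
        List<String> merged = new ArrayList<>(state);
        merged.addAll(group.subList(received % groupSize, group.size()));
        return merged;
    }

    /** True if, for every group size and every prefix the reader may hold, it rebuilds the full list. */
    public static boolean holdsFor(List<String> all, int maxGroupSize) {
        for (int size = 1; size <= maxGroupSize; size++) {
            for (int received = 0; received < all.size(); received++) {
                List<String> state = new ArrayList<>(all.subList(0, received));
                while (state.size() < all.size()) {
                    state = resume(state, all, size); // each round adds at least one entry
                }
                if (!state.equals(all)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(holdsFor(List.of("split3", "split2", "split1"), 5)); // true
    }
}
```

The check relies on the prefix assumption. It does not model a reader state that was built from a different enumerator ordering.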