Yizhou-Yang opened a new issue, #8817:
URL: https://github.com/apache/inlong/issues/8817
### Description
1. Background:
The current InLong connector source only implements a MySQL split fetcher, and the Ververica source has likewise only open-sourced a MySQL split fetcher, so the corresponding functionality for Postgres has to be developed in-house, using the MySQL implementation as a reference. The MySQL implementation is based on the binlog; to bring the Postgres source to parity, a way to obtain Debezium splits without relying on the binlog needs to be developed.
The current MySQL split fetcher has two variants: MySqlBinlogSplit and MySqlSnapshotSplit. PostgreSQL only needs to implement a snapshot split fetcher (with-schema and schemaless).
PostgreSQL's Debezium CDC uses the LSN as the counterpart of the MySQL binlog position, so in the implementation the change position can be tracked through an encapsulated PostgresOffset class; a minimal sketch of such a class is shown below.
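A minimal sketch of what the encapsulated PostgresOffset might look like, assuming Debezium's io.debezium.connector.postgresql.connection.Lsn class is available. The field names, map keys, and the idea of delegating ordering to the Lsn are assumptions modelled on the MySQL BinlogOffset, not the final InLong design.

```java
// Hedged sketch only: names and keys are illustrative, not the final API.
import io.debezium.connector.postgresql.connection.Lsn;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PostgresOffset implements Comparable<PostgresOffset> {

    public static final String LSN_KEY = "lsn";           // hypothetical key name
    public static final String TIMESTAMP_KEY = "ts_usec"; // hypothetical key name

    private final Lsn lsn;
    private final Long commitTimestampMicros;

    public PostgresOffset(Lsn lsn, Long commitTimestampMicros) {
        this.lsn = Objects.requireNonNull(lsn);
        this.commitTimestampMicros = commitTimestampMicros;
    }

    /** Expose the change position as a plain map, mirroring BinlogOffset's offset map. */
    public Map<String, String> getOffsetMap() {
        Map<String, String> offset = new HashMap<>();
        offset.put(LSN_KEY, lsn.asString());
        if (commitTimestampMicros != null) {
            offset.put(TIMESTAMP_KEY, String.valueOf(commitTimestampMicros));
        }
        return offset;
    }

    /** Ordering is delegated to the LSN, the Postgres counterpart of the binlog position. */
    @Override
    public int compareTo(PostgresOffset other) {
        return this.lsn.compareTo(other.lsn);
    }

    @Override
    public String toString() {
        return lsn.asString();
    }
}
```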
This requirement is estimated at 20-30 person-days, but due to time pressure it needs to be delivered within 15 days.
2. Solution:
The relevant control-layer parameters were implemented last week, so only the Oceanus connector remains to be implemented. The solution consists of three major modules: the Postgres state context, the Postgres LSN split fetcher, and the Postgres enumerator.
In Debezium 1.8.0, the watermark required by the fetcher is currently planned to be obtained through Debezium's OffsetContext (io.debezium.pipeline.spi.OffsetContext), along the following lines:

```java
public PostgresOffset getWatermark(OffsetContext offsetContext, String column) {
    Map<String, ?> sourceOffset = offsetContext.getOffset();
    // do the necessary casting and initialization
    PostgresOffset highWatermark = (PostgresOffset) sourceOffset.get(column + ".max");
    return highWatermark;
}
```
The method above does not exist as-is; it will need to be copied from the SourceEventEmitter and fine-tuned to generate the high watermark.
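A hedged sketch of how the high watermark might be derived once the real offset map is available. Rather than a per-column key, this variant simply reads the connector's own offset map; it assumes OffsetContext exposes the source offset via getOffset(), and that the Postgres connector stores the current position under the "lsn" and "ts_usec" keys. Both assumptions should be verified against Debezium 1.8.0 before use, and PostgresOffset refers to the sketch given earlier.

```java
// Hedged sketch: offset keys and types are assumptions, not confirmed API behaviour.
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.pipeline.spi.OffsetContext;

import java.util.Map;

public class PostgresWatermarkExtractor {

    public PostgresOffset getHighWatermark(OffsetContext offsetContext) {
        Map<String, ?> sourceOffset = offsetContext.getOffset();
        Object rawLsn = sourceOffset.get("lsn");     // assumed key
        Object rawTs = sourceOffset.get("ts_usec");  // assumed key

        // The LSN may be stored either as a long or as its textual "X/Y" form.
        Lsn lsn = rawLsn instanceof Long
                ? Lsn.valueOf((Long) rawLsn)
                : Lsn.valueOf(String.valueOf(rawLsn));
        Long tsMicros = rawTs == null ? null : Long.parseLong(String.valueOf(rawTs));

        return new PostgresOffset(lsn, tsMicros);
    }
}
```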
A Postgres stateful context needs to be implemented to save and read state. At the moment the state cannot be turned into a context that can be shared. A PostgresSourceFetchTaskContext already exists, but its state saving is not complete enough and needs to be fine-tuned; a sketch of the kind of state holder that could back it is given below.
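A minimal sketch of a shared, mutable state holder for the Postgres fetch task, assuming the missing piece is simply a place where the current split and the last emitted PostgresOffset can be saved and read back. All class and method names here are hypothetical; the idea is that PostgresSourceFetchTaskContext would hold one instance and expose it to the emitter and the checkpoint logic.

```java
// Hedged sketch: names are hypothetical, not part of the existing connector.
public class PostgresSourceStateHolder {

    private volatile String currentSplitId;
    private volatile PostgresOffset lastOffset;

    /** Called by the fetch task when it starts processing a new snapshot split. */
    public synchronized void startSplit(String splitId) {
        this.currentSplitId = splitId;
    }

    /** Called after each emitted record so the position survives a checkpoint. */
    public synchronized void updateOffset(PostgresOffset offset) {
        this.lastOffset = offset;
    }

    public synchronized String getCurrentSplitId() {
        return currentSplitId;
    }

    public synchronized PostgresOffset getLastOffset() {
        return lastOffset;
    }
}
```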
The column parameter is a unique identifier: it is passed down from the result of the upstream primary-key query, or is null when no primary key is available.
When creating the event filter, a PostgresOffset implementation similar to BinlogOffset's timestamp/builder support is required. A class that can filter heartbeats by timestamp, analogous to the MySQL one, needs to be implemented on top of Postgres's Lsn; the string representation of the Lsn should play the same role as BinlogOffset.toString() does on the MySQL side. A hedged sketch of such a filter follows.
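The sketch below keeps only records whose PostgresOffset falls inside the snapshot split's watermark window, which is the role BinlogOffset-based filtering plays on the MySQL side. How the offset is extracted from a SourceRecord is an assumption (the same "lsn" key parsing as in the watermark extractor sketch), and PostgresOffset again refers to the earlier sketch.

```java
// Hedged sketch: the offset keys and the filtering contract are assumptions.
import io.debezium.connector.postgresql.connection.Lsn;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;
import java.util.function.Predicate;

public class PostgresEventFilter implements Predicate<SourceRecord> {

    private final PostgresOffset lowWatermark;
    private final PostgresOffset highWatermark;

    public PostgresEventFilter(PostgresOffset lowWatermark, PostgresOffset highWatermark) {
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }

    @Override
    public boolean test(SourceRecord record) {
        PostgresOffset position = extractOffset(record);
        // Heartbeats and change events before the low watermark or after the
        // high watermark do not belong to this snapshot split.
        return position.compareTo(lowWatermark) >= 0
                && position.compareTo(highWatermark) <= 0;
    }

    private PostgresOffset extractOffset(SourceRecord record) {
        Map<String, ?> sourceOffset = record.sourceOffset();
        Object rawLsn = sourceOffset.get("lsn"); // assumed key, as in the extractor sketch
        Lsn lsn = rawLsn instanceof Long
                ? Lsn.valueOf((Long) rawLsn)
                : Lsn.valueOf(String.valueOf(rawLsn));
        return new PostgresOffset(lsn, null);
    }
}
```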
One difficulty here is that the LSN sequence number cannot currently be recovered from the DataInputDeserializer; one possible workaround is sketched below.
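One possible way around the serialization difficulty, sketched under the assumption that the split state is (de)serialized with Flink's DataOutputSerializer / DataInputDeserializer (org.apache.flink.core.memory): write the LSN in its textual form so the exact sequence number can be read back when the split is restored.

```java
// Hedged sketch: PostgresOffset is the earlier sketch; the serde layout is illustrative.
import io.debezium.connector.postgresql.connection.Lsn;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

public final class PostgresOffsetSerde {

    public static byte[] serialize(PostgresOffset offset) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64);
        // Store the LSN as its string representation so nothing is lost on the round trip.
        out.writeUTF(offset.toString());
        return out.getCopyOfBuffer();
    }

    public static PostgresOffset deserialize(byte[] bytes) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(bytes);
        Lsn lsn = Lsn.valueOf(in.readUTF());
        return new PostgresOffset(lsn, null);
    }

    private PostgresOffsetSerde() {
    }
}
```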
3. Self-test verification:
Run two Postgres-to-Postgres upsert tasks, one with schema and one schemaless, each with multiple splits (about 1,000,000 rows). At the end of each split, print the current split and the number of records processed (see the logging sketch below).
Pause the task, resume from the timestamp checkpoint, and verify that the split number is consistent with the timestamp.
### Use case
_No response_
### Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://www.apache.org/foundation/policies/conduct)