ashulin opened a new issue, #4050: URL: https://github.com/apache/incubator-seatunnel/issues/4050
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### Describe the proposal

## Multiple table proposal

### Background & Motivation

In the CDC scenario, we found that when there are too many CDC Sources, they occupy too many database connections, which affects the stability of the database. We therefore want to reduce the number of Sources needed to synchronize all tables. In the current design each Source synchronizes exactly one table, so we propose letting one Source handle multiple tables.

**Advantages**: fewer database connections are used, which reduces the load on the database.

**Disadvantage**: in `SeaTunnel zeta`, multiple tables will share one pipeline, so the granularity of fault tolerance becomes coarser.

### Overall Design

1. Load the `CatalogFactory` SPI through the config file.
2. Create a `Catalog` using the `CatalogFactory`.
3. Create `CatalogTable`s with the `Catalog` and the configured options.
   - If a table does not exist in the sink, create an inferred `CatalogTable` in the sink.
4. Fill the obtained `CatalogTable`s into `TableFactoryContext`, and use them in `TableSinkFactory`, `TableSourceFactory`, and `TableTransformFactory`.
5. If a Source supports multiple tables, its `TableSourceFactory` must implement the `SupportMultipleTable` interface and use the information of the multiple `CatalogTable`s to create a `MultipleRowType`; `SeaTunnelSource#getProducedType` then returns that `MultipleRowType`.
6. Inside the Source, use `MultipleRowType` to deserialize data into `SeaTunnelRow`, and add the table name to each `SeaTunnelRow`.
7. The engine distributes data according to the `MultipleRowType` and the table name carried by each `SeaTunnelRow`.
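Steps 5 to 7 (one row type covering several tables, rows tagged with their table name, and distribution by that name) can be illustrated with a minimal Java sketch. It is not SeaTunnel code: `TaggedRow`, `MultiTableRowType` and `distribute` are hypothetical, simplified stand-ins for `SeaTunnelRow`, `MultipleRowType` and the engine's distribution logic, a `List` stands in for each downstream channel, and the table names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for SeaTunnelRow / MultipleRowType; not the SeaTunnel API.
public class MultiTableRoutingSketch {

    // A row that carries the name of the table it came from (design step 6).
    static final class TaggedRow {
        final String tableName;
        final Object[] fields;

        TaggedRow(String tableName, Object... fields) {
            this.tableName = tableName;
            this.fields = fields;
        }
    }

    // Maps each table name to its column names, like MultipleRowType's parallel arrays.
    static final class MultiTableRowType {
        private final Map<String, List<String>> columnsByTable = new LinkedHashMap<>();

        MultiTableRowType add(String tableName, List<String> columns) {
            columnsByTable.put(tableName, columns);
            return this;
        }

        List<String> columnsOf(String tableName) {
            List<String> columns = columnsByTable.get(tableName);
            if (columns == null) {
                throw new IllegalArgumentException("Unknown table: " + tableName);
            }
            return columns;
        }

        Iterable<String> tableNames() {
            return columnsByTable.keySet();
        }
    }

    // Distributes rows into one channel per table (design step 7); a List stands in for a channel.
    static Map<String, List<TaggedRow>> distribute(MultiTableRowType type, List<TaggedRow> rows) {
        Map<String, List<TaggedRow>> channels = new LinkedHashMap<>();
        for (String table : type.tableNames()) {
            channels.put(table, new ArrayList<>());
        }
        for (TaggedRow row : rows) {
            // Reject rows whose arity does not match the declared schema of their own table.
            if (row.fields.length != type.columnsOf(row.tableName).size()) {
                throw new IllegalStateException("Arity mismatch for table " + row.tableName);
            }
            channels.get(row.tableName).add(row);
        }
        return channels;
    }

    public static void main(String[] args) {
        MultiTableRowType type = new MultiTableRowType()
                .add("inventory_1.orders", List.of("id", "amount"))
                .add("inventory_1.users", List.of("id", "name", "email"));
        List<TaggedRow> rows = List.of(
                new TaggedRow("inventory_1.orders", 1, 9.5),
                new TaggedRow("inventory_1.users", 7, "alice", "a@example.com"),
                new TaggedRow("inventory_1.orders", 2, 3.0));
        Map<String, List<TaggedRow>> channels = distribute(type, rows);
        channels.forEach((table, channelRows) ->
                System.out.println(table + " -> " + channelRows.size() + " row(s)"));
    }
}
```

Running `main` prints one line per table with its row count (2 rows for `inventory_1.orders`, 1 for `inventory_1.users`). Because the schema lookup is keyed by table name, each row is checked against its own table's schema instead of a single shared row type.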
#### Config design

```config
source {
  MySQL-CDC {
    parallelism = 1
    // RegEx to get multiple tables
    database-name = "inventory_.*"
    table-name = ".*"
    result_table_name = "cdc1"
  }
}

transform {
  DistributionTransform {
    source_table_name = "cdc1"
    result_table_name = "transform1"
  }
  Filter {
    source_table_name = "transform1#test"
    result_table_name = "filter1#test"
  }
}

sink {
  // sink option template
  Doris {
    source_table_name = "transform1"
    nodeUrls = ["e2e_dorisdb:8030"]
    username = root
    password = ""
    database = "test"
    batch_max_rows = 100
    doris.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
  // Specify options for a single table
  Doris {
    source_table_name = "filter1#test"
    nodeUrls = ["e2e_dorisdb:8030"]
    username = root
    password = ""
    database = "test"
    batch_max_rows = 200
    doris.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}
```

#### Related pseudo-code

```java
// For Source deserialization and Row distribution
public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow> {
    private final String[] tableNames;
    private final SeaTunnelRowType[] rowTypes;
}
```

```java
// create, get, update catalog
public interface Catalog {
    // query
    List<String> listDatabases() throws CatalogException;

    List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException;

    // get catalog table
    CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException;

    // update catalog table
    void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException;
}
```

```java
// Declare that the Source supports multiple tables, and control the number of tables by itself
public interface SupportMultipleTable {

    /**
     * A connector can pick tables and return the accepted and remaining tables.
     */
    Result applyTables(TableFactoryContext context);

    final class Result {
        private final List<CatalogTable> acceptedTables;
        private final List<CatalogTable> remainingTables;

        private Result(List<CatalogTable> acceptedTables, List<CatalogTable> remainingTables) {
            this.acceptedTables = acceptedTables;
            this.remainingTables = remainingTables;
        }

        // The constructor is private, so connectors need a factory to build a Result
        public static Result of(List<CatalogTable> acceptedTables, List<CatalogTable> remainingTables) {
            return new Result(acceptedTables, remainingTables);
        }

        public List<CatalogTable> getAcceptedTables() {
            return acceptedTables;
        }

        public List<CatalogTable> getRemainingTables() {
            return remainingTables;
        }
    }
}
```

#### Adapter

##### SeaTunnel Zeta

```java
// pseudo-code
public class DistributionTransform extends SeaTunnelTransform<Record<?>> {
    // Use MultipleRowType to distribute records to the corresponding data channels
    private MultipleRowType multiRowType;
}
```

##### Flink

- Operator chain: avoid serializing rows that have different structures
- OutputTag & Context#output: use side-output streams to distribute data to the corresponding channels

### Task list

-

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
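The contract of `SupportMultipleTable#applyTables` (a connector keeps some tables and hands back the rest) can also be sketched in plain Java. This is a hypothetical illustration, not SeaTunnel's API: tables are plain `database.table` strings instead of `CatalogTable`s, the `database-name`/`table-name` options are assumed to be full-match Java regular expressions, the fixed per-source cap `maxTables` is an assumed policy, and `planSources` assumes the engine keeps asking new sources to take the remaining tables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical sketch: names and signatures are illustrative, not SeaTunnel's API.
public class TableSelectionSketch {

    // Mirrors the shape of SupportMultipleTable.Result: accepted vs. remaining tables.
    static final class Result {
        final List<String> acceptedTables;
        final List<String> remainingTables;

        Result(List<String> acceptedTables, List<String> remainingTables) {
            this.acceptedTables = acceptedTables;
            this.remainingTables = remainingTables;
        }
    }

    // Resolves database-name / table-name regexes against a catalog listing
    // (database -> tables), assuming both patterns must match the whole name.
    static List<String> matchTables(Map<String, List<String>> catalog, String dbRegex, String tableRegex) {
        Pattern db = Pattern.compile(dbRegex);
        Pattern table = Pattern.compile(tableRegex);
        List<String> matched = new ArrayList<>();
        // TreeMap gives a deterministic database order.
        for (Map.Entry<String, List<String>> entry : new TreeMap<>(catalog).entrySet()) {
            if (!db.matcher(entry.getKey()).matches()) {
                continue;
            }
            for (String t : entry.getValue()) {
                if (table.matcher(t).matches()) {
                    matched.add(entry.getKey() + "." + t);
                }
            }
        }
        return matched;
    }

    // A source that accepts at most maxTables tables and hands back the rest.
    static Result applyTables(List<String> tables, int maxTables) {
        int n = Math.min(maxTables, tables.size());
        return new Result(
                new ArrayList<>(tables.subList(0, n)),
                new ArrayList<>(tables.subList(n, tables.size())));
    }

    // Keeps creating sources until every table is assigned; returns one table list per source.
    static List<List<String>> planSources(List<String> tables, int maxTablesPerSource) {
        if (maxTablesPerSource <= 0) {
            throw new IllegalArgumentException("maxTablesPerSource must be positive");
        }
        List<List<String>> sources = new ArrayList<>();
        List<String> remaining = tables;
        while (!remaining.isEmpty()) {
            Result result = applyTables(remaining, maxTablesPerSource);
            sources.add(result.acceptedTables);
            remaining = result.remainingTables;
        }
        return sources;
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog = Map.of(
                "inventory_1", List.of("orders", "users", "items"),
                "inventory_2", List.of("orders"),
                "billing", List.of("invoices"));
        List<String> tables = matchTables(catalog, "inventory_.*", ".*");
        System.out.println(tables);
        System.out.println(planSources(tables, 3));
    }
}
```

With the sample catalog, `inventory_.*` selects the four `inventory_*` tables and skips `billing`, and a cap of 3 yields two sources instead of four single-table ones; that reduction is where the connection savings described under Background & Motivation would come from.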
