This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 21ef45fcc [Feature][CDC] MySQL CDC supports deserialization of
multi-tables (#4067)
21ef45fcc is described below
commit 21ef45fcca04b0244994d3a4e45a19cc08966603
Author: hailin0 <[email protected]>
AuthorDate: Tue Feb 7 10:40:30 2023 +0800
[Feature][CDC] MySQL CDC supports deserialization of multi-tables (#4067)
---
.../api/table/catalog/TableIdentifier.java | 2 +-
.../seatunnel/api/table/type/MultipleRowType.java | 56 +++++++++++++++++++++
.../seatunnel/api/table/type/SeaTunnelRow.java | 6 +--
.../apache/seatunnel/api/table/type/SqlType.java | 3 +-
.../cdc/base/source/IncrementalSource.java | 18 +++++++
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 58 ++++++++++++++++------
.../cdc/mysql/source/MySqlIncrementalSource.java | 21 +++++---
.../source/MySqlIncrementalSourceFactory.java | 42 +++++++++++++++-
.../transform/common/SeaTunnelRowAccessor.java | 2 +-
9 files changed, 181 insertions(+), 27 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index e3c60ada1..66ae7df2d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -47,7 +47,7 @@ public final class TableIdentifier implements Serializable {
return databaseName;
}
- public String gettableName() {
+ public String getTableName() {
return tableName;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
new file mode 100644
index 000000000..5d7fe330f
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow>,
Iterable<Map.Entry<String, SeaTunnelRowType>> {
+ private final Map<String, SeaTunnelRowType> rowTypeMap;
+
+ public MultipleRowType(String[] tableIds, SeaTunnelRowType[] rowTypes) {
+ Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+ for (int i = 0; i < tableIds.length; i++) {
+ rowTypeMap.put(tableIds[i], rowTypes[i]);
+ }
+ this.rowTypeMap = rowTypeMap;
+ }
+
+ public SeaTunnelRowType getRowType(String tableId) {
+ return rowTypeMap.get(tableId);
+ }
+
+ @Override
+ public Class<SeaTunnelRow> getTypeClass() {
+ return SeaTunnelRow.class;
+ }
+
+ @Override
+ public SqlType getSqlType() {
+ return SqlType.MULTIPLE_ROW;
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, SeaTunnelRowType>> iterator() {
+ return rowTypeMap.entrySet().iterator();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index e78d246d9..2d48c6cb1 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -29,7 +29,7 @@ import java.util.Objects;
public final class SeaTunnelRow implements Serializable {
private static final long serialVersionUID = -1L;
/** Table identifier, used for the source connector that {@link
SupportMultipleTable}. */
- private int tableId = -1;
+ private String tableId;
/** The kind of change that a row describes in a changelog. */
private RowKind kind = RowKind.INSERT;
/** The array to store the actual internal format values. */
@@ -47,7 +47,7 @@ public final class SeaTunnelRow implements Serializable {
this.fields[pos] = value;
}
- public void setTableId(int tableId) {
+ public void setTableId(String tableId) {
this.tableId = tableId;
}
@@ -59,7 +59,7 @@ public final class SeaTunnelRow implements Serializable {
return fields.length;
}
- public int getTableId() {
+ public String getTableId() {
return tableId;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
index ad8b3c347..1e5c05b90 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
@@ -37,5 +37,6 @@ public enum SqlType {
DATE,
TIME,
TIMESTAMP,
- ROW;
+ ROW,
+ MULTIPLE_ROW;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 41e416aa5..8be224ea4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
@@ -53,6 +54,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceRead
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import io.debezium.relational.TableId;
+import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +63,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
+@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig> implements
SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
protected ReadonlyConfig readonlyConfig;
@@ -76,6 +79,21 @@ public abstract class IncrementalSource<T, C extends
SourceConfig> implements Se
protected StopMode stopMode;
protected DebeziumDeserializationSchema<T> deserializationSchema;
+ protected SeaTunnelDataType<SeaTunnelRow> dataType;
+
+ protected IncrementalSource(ReadonlyConfig options,
SeaTunnelDataType<SeaTunnelRow> dataType) {
+ this.dataType = dataType;
+ this.readonlyConfig = options;
+ this.startupConfig = getStartupConfig(readonlyConfig);
+ this.stopConfig = getStopConfig(readonlyConfig);
+ this.stopMode = stopConfig.getStopMode();
+ this.incrementalParallelism =
readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
+ this.configFactory = createSourceConfigFactory(readonlyConfig);
+ this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
+ this.deserializationSchema =
createDebeziumDeserializationSchema(readonlyConfig);
+ this.offsetFactory = createOffsetFactory(readonlyConfig);
+ }
+
@Override
public final void prepare(Config pluginConfig) throws PrepareFailException
{
this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 8bcee31a5..c94d59064 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.cdc.debezium.row;
import static com.google.common.base.Preconditions.checkNotNull;
+import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,6 +38,9 @@ import org.apache.kafka.connect.source.SourceRecord;
import java.io.Serializable;
import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
/**
* Deserialization schema from Debezium object to {@link SeaTunnelRow}.
@@ -52,7 +57,9 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
/**
* Runtime converter that converts Kafka {@link SourceRecord}s into {@link
SeaTunnelRow} consisted of
*/
- private final SeaTunnelRowDebeziumDeserializationConverters converters;
+ private final SeaTunnelRowDebeziumDeserializationConverters
singleRowConverter;
+
+ private final Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleRowConverters;
/**
* Validator to validate the row value.
@@ -67,18 +74,35 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
}
SeaTunnelRowDebeziumDeserializeSchema(
- SeaTunnelRowType physicalDataType,
+ SeaTunnelDataType<SeaTunnelRow> physicalDataType,
MetadataConverter[] metadataConverters,
- SeaTunnelRowType resultType,
+ SeaTunnelDataType<SeaTunnelRow> resultType,
ValueValidator validator,
ZoneId serverTimeZone,
DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
- this.converters = new SeaTunnelRowDebeziumDeserializationConverters(
- physicalDataType,
- metadataConverters,
- serverTimeZone,
- userDefinedConverterFactory
- );
+
+ SeaTunnelRowDebeziumDeserializationConverters singleRowConverter =
null;
+ Map<String, SeaTunnelRowDebeziumDeserializationConverters>
multipleRowConverters = Collections.emptyMap();
+ if (physicalDataType instanceof MultipleRowType) {
+ multipleRowConverters = new HashMap<>();
+ for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType)
physicalDataType) {
+ SeaTunnelRowDebeziumDeserializationConverters itemRowConverter
= new SeaTunnelRowDebeziumDeserializationConverters(
+ item.getValue(),
+ metadataConverters,
+ serverTimeZone,
+ userDefinedConverterFactory);
+ multipleRowConverters.put(item.getKey(), itemRowConverter);
+ }
+ } else {
+ singleRowConverter = new
SeaTunnelRowDebeziumDeserializationConverters(
+ (SeaTunnelRowType) physicalDataType,
+ metadataConverters,
+ serverTimeZone,
+ userDefinedConverterFactory
+ );
+ }
+ this.singleRowConverter = singleRowConverter;
+ this.multipleRowConverters = multipleRowConverters;
this.resultTypeInfo = checkNotNull(resultType);
this.validator = checkNotNull(validator);
}
@@ -90,28 +114,34 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
Schema valueSchema = record.valueSchema();
Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
- // TODO: multi-table
+ String database = sourceStruct.getString(DATABASE_NAME_KEY);
String tableName =
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+ String tableId = database + ":" + tableName;
+ SeaTunnelRowDebeziumDeserializationConverters converters =
multipleRowConverters.getOrDefault(tableId, singleRowConverter);
if (operation == Envelope.Operation.CREATE || operation ==
Envelope.Operation.READ) {
SeaTunnelRow insert = extractAfterRow(converters, record,
messageStruct, valueSchema);
insert.setRowKind(RowKind.INSERT);
+ insert.setTableId(tableId);
validator.validate(insert, RowKind.INSERT);
collector.collect(insert);
} else if (operation == Envelope.Operation.DELETE) {
SeaTunnelRow delete = extractBeforeRow(converters, record,
messageStruct, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
+ delete.setTableId(tableId);
collector.collect(delete);
} else {
SeaTunnelRow before = extractBeforeRow(converters, record,
messageStruct, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
+ before.setTableId(tableId);
collector.collect(before);
SeaTunnelRow after = extractAfterRow(converters, record,
messageStruct, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
+ after.setTableId(tableId);
collector.collect(after);
}
}
@@ -159,8 +189,8 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
* Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}.
*/
public static class Builder {
- private SeaTunnelRowType physicalRowType;
- private SeaTunnelRowType resultTypeInfo;
+ private SeaTunnelDataType<SeaTunnelRow> physicalRowType;
+ private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
private MetadataConverter[] metadataConverters = new
MetadataConverter[0];
private ValueValidator validator = (rowData, rowKind) -> {
};
@@ -168,7 +198,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
private DebeziumDeserializationConverterFactory
userDefinedConverterFactory =
DebeziumDeserializationConverterFactory.DEFAULT;
- public Builder setPhysicalRowType(SeaTunnelRowType physicalRowType) {
+ public Builder setPhysicalRowType(SeaTunnelDataType<SeaTunnelRow>
physicalRowType) {
this.physicalRowType = physicalRowType;
return this;
}
@@ -178,7 +208,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
return this;
}
- public Builder setResultTypeInfo(SeaTunnelRowType resultTypeInfo) {
+ public Builder setResultTypeInfo(SeaTunnelDataType<SeaTunnelRow>
resultTypeInfo) {
this.resultTypeInfo = resultTypeInfo;
return this;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 0d5d5e694..97f8dca85 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -22,7 +22,8 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
@@ -45,6 +46,10 @@ import java.time.ZoneId;
public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> implements SupportParallelism {
static final String IDENTIFIER = "MySQL-CDC";
+ public MySqlIncrementalSource(ReadonlyConfig options,
SeaTunnelDataType<SeaTunnelRow> dataType) {
+ super(options, dataType);
+ }
+
@Override
public String getPluginName() {
return IDENTIFIER;
@@ -65,11 +70,15 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
public DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(ReadonlyConfig config) {
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
- // TODO: support multi-table
- // TODO: support metadata keys
- MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
- CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
config.get(JdbcSourceOptions.TABLE_NAME)));
- SeaTunnelRowType physicalRowType =
table.getTableSchema().toPhysicalRowDataType();
+ SeaTunnelDataType<SeaTunnelRow> physicalRowType;
+ if (dataType == null) {
+ // TODO: support metadata keys
+ MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
+ CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
config.get(JdbcSourceOptions.TABLE_NAME)));
+ physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+ } else {
+ physicalRowType = dataType;
+ }
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
.setPhysicalRowType(physicalRowType)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index b4fa5467f..f97894522 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -19,15 +19,29 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
@AutoService(Factory.class)
-public class MySqlIncrementalSourceFactory implements TableSourceFactory {
+public class MySqlIncrementalSourceFactory implements TableSourceFactory,
SupportMultipleTable {
@Override
public String factoryIdentifier() {
return MySqlIncrementalSource.IDENTIFIER;
@@ -57,4 +71,30 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory {
public Class<? extends SeaTunnelSource> getSourceClass() {
return MySqlIncrementalSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+ return () -> {
+ SeaTunnelDataType<SeaTunnelRow> dataType;
+ if (context.getCatalogTables().size() == 1) {
+ dataType = context.getCatalogTables()
+ .get(0)
+ .getTableSchema()
+ .toPhysicalRowDataType();
+ } else {
+ Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+ for (CatalogTable catalogTable : context.getCatalogTables()) {
+ String tableId =
catalogTable.getTableId().getDatabaseName() + ":" +
catalogTable.getTableId().getTableName();
+ rowTypeMap.put(tableId,
catalogTable.getTableSchema().toPhysicalRowDataType());
+ }
+ dataType = new MultipleRowType(rowTypeMap);
+ }
+ return (SeaTunnelSource<T, SplitT, StateT>) new
MySqlIncrementalSource<>(context.getOptions(), dataType);
+ };
+ }
+
+ @Override
+ public Result applyTables(TableFactoryContext context) {
+ return Result.of(context.getCatalogTables(), Collections.emptyList());
+ }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
index 81b2f50f0..0224ef4b8 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
@@ -30,7 +30,7 @@ public class SeaTunnelRowAccessor {
return row.getArity();
}
- public int getTableId() {
+ public String getTableId() {
return row.getTableId();
}