This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 1636afad0 [improve][api] support for create sink (#4171)
1636afad0 is described below
commit 1636afad078d012342ad5ed729356cf27d58ebd2
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Feb 20 18:15:57 2023 +0800
[improve][api] support for create sink (#4171)
---
.../apache/seatunnel/api/table/connector/TableSource.java | 3 +++
.../apache/seatunnel/api/table/factory/FactoryUtil.java | 15 +++++++++++----
.../seatunnel/api/table/factory/TableFactoryContext.java | 7 +++++++
3 files changed, 21 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index d9b6294c4..acf7b4c23 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.source.SourceSplit;
import java.io.Serializable;
+/**
+ * Used to support authentication and processing of {@link
SupportReadingMetadata}
+ */
public interface TableSource<T, SplitT extends SourceSplit, StateT extends
Serializable> {
SeaTunnelSource<T, SplitT, StateT> createSource();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 46596ff63..5da8f3c48 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -103,10 +103,17 @@ public final class FactoryUtil {
}
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createAndPrepareSink(
- ClassLoader classLoader, String factoryIdentifier) {
- // todo: do we need to set table?
- TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory = discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
- return factory.createSink(null).createSink();
+ CatalogTable catalogTable,
+ ClassLoader classLoader,
+ ReadonlyConfig options,
+ String factoryIdentifier) {
+ try {
+ TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory = discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
+ TableFactoryContext context = new
TableFactoryContext(Collections.singletonList(catalogTable), options,
classLoader);
+ return factory.createSink(context).createSink();
+ } catch (Throwable t) {
+ throw new FactoryException(String.format("Unable to create a sink
for identifier '%s'.", factoryIdentifier), t);
+ }
}
public static Catalog createCatalog(String catalogName,
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index 97db8a0e6..77d529e84 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -54,4 +54,11 @@ public class TableFactoryContext {
public List<CatalogTable> getCatalogTables() {
return catalogTables;
}
+
+ /**
+ * @return single table.
+ */
+ public CatalogTable getCatalogTable() {
+ return catalogTables.get(0);
+ }
}