This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b9164b8ba [Feature][API & Connector & Doc] add parallelism and column
projection interface (#3829)
b9164b8ba is described below
commit b9164b8ba18ba1b647e8a99f7b4d9c3c078b7600
Author: Eric <[email protected]>
AuthorDate: Sun Jan 1 17:48:49 2023 +0800
[Feature][API & Connector & Doc] add parallelism and column projection
interface (#3829)
* add transform doc
* add transform v2 document
* remove transform v1 from document
* improve document
* fix dead link
* fix dead link
* fix dead link
* update supported connector num
* Update docs/en/transform-v2/replace.md
Co-authored-by: Zongwen Li <[email protected]>
* fix ci
* fix ci error
* add Parallelism and SchemaProjection interface to Source Connector
* update schemaprojection to columnprojection
* fix code style
* tmp
* revert FactoryUtil update
Co-authored-by: Zongwen Li <[email protected]>
---
docs/en/concept/connector-v2-features.md | 12 +++----
docs/en/connector-v2/sink/AmazonDynamoDB.md | 1 -
docs/en/connector-v2/sink/Assert.md | 1 -
docs/en/connector-v2/sink/Cassandra.md | 1 -
docs/en/connector-v2/sink/Clickhouse.md | 1 -
docs/en/connector-v2/sink/ClickhouseFile.md | 1 -
docs/en/connector-v2/sink/Console.md | 1 -
docs/en/connector-v2/sink/Datahub.md | 1 -
docs/en/connector-v2/sink/DingTalk.md | 1 -
docs/en/connector-v2/sink/Doris.md | 1 -
docs/en/connector-v2/sink/Elasticsearch.md | 1 -
docs/en/connector-v2/sink/Email.md | 1 -
docs/en/connector-v2/sink/Enterprise-WeChat.md | 1 -
docs/en/connector-v2/sink/Feishu.md | 1 -
docs/en/connector-v2/sink/FtpFile.md | 1 -
docs/en/connector-v2/sink/Greenplum.md | 1 -
docs/en/connector-v2/sink/HdfsFile.md | 1 -
docs/en/connector-v2/sink/Hive.md | 1 -
docs/en/connector-v2/sink/Http.md | 1 -
docs/en/connector-v2/sink/InfluxDB.md | 1 -
docs/en/connector-v2/sink/IoTDB.md | 14 ++++----
docs/en/connector-v2/sink/Jdbc.md | 1 -
docs/en/connector-v2/sink/Kafka.md | 2 --
docs/en/connector-v2/sink/Kudu.md | 1 -
docs/en/connector-v2/sink/LocalFile.md | 1 -
docs/en/connector-v2/sink/Maxcompute.md | 1 -
docs/en/connector-v2/sink/MongoDB.md | 5 ---
docs/en/connector-v2/sink/Neo4j.md | 1 -
docs/en/connector-v2/sink/OssFile.md | 1 -
docs/en/connector-v2/sink/OssJindoFile.md | 1 -
docs/en/connector-v2/sink/Phoenix.md | 1 -
docs/en/connector-v2/sink/Rabbitmq.md | 1 -
docs/en/connector-v2/sink/Redis.md | 1 -
docs/en/connector-v2/sink/S3-Redshift.md | 1 -
docs/en/connector-v2/sink/S3File.md | 1 -
docs/en/connector-v2/sink/Sentry.md | 2 --
docs/en/connector-v2/sink/SftpFile.md | 1 -
docs/en/connector-v2/sink/Slack.md | 1 -
docs/en/connector-v2/sink/Socket.md | 1 -
docs/en/connector-v2/sink/StarRocks.md | 1 -
docs/en/connector-v2/sink/Tablestore.md | 1 -
docs/en/connector-v2/source/AmazonDynamoDB.md | 2 +-
docs/en/connector-v2/source/Cassandra.md | 2 +-
docs/en/connector-v2/source/Clickhouse.md | 2 +-
docs/en/connector-v2/source/Elasticsearch.md | 2 +-
docs/en/connector-v2/source/FakeSource.md | 2 +-
docs/en/connector-v2/source/FtpFile.md | 2 +-
docs/en/connector-v2/source/Gitlab.md | 2 +-
docs/en/connector-v2/source/GoogleSheets.md | 2 +-
docs/en/connector-v2/source/Greenplum.md | 2 +-
docs/en/connector-v2/source/HdfsFile.md | 2 +-
docs/en/connector-v2/source/Hive.md | 2 +-
docs/en/connector-v2/source/Http.md | 4 +--
docs/en/connector-v2/source/Hudi.md | 2 +-
docs/en/connector-v2/source/Iceberg.md | 5 ++-
docs/en/connector-v2/source/InfluxDB.md | 2 +-
docs/en/connector-v2/source/IoTDB.md | 2 +-
docs/en/connector-v2/source/Jdbc.md | 2 +-
docs/en/connector-v2/source/Jira.md | 2 +-
docs/en/connector-v2/source/Klaviyo.md | 2 +-
docs/en/connector-v2/source/Kudu.md | 2 +-
docs/en/connector-v2/source/Lemlist.md | 2 +-
docs/en/connector-v2/source/LocalFile.md | 2 +-
docs/en/connector-v2/source/Maxcompute.md | 2 +-
docs/en/connector-v2/source/MongoDB.md | 2 +-
docs/en/connector-v2/source/MyHours.md | 2 +-
docs/en/connector-v2/source/MySQL-CDC.md | 2 +-
docs/en/connector-v2/source/Neo4j.md | 4 +--
docs/en/connector-v2/source/Notion.md | 2 +-
docs/en/connector-v2/source/OneSignal.md | 2 +-
docs/en/connector-v2/source/OpenMldb.md | 2 +-
docs/en/connector-v2/source/OssFile.md | 2 +-
docs/en/connector-v2/source/OssJindoFile.md | 2 +-
docs/en/connector-v2/source/Phoenix.md | 2 +-
docs/en/connector-v2/source/Rabbitmq.md | 2 +-
docs/en/connector-v2/source/Redis.md | 2 +-
docs/en/connector-v2/source/S3File.md | 2 +-
docs/en/connector-v2/source/SftpFile.md | 2 +-
docs/en/connector-v2/source/Socket.md | 2 +-
docs/en/connector-v2/source/kafka.md | 2 +-
docs/en/connector-v2/source/pulsar.md | 2 +-
.../api/source/SupportColumnProjection.java | 24 ++++++++++++++
.../seatunnel/api/source/SupportParallelism.java | 24 ++++++++++++++
.../source/AmazonDynamoDBSource.java | 3 +-
.../cassandra/source/CassandraSource.java | 3 +-
.../cdc/mysql/source/MySqlIncrementalSource.java | 3 +-
.../source/source/SqlServerIncrementalSource.java | 4 ++-
.../clickhouse/source/ClickhouseSource.java | 25 ++++++++++----
.../elasticsearch/source/ElasticsearchSource.java | 6 ++--
.../seatunnel/fake/source/FakeSource.java | 5 ++-
.../seatunnel/file/source/BaseFileSource.java | 14 +++++---
.../seatunnel/hudi/source/HudiSource.java | 3 +-
.../seatunnel/iceberg/source/IcebergSource.java | 5 ++-
.../seatunnel/influxdb/source/InfluxDBSource.java | 9 +++--
.../seatunnel/iotdb/source/IoTDBSource.java | 6 +++-
.../seatunnel/jdbc/source/JdbcSource.java | 38 +++++++++++++++-------
.../seatunnel/kafka/source/KafkaSource.java | 4 ++-
.../seatunnel/kudu/source/KuduSource.java | 3 +-
.../maxcompute/source/MaxcomputeSource.java | 4 ++-
.../mongodb/source/MongodbSourceReader.java | 3 +-
.../seatunnel/neo4j/source/Neo4jSource.java | 3 +-
.../seatunnel/openmldb/source/OpenMldbSource.java | 3 +-
.../seatunnel/pulsar/source/PulsarSource.java | 4 ++-
.../seatunnel/rabbitmq/source/RabbitmqSource.java | 4 ++-
104 files changed, 211 insertions(+), 147 deletions(-)
diff --git a/docs/en/concept/connector-v2-features.md
b/docs/en/concept/connector-v2-features.md
index 24bd65b65..c6612b2e5 100644
--- a/docs/en/concept/connector-v2-features.md
+++ b/docs/en/concept/connector-v2-features.md
@@ -24,11 +24,13 @@ and then locate the **Split** and **offset** read last time
and continue to send
For example `File`, `Kafka`.
-### schema projection
+### column projection
-If the source connector supports selective reading of certain columns or
redefine columns order or supports the data format read through `schema`
params, we think it supports schema projection.
+If the connector supports reading only specified columns from the data source
(note that if you read all columns first and then filter unnecessary columns
through the schema, this method is not a real column projection)
-For example `JDBCSource` can use sql define read columns, `KafkaSource` can
use `schema` params to define the read schema.
+For example `JDBCSource` can use sql define read columns.
+
+`KafkaSource` will read all content from topic and then use `schema` to filter
unnecessary columns, This is not `column projection`.
### batch
@@ -60,10 +62,6 @@ For sink connector, the sink connector supports exactly-once
if any piece of dat
* The target database supports key deduplication. For example `MySQL`, `Kudu`.
* The target support **XA Transaction**(This transaction can be used across
sessions. Even if the program that created the transaction has ended, the newly
started program only needs to know the ID of the last transaction to resubmit
or roll back the transaction). Then we can use **Two-phase Commit** to ensure
**exactly-once**. For example `File`, `MySQL`.
-### schema projection
-
-If a sink connector supports the fields and their types or redefine columns
order written in the configuration, we think it supports schema projection.
-
### cdc(change data capture)
If a sink connector supports writing row
kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think
it supports cdc(change data capture).
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/AmazonDynamoDB.md
b/docs/en/connector-v2/sink/AmazonDynamoDB.md
index c1bf515be..23e4e36b3 100644
--- a/docs/en/connector-v2/sink/AmazonDynamoDB.md
+++ b/docs/en/connector-v2/sink/AmazonDynamoDB.md
@@ -10,7 +10,6 @@ Write data to Amazon DynamoDB
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Assert.md
b/docs/en/connector-v2/sink/Assert.md
index 14a4606a4..fe55e9c90 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -9,7 +9,6 @@ A flink sink plugin which can assert illegal data by user
defined rules
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Cassandra.md
b/docs/en/connector-v2/sink/Cassandra.md
index 0a4ece086..7a2bb165e 100644
--- a/docs/en/connector-v2/sink/Cassandra.md
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -9,7 +9,6 @@ Write data to Apache Cassandra.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Clickhouse.md
b/docs/en/connector-v2/sink/Clickhouse.md
index 90bec5fc9..5ab4072f3 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -12,7 +12,6 @@ Used to write data to Clickhouse.
The Clickhouse sink plug-in can achieve accuracy once by implementing
idempotent writing, and needs to cooperate with aggregatingmergetree and other
engines that support deduplication.
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
:::tip
diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md
b/docs/en/connector-v2/sink/ClickhouseFile.md
index 1eb2458d0..e8faeb990 100644
--- a/docs/en/connector-v2/sink/ClickhouseFile.md
+++ b/docs/en/connector-v2/sink/ClickhouseFile.md
@@ -11,7 +11,6 @@ should be `true`. Supports Batch and Streaming mode.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
:::tip
diff --git a/docs/en/connector-v2/sink/Console.md
b/docs/en/connector-v2/sink/Console.md
index 134b74c85..c3f66ff9d 100644
--- a/docs/en/connector-v2/sink/Console.md
+++ b/docs/en/connector-v2/sink/Console.md
@@ -10,7 +10,6 @@ Used to send data to Console. Both support streaming and
batch mode.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Datahub.md
b/docs/en/connector-v2/sink/Datahub.md
index e7942800b..b2d772b00 100644
--- a/docs/en/connector-v2/sink/Datahub.md
+++ b/docs/en/connector-v2/sink/Datahub.md
@@ -9,7 +9,6 @@ A sink plugin which use send message to DataHub
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/DingTalk.md
b/docs/en/connector-v2/sink/DingTalk.md
index 095cbb604..23db2e12d 100644
--- a/docs/en/connector-v2/sink/DingTalk.md
+++ b/docs/en/connector-v2/sink/DingTalk.md
@@ -9,7 +9,6 @@ A sink plugin which use DingTalk robot send message
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Doris.md
b/docs/en/connector-v2/sink/Doris.md
index 6cac1bc32..df75b5ee7 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -8,7 +8,6 @@ The internal implementation of Doris sink connector is cached
and imported by st
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index 1e5e650c9..0ee3e699c 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -7,7 +7,6 @@ Output data to `Elasticsearch`.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
:::tip
diff --git a/docs/en/connector-v2/sink/Email.md
b/docs/en/connector-v2/sink/Email.md
index 719f67bea..a1642dadc 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -11,7 +11,6 @@ Send the data as a file to email.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Enterprise-WeChat.md
b/docs/en/connector-v2/sink/Enterprise-WeChat.md
index d933bfc3a..22fd815c7 100644
--- a/docs/en/connector-v2/sink/Enterprise-WeChat.md
+++ b/docs/en/connector-v2/sink/Enterprise-WeChat.md
@@ -16,7 +16,6 @@ A sink plugin which use Enterprise WeChat robot send message
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Feishu.md
b/docs/en/connector-v2/sink/Feishu.md
index b0f1d497f..349048001 100644
--- a/docs/en/connector-v2/sink/Feishu.md
+++ b/docs/en/connector-v2/sink/Feishu.md
@@ -13,7 +13,6 @@ Used to launch Feishu web hooks using data.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/FtpFile.md
b/docs/en/connector-v2/sink/FtpFile.md
index 840bd502d..5d7c00996 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -20,7 +20,6 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Greenplum.md
b/docs/en/connector-v2/sink/Greenplum.md
index 2aac08538..e0f6268e5 100644
--- a/docs/en/connector-v2/sink/Greenplum.md
+++ b/docs/en/connector-v2/sink/Greenplum.md
@@ -9,7 +9,6 @@ Write data to Greenplum using [Jdbc connector](Jdbc.md).
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
:::tip
diff --git a/docs/en/connector-v2/sink/HdfsFile.md
b/docs/en/connector-v2/sink/HdfsFile.md
index f37749b40..eed6ff210 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -20,7 +20,6 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 14faf5213..269cc1cef 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -19,7 +19,6 @@ If you use SeaTunnel Engine, You need put
seatunnel-hadoop3-3.1.4-uber.jar and h
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Http.md
b/docs/en/connector-v2/sink/Http.md
index 0cb26e4cb..2eecb6ab5 100644
--- a/docs/en/connector-v2/sink/Http.md
+++ b/docs/en/connector-v2/sink/Http.md
@@ -13,7 +13,6 @@ Used to launch web hooks using data.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/InfluxDB.md
b/docs/en/connector-v2/sink/InfluxDB.md
index f2cb27406..fd25234b8 100644
--- a/docs/en/connector-v2/sink/InfluxDB.md
+++ b/docs/en/connector-v2/sink/InfluxDB.md
@@ -9,7 +9,6 @@ Write data to InfluxDB.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/IoTDB.md
b/docs/en/connector-v2/sink/IoTDB.md
index f5bafb8bc..7c845eb46 100644
--- a/docs/en/connector-v2/sink/IoTDB.md
+++ b/docs/en/connector-v2/sink/IoTDB.md
@@ -6,6 +6,12 @@
Used to write data to IoTDB.
+:::tip
+
+There is a conflict of thrift version between IoTDB and Spark.Therefore, you
need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp
$IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
+
+:::
+
## Key features
- [x] [exactly-once](../../concept/connector-v2-features.md)
@@ -13,14 +19,6 @@ Used to write data to IoTDB.
IoTDB supports the `exactly-once` feature through idempotent writing. If two
pieces of data have
the same `key` and `timestamp`, the new data will overwrite the old one.
-- [ ] [schema projection](../../concept/connector-v2-features.md)
-
-:::tip
-
-There is a conflict of thrift version between IoTDB and Spark.Therefore, you
need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp
$IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
-
-:::
-
## Options
| name | type | required | default value
|
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 9f2aa2e75..09f7f14fc 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -22,7 +22,6 @@ e.g. If you use MySQL, should download and copy
`mysql-connector-java-xxx.jar` t
Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once`
for the database which is
support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index aa3f3b54b..e8860824b 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -11,8 +11,6 @@ Write Rows to a Kafka topic.
By default, we will use 2pc to guarantee the message is sent to kafka exactly
once.
-- [ ] [schema projection](../../concept/connector-v2-features.md)
-
## Options
| name | type | required | default value |
diff --git a/docs/en/connector-v2/sink/Kudu.md
b/docs/en/connector-v2/sink/Kudu.md
index 5448bf85b..3785b584c 100644
--- a/docs/en/connector-v2/sink/Kudu.md
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -11,7 +11,6 @@ Write data to Kudu.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index 6bc4f6f08..ab5753f27 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -20,7 +20,6 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Maxcompute.md
b/docs/en/connector-v2/sink/Maxcompute.md
index 302dca7ae..3898fbf1f 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -9,7 +9,6 @@ Used to read data from Maxcompute.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/MongoDB.md
b/docs/en/connector-v2/sink/MongoDB.md
index b5007fb21..0aebed29e 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -8,12 +8,7 @@ Write data to `MongoDB`
## Key features
-- [x] [batch](../../concept/connector-v2-features.md)
-- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
-- [ ] [parallelism](../../concept/connector-v2-features.md)
-- [ ] [support user-defined split](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Neo4j.md
b/docs/en/connector-v2/sink/Neo4j.md
index 08595c976..5d3c94ddc 100644
--- a/docs/en/connector-v2/sink/Neo4j.md
+++ b/docs/en/connector-v2/sink/Neo4j.md
@@ -11,7 +11,6 @@ Write data to Neo4j.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/OssFile.md
b/docs/en/connector-v2/sink/OssFile.md
index d0e7ac692..a45983ea7 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -23,7 +23,6 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md
b/docs/en/connector-v2/sink/OssJindoFile.md
index 8378d127d..92fb7131d 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -23,7 +23,6 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Phoenix.md
b/docs/en/connector-v2/sink/Phoenix.md
index ef707d145..a99c554c6 100644
--- a/docs/en/connector-v2/sink/Phoenix.md
+++ b/docs/en/connector-v2/sink/Phoenix.md
@@ -15,7 +15,6 @@ Two ways of connecting Phoenix with Java JDBC. One is to
connect to zookeeper th
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md
b/docs/en/connector-v2/sink/Rabbitmq.md
index 5a6c9a7de..aa0aeb784 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -9,7 +9,6 @@ Used to write data to Rabbitmq.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index e77f0f944..102c6e0b6 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -9,7 +9,6 @@ Used to write data to Redis.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/S3-Redshift.md
b/docs/en/connector-v2/sink/S3-Redshift.md
index b9f235338..7a519b770 100644
--- a/docs/en/connector-v2/sink/S3-Redshift.md
+++ b/docs/en/connector-v2/sink/S3-Redshift.md
@@ -18,7 +18,6 @@ Output data to AWS Redshift.
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/S3File.md
b/docs/en/connector-v2/sink/S3File.md
index 39c866539..a9040e9bc 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -22,7 +22,6 @@ To use this connector you need put hadoop-aws-3.1.4.jar and
aws-java-sdk-bundle-
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Sentry.md
b/docs/en/connector-v2/sink/Sentry.md
index 3f1c3247b..5fd5a028d 100644
--- a/docs/en/connector-v2/sink/Sentry.md
+++ b/docs/en/connector-v2/sink/Sentry.md
@@ -7,8 +7,6 @@ Write message to Sentry.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
-
## Options
diff --git a/docs/en/connector-v2/sink/SftpFile.md
b/docs/en/connector-v2/sink/SftpFile.md
index 32dc881c9..d71ea8b7d 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -20,7 +20,6 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
diff --git a/docs/en/connector-v2/sink/Slack.md
b/docs/en/connector-v2/sink/Slack.md
index 6f011bfc3..a22462f9f 100644
--- a/docs/en/connector-v2/sink/Slack.md
+++ b/docs/en/connector-v2/sink/Slack.md
@@ -11,7 +11,6 @@ Used to send data to Slack Channel. Both support streaming
and batch mode.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Socket.md
b/docs/en/connector-v2/sink/Socket.md
index a1ab7b440..0eaa12660 100644
--- a/docs/en/connector-v2/sink/Socket.md
+++ b/docs/en/connector-v2/sink/Socket.md
@@ -10,7 +10,6 @@ Used to send data to Socket Server. Both support streaming
and batch mode.
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
index 7f17154b6..8cab5aabe 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -8,7 +8,6 @@ The internal implementation of StarRocks sink connector is
cached and imported b
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/sink/Tablestore.md
b/docs/en/connector-v2/sink/Tablestore.md
index 15ca34eda..b37f0d736 100644
--- a/docs/en/connector-v2/sink/Tablestore.md
+++ b/docs/en/connector-v2/sink/Tablestore.md
@@ -9,7 +9,6 @@ Write data to `Tablestore`
## Key features
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
## Options
diff --git a/docs/en/connector-v2/source/AmazonDynamoDB.md
b/docs/en/connector-v2/source/AmazonDynamoDB.md
index ceb37226b..515f96a4c 100644
--- a/docs/en/connector-v2/source/AmazonDynamoDB.md
+++ b/docs/en/connector-v2/source/AmazonDynamoDB.md
@@ -11,7 +11,7 @@ Read data from Amazon DynamoDB.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Cassandra.md
b/docs/en/connector-v2/source/Cassandra.md
index e2a6e4e8c..933f91512 100644
--- a/docs/en/connector-v2/source/Cassandra.md
+++ b/docs/en/connector-v2/source/Cassandra.md
@@ -11,7 +11,7 @@ Read data from Apache Cassandra.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Clickhouse.md
b/docs/en/connector-v2/source/Clickhouse.md
index 8df3d7c3d..6775fd3ef 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -11,7 +11,7 @@ Used to read data from Clickhouse.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/Elasticsearch.md
b/docs/en/connector-v2/source/Elasticsearch.md
index 8b3bcb644..76dcb7a32 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -13,7 +13,7 @@ support version >= 2.x and < 8.x.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
index a1e651e47..9e6337f10 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -12,7 +12,7 @@ just for some test cases such as type conversion or connector
new feature testin
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index 3c650388b..abbb274a0 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Gitlab.md
b/docs/en/connector-v2/source/Gitlab.md
index d0b1df1bc..f6493bd50 100644
--- a/docs/en/connector-v2/source/Gitlab.md
+++ b/docs/en/connector-v2/source/Gitlab.md
@@ -11,7 +11,7 @@ Used to read data from Gitlab.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/GoogleSheets.md
b/docs/en/connector-v2/source/GoogleSheets.md
index 1d9d65d35..2ac636dc1 100644
--- a/docs/en/connector-v2/source/GoogleSheets.md
+++ b/docs/en/connector-v2/source/GoogleSheets.md
@@ -11,7 +11,7 @@ Used to read data from GoogleSheets.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [ ] file format
diff --git a/docs/en/connector-v2/source/Greenplum.md
b/docs/en/connector-v2/source/Greenplum.md
index 8fb34de70..94a0765de 100644
--- a/docs/en/connector-v2/source/Greenplum.md
+++ b/docs/en/connector-v2/source/Greenplum.md
@@ -11,7 +11,7 @@ Read Greenplum data through [Jdbc connector](Jdbc.md).
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/HdfsFile.md
b/docs/en/connector-v2/source/HdfsFile.md
index b8bedec19..8a1e14533 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index 2ba9af465..b12b4802a 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -21,7 +21,7 @@ If you use SeaTunnel Engine, You need put
seatunnel-hadoop3-3.1.4-uber.jar and h
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index 6e0ceb281..42fc4991a 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -11,7 +11,7 @@ Used to read data from Http.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
@@ -211,7 +211,7 @@ This parameter helps you configure the schema,so this
parameter must be used wit
If your data looks something like this:
```json
-{
+{
"store": {
"book": [
{
diff --git a/docs/en/connector-v2/source/Hudi.md
b/docs/en/connector-v2/source/Hudi.md
index 5d059cb90..4d6ef7b66 100644
--- a/docs/en/connector-v2/source/Hudi.md
+++ b/docs/en/connector-v2/source/Hudi.md
@@ -16,7 +16,7 @@ Currently, only supports hudi cow table and Snapshot Query
with Batch Mode
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Iceberg.md
b/docs/en/connector-v2/source/Iceberg.md
index a4400a530..72dacc076 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -11,10 +11,9 @@ Source connector for Apache Iceberg. It can support batch
and stream mode.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-
- [x] data format
- [x] parquet
- [x] orc
@@ -140,7 +139,7 @@ source {
}
```
-schema projection
+column projection
```hocon
source {
diff --git a/docs/en/connector-v2/source/InfluxDB.md
b/docs/en/connector-v2/source/InfluxDB.md
index 3e711080e..67c9c362a 100644
--- a/docs/en/connector-v2/source/InfluxDB.md
+++ b/docs/en/connector-v2/source/InfluxDB.md
@@ -11,7 +11,7 @@ Read external data source data through InfluxDB.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/IoTDB.md
b/docs/en/connector-v2/source/IoTDB.md
index a2402dd5c..7844cae3e 100644
--- a/docs/en/connector-v2/source/IoTDB.md
+++ b/docs/en/connector-v2/source/IoTDB.md
@@ -11,7 +11,7 @@ Read external data source data through IoTDB.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 7aacdb8bf..5f45f4b4f 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -19,7 +19,7 @@ e.g. If you use MySQL, should download and copy
`mysql-connector-java-xxx.jar` t
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/Jira.md
b/docs/en/connector-v2/source/Jira.md
index 1614b783c..728a69d5c 100644
--- a/docs/en/connector-v2/source/Jira.md
+++ b/docs/en/connector-v2/source/Jira.md
@@ -11,7 +11,7 @@ Used to read data from Jira.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Klaviyo.md
b/docs/en/connector-v2/source/Klaviyo.md
index 242f08b68..f62ac0f3d 100644
--- a/docs/en/connector-v2/source/Klaviyo.md
+++ b/docs/en/connector-v2/source/Klaviyo.md
@@ -11,7 +11,7 @@ Used to read data from Klaviyo.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Kudu.md
b/docs/en/connector-v2/source/Kudu.md
index 5da03dc11..c296a98b1 100644
--- a/docs/en/connector-v2/source/Kudu.md
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -13,7 +13,7 @@ Used to read data from Kudu.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Lemlist.md
b/docs/en/connector-v2/source/Lemlist.md
index 922ebba23..80bcea30b 100644
--- a/docs/en/connector-v2/source/Lemlist.md
+++ b/docs/en/connector-v2/source/Lemlist.md
@@ -11,7 +11,7 @@ Used to read data from Lemlist.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index dea16a005..9f0fd8eed 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Maxcompute.md
b/docs/en/connector-v2/source/Maxcompute.md
index 133d65949..47bc1c811 100644
--- a/docs/en/connector-v2/source/Maxcompute.md
+++ b/docs/en/connector-v2/source/Maxcompute.md
@@ -10,7 +10,7 @@ Used to read data from Maxcompute.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/MongoDB.md
b/docs/en/connector-v2/source/MongoDB.md
index 8b6325706..bfc60e256 100644
--- a/docs/en/connector-v2/source/MongoDB.md
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -11,7 +11,7 @@ Read data from MongoDB.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/MyHours.md
b/docs/en/connector-v2/source/MyHours.md
index de02a030f..c38c57c55 100644
--- a/docs/en/connector-v2/source/MyHours.md
+++ b/docs/en/connector-v2/source/MyHours.md
@@ -11,7 +11,7 @@ Used to read data from My Hours.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md
b/docs/en/connector-v2/source/MySQL-CDC.md
index d12b7bd06..11786c68c 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -12,7 +12,7 @@ describes how to setup the MySQL CDC connector to run SQL
queries against MySQL
- [ ] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Neo4j.md
b/docs/en/connector-v2/source/Neo4j.md
index 346532001..ed35bd6d0 100644
--- a/docs/en/connector-v2/source/Neo4j.md
+++ b/docs/en/connector-v2/source/Neo4j.md
@@ -13,7 +13,7 @@ Read data from Neo4j.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
@@ -64,7 +64,7 @@ Query statement.
returned fields of `query`
-see [schema projection](../../concept/connector-v2-features.md)
+see [column projection](../../concept/connector-v2-features.md)
### max_transaction_retry_time [long]
diff --git a/docs/en/connector-v2/source/Notion.md
b/docs/en/connector-v2/source/Notion.md
index e26a1e1c8..3285027e2 100644
--- a/docs/en/connector-v2/source/Notion.md
+++ b/docs/en/connector-v2/source/Notion.md
@@ -11,7 +11,7 @@ Used to read data from Notion.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/OneSignal.md
b/docs/en/connector-v2/source/OneSignal.md
index 852fc75ff..b48c2fedb 100644
--- a/docs/en/connector-v2/source/OneSignal.md
+++ b/docs/en/connector-v2/source/OneSignal.md
@@ -11,7 +11,7 @@ Used to read data from OneSignal.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/OpenMldb.md
b/docs/en/connector-v2/source/OpenMldb.md
index 28934af8b..54834b59d 100644
--- a/docs/en/connector-v2/source/OpenMldb.md
+++ b/docs/en/connector-v2/source/OpenMldb.md
@@ -11,7 +11,7 @@ Used to read data from OpenMldb.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index ea3256379..3bdb618fc 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -25,7 +25,7 @@ It only supports hadoop version **2.9.X+**.
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/OssJindoFile.md
b/docs/en/connector-v2/source/OssJindoFile.md
index 2eb5d5dda..cadba8df9 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -25,7 +25,7 @@ It only supports hadoop version **2.9.X+**.
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Phoenix.md
b/docs/en/connector-v2/source/Phoenix.md
index 32e4c94ba..03a580521 100644
--- a/docs/en/connector-v2/source/Phoenix.md
+++ b/docs/en/connector-v2/source/Phoenix.md
@@ -15,7 +15,7 @@ Two ways of connecting Phoenix with Java JDBC. One is to
connect to zookeeper th
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
supports query SQL and can achieve projection effect.
diff --git a/docs/en/connector-v2/source/Rabbitmq.md
b/docs/en/connector-v2/source/Rabbitmq.md
index 47030cd46..e7758d195 100644
--- a/docs/en/connector-v2/source/Rabbitmq.md
+++ b/docs/en/connector-v2/source/Rabbitmq.md
@@ -11,7 +11,7 @@ Used to read data from Rabbitmq.
- [ ] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 930f915ea..f6d93ad7a 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -11,7 +11,7 @@ Used to read data from Redis.
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/S3File.md
b/docs/en/connector-v2/source/S3File.md
index b0f3805dc..b01dc2ea5 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -24,7 +24,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and
aws-java-sdk-bundle-
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/SftpFile.md
b/docs/en/connector-v2/source/SftpFile.md
index 75d7a66ce..26108f6ca 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
diff --git a/docs/en/connector-v2/source/Socket.md
b/docs/en/connector-v2/source/Socket.md
index eccc1f4fe..8da8e8d2e 100644
--- a/docs/en/connector-v2/source/Socket.md
+++ b/docs/en/connector-v2/source/Socket.md
@@ -11,7 +11,7 @@ Used to read data from Socket.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 61b96936d..801240879 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -11,7 +11,7 @@ Source connector for Apache Kafka.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git a/docs/en/connector-v2/source/pulsar.md
b/docs/en/connector-v2/source/pulsar.md
index 44eaa4d7d..bbe74b78f 100644
--- a/docs/en/connector-v2/source/pulsar.md
+++ b/docs/en/connector-v2/source/pulsar.md
@@ -11,7 +11,7 @@ Source connector for Apache Pulsar.
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportColumnProjection.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportColumnProjection.java
new file mode 100644
index 000000000..9f44d4d16
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportColumnProjection.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+/**
+ * Mark whether the Source connector supports ColumnProjection
+ */
+public interface SupportColumnProjection {
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportParallelism.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportParallelism.java
new file mode 100644
index 000000000..94e0f3e1b
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportParallelism.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+/**
+ * Mark whether the Source connector supports parallelism
+ */
+public interface SupportParallelism {
+}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index 90efd22a4..02dec1680 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,7 +49,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class AmazonDynamoDBSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
+public class AmazonDynamoDBSource extends
AbstractSingleSplitSource<SeaTunnelRow> implements SupportColumnProjection {
private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
diff --git
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
index a128af301..23b181625 100644
---
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
+++
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -48,7 +49,7 @@ import com.datastax.oss.driver.api.core.cql.Row;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
-public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
private SeaTunnelRowType rowTypeInfo;
private CassandraConfig cassandraConfig;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index b473c97a7..0d5d5e694 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,7 +42,7 @@ import com.google.auto.service.AutoService;
import java.time.ZoneId;
@AutoService(SeaTunnelSource.class)
-public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> {
+public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> implements SupportParallelism {
static final String IDENTIFIER = "MySQL-CDC";
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index bb85300ab..aabb82ae8 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -22,6 +22,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.uti
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -44,7 +45,8 @@ import io.debezium.relational.TableId;
import java.time.ZoneId;
@AutoService(SeaTunnelSource.class)
-public class SqlServerIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> {
+public class SqlServerIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> implements
+ SupportParallelism {
@Override
public String getPluginName() {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index f9d857b9c..49981b890 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -29,6 +29,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -53,7 +55,8 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@AutoService(SeaTunnelSource.class)
-public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow,
ClickhouseSourceSplit, ClickhouseSourceState> {
+public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow,
ClickhouseSourceSplit, ClickhouseSourceState>,
+ SupportParallelism, SupportColumnProjection {
private List<ClickHouseNode> servers;
private SeaTunnelRowType rowTypeInfo;
@@ -66,10 +69,13 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(config,
HOST.key(), DATABASE.key(), SQL.key(), USERNAME.key(), PASSWORD.key());
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(config, HOST.key(), DATABASE.key(),
SQL.key(), USERNAME.key(),
+ PASSWORD.key());
if (!result.isSuccess()) {
throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE,
+ result.getMsg()));
}
servers = ClickhouseUtil.createNodes(config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
@@ -94,7 +100,8 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
} catch (ClickHouseException e) {
throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, e.getMessage()));
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE,
+ e.getMessage()));
}
}
@@ -114,17 +121,21 @@ public class ClickhouseSource implements
SeaTunnelSource<SeaTunnelRow, Clickhous
}
@Override
- public SourceReader<SeaTunnelRow, ClickhouseSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ public SourceReader<SeaTunnelRow, ClickhouseSourceSplit>
createReader(SourceReader.Context readerContext)
+ throws Exception {
return new ClickhouseSourceReader(servers, readerContext,
this.rowTypeInfo, sql);
}
@Override
- public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
createEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext) throws Exception {
+ public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext) throws Exception {
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
}
@Override
- public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext, ClickhouseSourceState checkpointState) throws Exception {
+ public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<ClickhouseSourceSplit>
enumeratorContext, ClickhouseSourceState checkpointState)
+ throws Exception {
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index d3fb36483..98299c5bb 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -39,8 +41,8 @@ import java.util.List;
import java.util.Map;
@AutoService(SeaTunnelSource.class)
-public class ElasticsearchSource implements SeaTunnelSource<SeaTunnelRow,
ElasticsearchSourceSplit, ElasticsearchSourceState> {
-
+public class ElasticsearchSource implements SeaTunnelSource<SeaTunnelRow,
ElasticsearchSourceSplit, ElasticsearchSourceState>,
+ SupportParallelism, SupportColumnProjection {
private Config pluginConfig;
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 9e5b06d01..dd61002bf 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -41,7 +43,8 @@ import com.google.auto.service.AutoService;
import java.util.Collections;
@AutoService(SeaTunnelSource.class)
-public class FakeSource implements SeaTunnelSource<SeaTunnelRow,
FakeSourceSplit, FakeSourceState> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow,
FakeSourceSplit, FakeSourceState>, SupportParallelism,
+ SupportColumnProjection {
private JobContext jobContext;
private SeaTunnelSchema schema;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
index 02d1e024a..14e5502d4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -32,7 +33,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceSta
import java.util.List;
-public abstract class BaseFileSource implements SeaTunnelSource<SeaTunnelRow,
FileSourceSplit, FileSourceState> {
+public abstract class BaseFileSource implements SeaTunnelSource<SeaTunnelRow,
FileSourceSplit, FileSourceState>,
+ SupportParallelism {
protected SeaTunnelRowType rowType;
protected ReadStrategy readStrategy;
protected HadoopConf hadoopConf;
@@ -49,17 +51,21 @@ public abstract class BaseFileSource implements
SeaTunnelSource<SeaTunnelRow, Fi
}
@Override
- public SourceReader<SeaTunnelRow, FileSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ public SourceReader<SeaTunnelRow, FileSourceSplit>
createReader(SourceReader.Context readerContext)
+ throws Exception {
return new BaseFileSourceReader(readStrategy, hadoopConf,
readerContext);
}
@Override
- public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
createEnumerator(SourceSplitEnumerator.Context<FileSourceSplit>
enumeratorContext) throws Exception {
+ public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext)
throws Exception {
return new FileSourceSplitEnumerator(enumeratorContext, filePaths);
}
@Override
- public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<FileSourceSplit>
enumeratorContext, FileSourceState checkpointState) throws Exception {
+ public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
FileSourceState checkpointState)
+ throws Exception {
return new FileSourceSplitEnumerator(enumeratorContext, filePaths,
checkpointState);
}
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 46edd20a6..8c0779296 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,7 +47,7 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
@AutoService(SeaTunnelSource.class)
-public class HudiSource implements SeaTunnelSource<SeaTunnelRow,
HudiSourceSplit, HudiSourceState> {
+public class HudiSource implements SeaTunnelSource<SeaTunnelRow,
HudiSourceSplit, HudiSourceState>, SupportParallelism {
private SeaTunnelRowType typeInfo;
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index b09160a71..38ba7be82 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -54,7 +56,8 @@ import java.util.ArrayList;
import java.util.List;
@AutoService(SeaTunnelSource.class)
-public class IcebergSource implements SeaTunnelSource<SeaTunnelRow,
IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {
+public class IcebergSource implements SeaTunnelSource<SeaTunnelRow,
IcebergFileScanTaskSplit, IcebergSplitEnumeratorState>,
+ SupportParallelism, SupportColumnProjection {
private static final long serialVersionUID = 4343414808223919870L;
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 2c961517f..7497577ad 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -53,7 +55,8 @@ import java.util.stream.Collectors;
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow,
InfluxDBSourceSplit, InfluxDBSourceState> {
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow,
InfluxDBSourceSplit, InfluxDBSourceState>,
+ SupportParallelism, SupportColumnProjection {
private SeaTunnelRowType typeInfo;
private SourceConfig sourceConfig;
@@ -111,7 +114,9 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
}
@Override
- public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit>
enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
+ public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<InfluxDBSourceSplit> enumeratorContext,
InfluxDBSourceState checkpointState)
+ throws Exception {
return new InfluxDBSourceSplitEnumerator(enumeratorContext,
checkpointState, sourceConfig);
}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index 9e66302ce..938b447d0 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,7 +48,9 @@ import java.util.HashMap;
import java.util.Map;
@AutoService(SeaTunnelSource.class)
-public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow,
IoTDBSourceSplit, IoTDBSourceState> {
+public class IoTDBSource implements SeaTunnelSource<SeaTunnelRow,
IoTDBSourceSplit, IoTDBSourceState>,
+ SupportParallelism,
+ SupportColumnProjection {
private JobContext jobContext;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 076f8a17a..9fe16bb30 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -54,7 +56,8 @@ import java.util.HashMap;
import java.util.Map;
@AutoService(SeaTunnelSource.class)
-public class JdbcSource implements SeaTunnelSource<SeaTunnelRow,
JdbcSourceSplit, JdbcSourceState> {
+public class JdbcSource implements SeaTunnelSource<SeaTunnelRow,
JdbcSourceSplit, JdbcSourceState>, SupportParallelism,
+ SupportColumnProjection {
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
private JdbcSourceOptions jdbcSourceOptions;
@@ -106,7 +109,8 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
}
@Override
- public SourceReader<SeaTunnelRow, JdbcSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ public SourceReader<SeaTunnelRow, JdbcSourceSplit>
createReader(SourceReader.Context readerContext)
+ throws Exception {
return new JdbcSourceReader(inputFormat, readerContext);
}
@@ -116,12 +120,15 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
}
@Override
- public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
createEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext) throws Exception {
+ public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext)
throws Exception {
return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter);
}
@Override
- public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceState checkpointState) throws Exception {
+ public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext,
JdbcSourceState checkpointState)
+ throws Exception {
return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter, checkpointState);
}
@@ -138,13 +145,15 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
} catch (Exception e) {
LOG.warn("get row type info exception", e);
}
- return new SeaTunnelRowType(fieldNames.toArray(new String[0]),
seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
+ return new SeaTunnelRowType(fieldNames.toArray(new String[0]),
+ seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
}
private PartitionParameter initPartitionParameter(String columnName,
Connection connection) throws SQLException {
long max = Long.MAX_VALUE;
long min = Long.MIN_VALUE;
- if (jdbcSourceOptions.getPartitionLowerBound().isPresent() &&
jdbcSourceOptions.getPartitionUpperBound().isPresent()) {
+ if (jdbcSourceOptions.getPartitionLowerBound().isPresent() &&
+ jdbcSourceOptions.getPartitionUpperBound().isPresent()) {
max = jdbcSourceOptions.getPartitionUpperBound().get();
min = jdbcSourceOptions.getPartitionLowerBound().get();
return new PartitionParameter(columnName, min, max,
jdbcSourceOptions.getPartitionNumber().orElse(null));
@@ -152,9 +161,11 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
try (ResultSet rs =
connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s)
" +
"FROM (%s) tt", columnName, columnName, query))) {
if (rs.next()) {
- max = jdbcSourceOptions.getPartitionUpperBound().isPresent() ?
jdbcSourceOptions.getPartitionUpperBound().get() :
+ max = jdbcSourceOptions.getPartitionUpperBound().isPresent() ?
+ jdbcSourceOptions.getPartitionUpperBound().get() :
Long.parseLong(rs.getString(1));
- min = jdbcSourceOptions.getPartitionLowerBound().isPresent() ?
jdbcSourceOptions.getPartitionLowerBound().get() :
+ min = jdbcSourceOptions.getPartitionLowerBound().isPresent() ?
+ jdbcSourceOptions.getPartitionLowerBound().get() :
Long.parseLong(rs.getString(2));
}
}
@@ -169,15 +180,18 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
fieldTypes.put(typeInfo.getFieldName(i),
typeInfo.getFieldType(i));
}
if (!fieldTypes.containsKey(partitionColumn)) {
- throw new
JdbcConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("field
%s not contain in query %s",
- partitionColumn, query));
+ throw new
JdbcConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format("field %s not contain in query %s",
+ partitionColumn, query));
}
SeaTunnelDataType<?> partitionColumnType =
fieldTypes.get(partitionColumn);
if (!isNumericType(partitionColumnType)) {
- throw new
JdbcConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("%s is
not numeric type", partitionColumn));
+ throw new
JdbcConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format("%s is not numeric type", partitionColumn));
}
PartitionParameter partitionParameter =
initPartitionParameter(partitionColumn, connection);
- query = String.format("SELECT * FROM (%s) tt where " +
partitionColumn + " >= ? AND " + partitionColumn + " <= ?", query);
+ query = String.format(
+ "SELECT * FROM (%s) tt where " + partitionColumn + " >= ? AND
" + partitionColumn + " <= ?", query);
return partitionParameter;
} else {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index ced1f1212..52d7a067e 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -41,6 +41,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -70,7 +71,8 @@ import java.util.Map;
import java.util.Properties;
@AutoService(SeaTunnelSource.class)
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow,
KafkaSourceSplit, KafkaSourceState> {
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow,
KafkaSourceSplit, KafkaSourceState>,
+ SupportParallelism {
private static final String DEFAULT_CONSUMER_GROUP =
"SeaTunnel-Consumer-Group";
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index fe76f2e5f..fbf4f3896 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -55,7 +56,7 @@ import java.util.List;
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class KuduSource implements SeaTunnelSource<SeaTunnelRow,
KuduSourceSplit, KuduSourceState> {
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow,
KuduSourceSplit, KuduSourceState>, SupportParallelism {
private SeaTunnelRowType rowTypeInfo;
private KuduInputFormat kuduInputFormat;
private PartitionParameter partitionParameter;
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index b7381739a..d71dcd943 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
@@ -32,7 +33,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class MaxcomputeSource implements SeaTunnelSource<SeaTunnelRow,
MaxcomputeSourceSplit, MaxcomputeSourceState> {
+public class MaxcomputeSource implements SeaTunnelSource<SeaTunnelRow,
MaxcomputeSourceSplit, MaxcomputeSourceState>,
+ SupportParallelism {
private SeaTunnelRowType typeInfo;
private Config pluginConfig;
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
index 208a44510..65470f3ce 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
@@ -40,7 +41,7 @@ import java.io.IOException;
import java.util.Optional;
@Slf4j
-public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
+public class MongodbSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> implements SupportColumnProjection {
private final SingleSplitReaderContext context;
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index 67172d837..283106d5b 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -55,7 +56,7 @@ import org.neo4j.driver.AuthTokens;
import java.net.URI;
@AutoService(SeaTunnelSource.class)
-public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
private final Neo4jSourceQueryInfo neo4jSourceQueryInfo = new
Neo4jSourceQueryInfo();
private SeaTunnelRowType rowType;
diff --git
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
index 8364d18c4..b7dede85d 100644
---
a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
+++
b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -53,7 +54,7 @@ import java.sql.Types;
import java.util.List;
@AutoService(SeaTunnelSource.class)
-public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
private OpenMldbParameters openMldbParameters;
private JobContext jobContext;
private SeaTunnelRowType seaTunnelRowType;
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index fe35f4bac..674ecbf2c 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -44,6 +44,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -78,7 +79,8 @@ import java.util.Arrays;
import java.util.regex.Pattern;
@AutoService(SeaTunnelSource.class)
-public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit, PulsarSplitEnumeratorState> {
+public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit, PulsarSplitEnumeratorState>,
+ SupportParallelism {
private DeserializationSchema<T> deserialization;
private PulsarAdminConfig adminConfig;
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
index 28a5ddc7e..27e37c559 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -52,7 +53,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
-public class RabbitmqSource implements SeaTunnelSource<SeaTunnelRow,
RabbitmqSplit, RabbitmqSplitEnumeratorState> {
+public class RabbitmqSource implements SeaTunnelSource<SeaTunnelRow,
RabbitmqSplit, RabbitmqSplitEnumeratorState>,
+ SupportParallelism {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private JobContext jobContext;