This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 80cf8f4e4 [Feature][Connector-V2][Maxcompute] Add Maxcompute source & 
sink connector (#3640)
80cf8f4e4 is described below

commit 80cf8f4e4286b41966f17a0889af9bcccfd0dc83
Author: stdnt-xiao <[email protected]>
AuthorDate: Sun Dec 11 17:05:41 2022 +0800

    [Feature][Connector-V2][Maxcompute] Add Maxcompute source & sink connector 
(#3640)
---
 docs/en/connector-v2/sink/Maxcompute.md            |  79 ++++++
 docs/en/connector-v2/source/Maxcompute.md          |  82 ++++++
 plugin-mapping.properties                          |   2 +
 .../connector-maxcompute/pom.xml                   |  61 ++++
 .../maxcompute/config/MaxcomputeConfig.java        |  59 ++++
 .../exception/MaxcomputeConnectorException.java    |  36 +++
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java  |  67 +++++
 .../maxcompute/sink/MaxcomputeSinkFactory.java     |  48 ++++
 .../maxcompute/sink/MaxcomputeWriter.java          |  88 ++++++
 .../maxcompute/source/MaxcomputeSource.java        |  74 +++++
 .../maxcompute/source/MaxcomputeSourceFactory.java |  48 ++++
 .../maxcompute/source/MaxcomputeSourceReader.java  | 107 +++++++
 .../maxcompute/source/MaxcomputeSourceSplit.java   |  39 +++
 .../source/MaxcomputeSourceSplitEnumerator.java    | 149 ++++++++++
 .../maxcompute/source/MaxcomputeSourceState.java   |  33 +++
 .../maxcompute/util/MaxcomputeTypeMapper.java      | 311 +++++++++++++++++++++
 .../seatunnel/maxcompute/util/MaxcomputeUtil.java  | 109 ++++++++
 .../main/resources/maxcompute_to_maxcompute.conf   |  68 +++++
 .../src/test/java/BasicTypeToOdpsTypeTest.java     |  97 +++++++
 .../src/test/java/MaxcomputeSourceFactoryTest.java |  30 ++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   8 +-
 22 files changed, 1595 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Maxcompute.md 
b/docs/en/connector-v2/sink/Maxcompute.md
new file mode 100644
index 000000000..302dca7ae
--- /dev/null
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -0,0 +1,79 @@
+# Maxcompute
+
+> Maxcompute sink connector
+
+## Description
+
+Used to read data from Maxcompute.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name              | type     | required  | default value |
+|-------------------|----------|-----------|---------------|
+| accessId          | string   | yes       | -             |
+| accesskey         | string   | yes       | -             |
+| endpoint          | string   | yes       | -             |
+| project           | string   | yes       | -             |
+| table_name        | string   | yes       | -             |
+| partition_spec    | string   | no        | -             |
+| overwrite         | boolean  | no        | false         |
+| common-options    | string   | no        |               |
+
+### accessId [string]
+
+`accessId` Your Maxcompute accessId which cloud be access from Alibaba Cloud.
+
+### accesskey [string]
+
+`accesskey` Your Maxcompute accessKey which cloud be access from Alibaba Cloud.
+
+### endpoint [string]
+
+`endpoint` Your Maxcompute endpoint start with http.
+
+### project [string]
+
+`project` Your Maxcompute project which is created in Alibaba Cloud.
+
+### table_name [string]
+
+`table_name` Target Maxcompute table name eg: fake.
+
+### partition_spec [string]
+
+`partition_spec` This spec of Maxcompute partition table eg:ds='20220101'.
+
+### overwrite [boolean]
+
+`overwrite` Whether to overwrite the table or partition, default: false.
+
+### common options 
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+  Maxcompute {
+    accessId="<your access id>"
+    accesskey="<your access Key>"
+    endpoint="<http://service.odps.aliyun.com/api>"
+    project="<your project>"
+    table_name="<your table name>"
+    #partition_spec="<your partition spec>"
+    #overwrite = false
+  }
+}
+```
+
+## Changelog
+
+### next version
+
+- [Feature] Add Maxcompute Sink 
Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640))
diff --git a/docs/en/connector-v2/source/Maxcompute.md 
b/docs/en/connector-v2/source/Maxcompute.md
new file mode 100644
index 000000000..133d65949
--- /dev/null
+++ b/docs/en/connector-v2/source/Maxcompute.md
@@ -0,0 +1,82 @@
+# Maxcompute
+
+> Maxcompute source connector
+
+## Description
+
+Used to read data from Maxcompute.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name            | type   | required  | default value |
+|-----------------|--------|-----------|---------------|
+| accessId        | string | yes       | -             |
+| accesskey       | string | yes       | -             |
+| endpoint        | string | yes       | -             |
+| project         | string | yes       | -             |
+| table_name      | string | yes       | -             |
+| partition_spec  | string | no        | -             |
+| split_row       | int    | no        | 10000         |
+| common-options  | string | no        |               |
+
+### accessId [string]
+
+`accessId` Your Maxcompute accessId which cloud be access from Alibaba Cloud.
+
+### accesskey [string]
+
+`accesskey` Your Maxcompute accessKey which cloud be access from Alibaba Cloud.
+
+### endpoint [string]
+
+`endpoint` Your Maxcompute endpoint start with http.
+
+### project [string]
+
+`project` Your Maxcompute project which is created in Alibaba Cloud.
+
+### table_name [string]
+
+`table_name` Target Maxcompute table name eg: fake.
+
+### partition_spec [string]
+
+`partition_spec` This spec of Maxcompute partition table eg:ds='20220101'.
+
+### split_row [int]
+
+`split_row` Number of rows per split, default: 10000.
+
+### common options 
+
+Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+source {
+  Maxcompute {
+    accessId="<your access id>"
+    accesskey="<your access Key>"
+    endpoint="<http://service.odps.aliyun.com/api>"
+    project="<your project>"
+    table_name="<your table name>"
+    #partition_spec="<your partition spec>"
+    #split_row = 10000
+  }
+}
+```
+
+## Changelog
+
+### next version
+
+- [Feature] Add Maxcompute Source 
Connector([3640](https://github.com/apache/incubator-seatunnel/pull/3640))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 35f5c2014..f3bec6c14 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -160,4 +160,6 @@ seatunnel.source.Notion = connector-http-notion
 seatunnel.sink.RabbitMQ = connector-rabbitmq
 seatunnel.source.RabbitMQ = connector-rabbitmq
 seatunnel.source.OpenMldb = connector-openmldb
+seatunnel.source.Maxcompute = connector-maxcompute
+seatunnel.sink.Maxcompute = connector-maxcompute
 seatunnel.source.MySQL-CDC = connector-cdc-mysql
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-maxcompute/pom.xml 
b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
new file mode 100644
index 000000000..66c6503f5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-maxcompute</artifactId>
+
+    <properties>
+        <maxcompute.version>0.31.3</maxcompute.version>
+        <commons.lang3.version>3.4</commons.lang3.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>${maxcompute.version}-public</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
new file mode 100644
index 000000000..29420520b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+public class MaxcomputeConfig implements Serializable {
+    private static final int SPLIT_ROW_DEFAULT = 10000;
+    public static final Option<String> ACCESS_ID = Options.key("accessId")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Your Maxcompute accessId which cloud be access from 
Alibaba Cloud");
+    public static final Option<String> ACCESS_KEY = Options.key("accesskey")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Your Maxcompute accessKey which cloud be access from 
Alibaba Cloud");
+    public static final Option<String> ENDPOINT = Options.key("endpoint")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Your Maxcompute endpoint start with http");
+    public static final Option<String> PROJECT = Options.key("project")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Your Maxcompute project which is created in Alibaba 
Cloud");
+    public static final Option<String> TABLE_NAME = Options.key("table_name")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Target Maxcompute table name eg: fake");
+    public static final Option<String> PARTITION_SPEC = 
Options.key("partition_spec")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("This spec of Maxcompute partition table.");
+    public static final Option<Integer> SPLIT_ROW = Options.key("split_row")
+        .intType()
+        .defaultValue(SPLIT_ROW_DEFAULT)
+        .withDescription("Number of rows per split. default: 10000");
+    public static final Option<Boolean> OVERWRITE = Options.key("overwrite")
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("Whether to overwrite the table or partition");
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
new file mode 100644
index 000000000..83cdaaa3e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/exception/MaxcomputeConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class MaxcomputeConnectorException extends SeaTunnelRuntimeException {
+
+    public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public MaxcomputeConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
new file mode 100644
index 000000000..cae4005b7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@AutoService(SeaTunnelSink.class)
+public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MaxcomputeSink.class);
+    private Config pluginConfig;
+    private SeaTunnelRowType typeInfo;
+
+    @Override
+    public String getPluginName() {
+        return "Maxcompute";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        MaxcomputeUtil.initTableOrPartition(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.typeInfo = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.typeInfo;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+        return new MaxcomputeWriter(this.typeInfo, this.pluginConfig);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
new file mode 100644
index 000000000..92ef0e664
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MaxcomputeSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Maxcompute";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+            .optional(PARTITION_SPEC, OVERWRITE)
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
new file mode 100644
index 000000000..d2dcb0d23
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.RecordWriter;
+import com.aliyun.odps.tunnel.TableTunnel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final RecordWriter recordWriter;
+    private final TableTunnel.UploadSession session;
+    private final TableSchema tableSchema;
+
+    private Config pluginConfig;
+
+    public MaxcomputeWriter(SeaTunnelRowType seaTunnelRowType, Config 
pluginConfig) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.pluginConfig = pluginConfig;
+        try {
+            Table table = MaxcomputeUtil.getTable(pluginConfig);
+            this.tableSchema = table.getSchema();
+            TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
+            if (this.pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                PartitionSpec partitionSpec = new 
PartitionSpec(this.pluginConfig.getString(PARTITION_SPEC.key()));
+                session = 
tunnel.createUploadSession(pluginConfig.getString(PROJECT.key()), 
pluginConfig.getString(TABLE_NAME.key()), partitionSpec);
+            } else {
+                session = 
tunnel.createUploadSession(pluginConfig.getString(PROJECT.key()), 
pluginConfig.getString(TABLE_NAME.key()));
+            }
+            this.recordWriter = 
session.openRecordWriter(Thread.currentThread().getId());
+            log.info("open record writer success");
+        } catch (Exception e) {
+            throw new 
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+        }
+    }
+
+    @Override
+    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
+        Record record = 
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.seaTunnelRowType);
+        recordWriter.write(record);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.recordWriter.close();
+        try {
+            this.session.commit(new Long[]{Thread.currentThread().getId()});
+        } catch (Exception e) {
+            throw new 
MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
new file mode 100644
index 000000000..b7381739a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class MaxcomputeSource implements SeaTunnelSource<SeaTunnelRow, 
MaxcomputeSourceSplit, MaxcomputeSourceState> {
+    private SeaTunnelRowType typeInfo;
+    private Config pluginConfig;
+
+    @Override
+    public String getPluginName() {
+        return "Maxcompute";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) {
+        this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public SeaTunnelRowType getProducedType() {
+        return this.typeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new MaxcomputeSourceReader(this.pluginConfig, readerContext, 
this.typeInfo);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> 
createEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> 
enumeratorContext) throws Exception {
+        return new MaxcomputeSourceSplitEnumerator(enumeratorContext, 
this.pluginConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit> 
enumeratorContext, MaxcomputeSourceState checkpointState) throws Exception {
+        return new MaxcomputeSourceSplitEnumerator(enumeratorContext, 
this.pluginConfig, checkpointState);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
new file mode 100644
index 000000000..152ca8e54
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MaxcomputeSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Maxcompute";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+            .optional(PARTITION_SPEC, SPLIT_ROW)
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
new file mode 100644
index 000000000..c3e978b0f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class MaxcomputeSourceReader implements SourceReader<SeaTunnelRow, 
MaxcomputeSourceSplit> {
+    private final SourceReader.Context context;
+    private final Set<MaxcomputeSourceSplit> sourceSplits;
+    private Config pluginConfig;
+    boolean noMoreSplit;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    public MaxcomputeSourceReader(Config pluginConfig, SourceReader.Context 
context, SeaTunnelRowType seaTunnelRowType) {
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.sourceSplits = new HashSet<>();
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        sourceSplits.forEach(source -> {
+            try {
+                TableTunnel.DownloadSession session = 
MaxcomputeUtil.getDownloadSession(pluginConfig);
+                TunnelRecordReader recordReader = 
session.openRecordReader(source.getSplitId(), source.getRowNum());
+                log.info("open record reader success");
+                Record record;
+                while ((record = recordReader.read()) != null) {
+                    SeaTunnelRow seaTunnelRow = 
MaxcomputeTypeMapper.getSeaTunnelRowData(record, seaTunnelRowType);
+                    output.collect(seaTunnelRow);
+                }
+                recordReader.close();
+            } catch (Exception e) {
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+            }
+        });
+        if (this.noMoreSplit && 
Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded Maxcompute source");
+            context.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public List<MaxcomputeSourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<MaxcomputeSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        this.noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
new file mode 100644
index 000000000..4ab73c335
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.Getter;
+
+public class MaxcomputeSourceSplit implements SourceSplit {
+    @Getter
+    private int splitId;
+    @Getter
+    private long rowNum;
+
+    public MaxcomputeSourceSplit(int splitId, long rowNum) {
+        this.splitId = splitId;
+        this.rowNum = rowNum;
+    }
+
+    @Override
+    public String splitId() {
+        return String.valueOf(this.splitId);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
new file mode 100644
index 000000000..ad2363f3d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class MaxcomputeSourceSplitEnumerator implements 
SourceSplitEnumerator<MaxcomputeSourceSplit, MaxcomputeSourceState> {
+    private final Context<MaxcomputeSourceSplit> enumeratorContext;
+    private final Map<Integer, Set<MaxcomputeSourceSplit>> pendingSplits;
+    private Set<MaxcomputeSourceSplit> assignedSplits;
+    private Config pluginConfig;
+
+    public 
MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit>
 enumeratorContext, Config pluginConfig) {
+        this.enumeratorContext = enumeratorContext;
+        this.pluginConfig = pluginConfig;
+        this.pendingSplits = new HashMap<>();
+        this.assignedSplits = new HashSet<>();
+    }
+
+    public 
MaxcomputeSourceSplitEnumerator(SourceSplitEnumerator.Context<MaxcomputeSourceSplit>
 enumeratorContext, Config pluginConfig,
+                                           MaxcomputeSourceState sourceState) {
+        this(enumeratorContext, pluginConfig);
+        this.assignedSplits = sourceState.getAssignedSplit();
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void addSplitsBack(List<MaxcomputeSourceSplit> splits, int 
subtaskId) {
+        addSplitChangeToPendingAssignments(splits);
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplits.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+    }
+
+    @Override
+    public MaxcomputeSourceState snapshotState(long checkpointId) {
+        return new MaxcomputeSourceState(assignedSplits);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+    }
+
+    private void discoverySplits() throws TunnelException {
+        TableTunnel.DownloadSession session = 
MaxcomputeUtil.getDownloadSession(this.pluginConfig);
+        long recordCount = session.getRecordCount();
+        int numReaders = enumeratorContext.currentParallelism();
+        int splitRowNum = (int) Math.ceil((double) recordCount / numReaders);
+        int splitRow = SPLIT_ROW.defaultValue();
+        if (this.pluginConfig.hasPath(SPLIT_ROW.key())) {
+            splitRow = this.pluginConfig.getInt(SPLIT_ROW.key());
+        }
+        Set<MaxcomputeSourceSplit> allSplit = new HashSet<>();
+        for (int i = 0; i < numReaders; i++) {
+            int readerStart = i * splitRowNum;
+            int readerEnd = (int) Math.min((i + 1) * splitRowNum, recordCount);
+            for (int num = readerStart; num < readerEnd; num += splitRow) {
+                allSplit.add(new MaxcomputeSourceSplit(num, Math.min(splitRow, 
readerEnd - num)));
+            }
+        }
+        assignedSplits.forEach(allSplit::remove);
+        addSplitChangeToPendingAssignments(allSplit);
+        log.debug("Assigned {} to {} readers.", allSplit, numReaders);
+        log.info("Calculated splits successfully, the size of splits is {}.", 
allSplit.size());
+    }
+
+    private void 
addSplitChangeToPendingAssignments(Collection<MaxcomputeSourceSplit> newSplits) 
{
+        for (MaxcomputeSourceSplit split : newSplits) {
+            int ownerReader = split.getSplitId() % 
enumeratorContext.currentParallelism();
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+    }
+
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<MaxcomputeSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && 
!pendingAssignmentForReader.isEmpty()) {
+                // Mark pending splits as already assigned
+                assignedSplits.addAll(pendingAssignmentForReader);
+                // Assign pending splits to reader
+                log.info("Assigning splits to readers {} {}", pendingReader, 
pendingAssignmentForReader);
+                enumeratorContext.assignSplit(pendingReader, new 
ArrayList<>(pendingAssignmentForReader));
+            }
+            enumeratorContext.signalNoMoreSplits(pendingReader);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
new file mode 100644
index 000000000..1abcb000c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class MaxcomputeSourceState implements Serializable {
+    private Set<MaxcomputeSourceSplit> assignedSplit;
+
+    public MaxcomputeSourceState(Set<MaxcomputeSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<MaxcomputeSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
new file mode 100644
index 000000000..fa6382c4f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import static com.aliyun.odps.OdpsType.ARRAY;
+import static com.aliyun.odps.OdpsType.BIGINT;
+import static com.aliyun.odps.OdpsType.BINARY;
+import static com.aliyun.odps.OdpsType.BOOLEAN;
+import static com.aliyun.odps.OdpsType.DATE;
+import static com.aliyun.odps.OdpsType.DECIMAL;
+import static com.aliyun.odps.OdpsType.DOUBLE;
+import static com.aliyun.odps.OdpsType.FLOAT;
+import static com.aliyun.odps.OdpsType.INT;
+import static com.aliyun.odps.OdpsType.MAP;
+import static com.aliyun.odps.OdpsType.SMALLINT;
+import static com.aliyun.odps.OdpsType.STRING;
+import static com.aliyun.odps.OdpsType.TIMESTAMP;
+import static com.aliyun.odps.OdpsType.TINYINT;
+import static com.aliyun.odps.OdpsType.VOID;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+@Slf4j
+public class MaxcomputeTypeMapper implements Serializable {
+
+    private static SeaTunnelDataType<?> maxcomputeType2SeaTunnelType(TypeInfo 
typeInfo) {
+        switch (typeInfo.getOdpsType()) {
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case DECIMAL:
+                return mappingDecimalType((DecimalTypeInfo) typeInfo);
+            case MAP:
+                return mappingMapType((MapTypeInfo) typeInfo);
+            case ARRAY:
+                return mappingListType((ArrayTypeInfo) typeInfo);
+            case VOID:
+                return BasicType.VOID_TYPE;
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return BasicType.INT_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case STRUCT:
+                return mappingStructType((StructTypeInfo) typeInfo);
+            case INTERVAL_DAY_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case INTERVAL_YEAR_MONTH:
+            default:
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format(
+                    "Doesn't support Maxcompute type '%s' .",
+                    typeInfo.getTypeName()));
+        }
+    }
+
+    private static DecimalType mappingDecimalType(DecimalTypeInfo 
decimalTypeInfo) {
+        return new DecimalType(decimalTypeInfo.getPrecision(), 
decimalTypeInfo.getScale());
+    }
+
+    private static MapType mappingMapType(MapTypeInfo mapTypeInfo) {
+        return new 
MapType(maxcomputeType2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()), 
maxcomputeType2SeaTunnelType(mapTypeInfo.getValueTypeInfo()));
+    }
+
+    private static ArrayType mappingListType(ArrayTypeInfo arrayTypeInfo) {
+        switch (arrayTypeInfo.getOdpsType()) {
+            case BOOLEAN:
+                return ArrayType.BOOLEAN_ARRAY_TYPE;
+            case INT:
+                return ArrayType.INT_ARRAY_TYPE;
+            case BIGINT:
+                return ArrayType.LONG_ARRAY_TYPE;
+            case FLOAT:
+                return ArrayType.FLOAT_ARRAY_TYPE;
+            case DOUBLE:
+                return ArrayType.DOUBLE_ARRAY_TYPE;
+            case STRING:
+                return ArrayType.STRING_ARRAY_TYPE;
+            default:
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format(
+                    "Doesn't support Maxcompute type '%s' .",
+                    arrayTypeInfo.getTypeName()));
+        }
+    }
+
+    private static SeaTunnelRowType mappingStructType(StructTypeInfo 
structType) {
+        List<TypeInfo> fields = structType.getFieldTypeInfos();
+        List<String> fieldNames = new ArrayList<>(fields.size());
+        List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size());
+        for (TypeInfo field : fields) {
+            fieldNames.add(field.getTypeName());
+            fieldTypes.add(maxcomputeType2SeaTunnelType(field));
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new String[0]),
+            fieldTypes.toArray(new SeaTunnelDataType[0]));
+    }
+
+    private static OdpsType seaTunnelType2MaxcomputeType(SeaTunnelDataType<?> 
seaTunnelDataType) {
+        switch (seaTunnelDataType.getSqlType()) {
+            case ARRAY:
+                return ARRAY;
+            case MAP:
+                return MAP;
+            case STRING:
+                return STRING;
+            case BOOLEAN:
+                return BOOLEAN;
+            case TINYINT:
+                return TINYINT;
+            case SMALLINT:
+                return SMALLINT;
+            case INT:
+                return INT;
+            case BIGINT:
+                return BIGINT;
+            case FLOAT:
+                return FLOAT;
+            case DOUBLE:
+                return DOUBLE;
+            case DECIMAL:
+                return DECIMAL;
+            case BYTES:
+                return BINARY;
+            case DATE:
+                return DATE;
+            case TIMESTAMP:
+                return TIMESTAMP;
+            case NULL:
+                return VOID;
+            case TIME:
+            default:
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format(
+                    "Doesn't support SeaTunnelDataType type '%s' .",
+                    seaTunnelDataType.getSqlType()));
+        }
+    }
+
+    public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) {
+        Table table = MaxcomputeUtil.getTable(pluginConfig);
+        TableSchema tableSchema = table.getSchema();
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+            for (int i = 0; i < tableSchema.getColumns().size(); i++) {
+                fieldNames.add(tableSchema.getColumns().get(i).getName());
+                TypeInfo maxcomputeTypeInfo = 
tableSchema.getColumns().get(i).getTypeInfo();
+                SeaTunnelDataType<?> seaTunnelDataType = 
maxcomputeType2SeaTunnelType(maxcomputeTypeInfo);
+                seaTunnelDataTypes.add(seaTunnelDataType);
+            }
+        } catch (Exception e) {
+            throw new 
MaxcomputeConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, e);
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new 
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+
+    public static TableSchema seaTunnelRowType2TableSchema(SeaTunnelRowType 
seaTunnelRowType) {
+        TableSchema tableSchema = new TableSchema();
+        for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+            OdpsType odpsType = 
seaTunnelType2MaxcomputeType(seaTunnelRowType.getFieldType(i));
+            Column column = new Column(seaTunnelRowType.getFieldName(i), 
odpsType);
+            tableSchema.addColumn(column);
+        }
+        return tableSchema;
+    }
+
+    private static Object resolveObject(Object field, SeaTunnelDataType<?> 
fieldType) {
+        if (field == null) {
+            return null;
+        }
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
+                ArrayList<Object> origArray = new ArrayList<>();
+                java.util.Arrays.stream(((Record) 
field).getColumns()).iterator().forEachRemaining(origArray::add);
+                SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) 
fieldType).getElementType();
+                switch (elementType.getSqlType()) {
+                    case STRING:
+                        return origArray.toArray(new String[0]);
+                    case BOOLEAN:
+                        return origArray.toArray(new Boolean[0]);
+                    case INT:
+                        return origArray.toArray(new Integer[0]);
+                    case BIGINT:
+                        return origArray.toArray(new Long[0]);
+                    case FLOAT:
+                        return origArray.toArray(new Float[0]);
+                    case DOUBLE:
+                        return origArray.toArray(new Double[0]);
+                    default:
+                        String errorMsg = String.format("SeaTunnel array type 
not support this type [%s] now", fieldType.getSqlType());
+                        throw new 
MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel 
not support this data type now");
+                }
+            case MAP:
+                HashMap<Object, Object> dataMap = new HashMap<>();
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) 
fieldType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) 
fieldType).getValueType();
+                HashMap<Object, Object> origDataMap = (HashMap<Object, 
Object>) field;
+                origDataMap.forEach((key, value) -> 
dataMap.put(resolveObject(key, keyType), resolveObject(value, valueType)));
+                return dataMap;
+            case BOOLEAN:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+            case DATE:
+                return field;
+            case STRING:
+                return field.toString();
+            case TINYINT:
+                return Byte.parseByte(field.toString());
+            case SMALLINT:
+                return Short.parseShort(field.toString());
+            case NULL:
+                return null;
+            case BYTES:
+                ByteBuffer buffer = (ByteBuffer) field;
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes, 0, bytes.length);
+                return bytes;
+            case TIMESTAMP:
+                Instant instant = Instant.ofEpochMilli((long) field);
+                return LocalDateTime.ofInstant(instant, ZoneId.of("+8"));
+            default:
+                // do nothing
+                // never got in there
+                throw new 
MaxcomputeConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel 
not support this data type now");
+        }
+    }
+
+    public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType 
typeInfo) throws SQLException {
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+        for (int i = 0; i < rs.getColumns().length; i++) {
+            fields.add(resolveObject(rs.get(i), seaTunnelDataTypes[i]));
+        }
+        return new SeaTunnelRow(fields.toArray());
+    }
+
+    public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, 
SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = 
seaTunnelRowType2TableSchema(seaTunnelRowType);
+        ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
+        for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
+            arrayRecord.set(i, resolveObject(seaTunnelRow.getField(i), 
seaTunnelRowType.getFieldType(i)));
+        }
+        return arrayRecord;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
new file mode 100644
index 000000000..4bf37b95b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MaxcomputeUtil {
+    public static Table getTable(Config pluginConfig) {
+        Odps odps = getOdps(pluginConfig);
+        Table table = 
odps.tables().get(pluginConfig.getString(TABLE_NAME.key()));
+        return table;
+    }
+
+    public static TableTunnel getTableTunnel(Config pluginConfig) {
+        Odps odps = getOdps(pluginConfig);
+        TableTunnel tunnel = new TableTunnel(odps);
+        return tunnel;
+    }
+
+    public static Odps getOdps(Config pluginConfig) {
+        Account account = new 
AliyunAccount(pluginConfig.getString(ACCESS_ID.key()), 
pluginConfig.getString(ACCESS_KEY.key()));
+        Odps odps = new Odps(account);
+        odps.setEndpoint(pluginConfig.getString(ENDPOINT.key()));
+        odps.setDefaultProject(pluginConfig.getString(PROJECT.key()));
+        return odps;
+    }
+
+    public static TableTunnel.DownloadSession getDownloadSession(Config 
pluginConfig) {
+        TableTunnel tunnel = getTableTunnel(pluginConfig);
+        TableTunnel.DownloadSession session;
+        try {
+            if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                PartitionSpec partitionSpec = new 
PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+                session = 
tunnel.createDownloadSession(pluginConfig.getString(PROJECT.key()), 
pluginConfig.getString(TABLE_NAME.key()), partitionSpec);
+            } else {
+                session = 
tunnel.createDownloadSession(pluginConfig.getString(PROJECT.key()), 
pluginConfig.getString(TABLE_NAME.key()));
+            }
+        } catch (Exception e) {
+            throw new 
MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+        }
+        return session;
+    }
+
+    public static void initTableOrPartition(Config pluginConfig) {
+        Boolean overwrite = OVERWRITE.defaultValue();
+        if (pluginConfig.hasPath(OVERWRITE.key())) {
+            overwrite = pluginConfig.getBoolean(OVERWRITE.key());
+        }
+        try {
+            Table table = MaxcomputeUtil.getTable(pluginConfig);
+            if (pluginConfig.hasPath(PARTITION_SPEC.key())) {
+                PartitionSpec partitionSpec = new 
PartitionSpec(pluginConfig.getString(PARTITION_SPEC.key()));
+                if (overwrite) {
+                    try {
+                        table.deletePartition(partitionSpec, true);
+                    } catch (NullPointerException e) {
+                        log.debug("NullPointerException when delete table 
partition");
+                    }
+                }
+                table.createPartition(partitionSpec, true);
+            } else {
+                if (overwrite) {
+                    try {
+                        table.truncate();
+                    } catch (NullPointerException e) {
+                        log.debug("NullPointerException when truncate table");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new 
MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
new file mode 100644
index 000000000..403876619
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/resources/maxcompute_to_maxcompute.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  job.mode = "STREAMING"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  MaxcomputeSource {
+    accessId="<your access id>"
+    accesskey="<your access Key>"
+    endpoint="<http://service.odps.aliyun.com/api>"
+    project="<your project>"
+    table_name="<your table name>"
+    #partition_spec="<your partition spec>"
+    #split_row = 10000
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+  sql {
+    source_table_name = "fake"
+    sql = "select * from fake"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+  MaxcomputeSink {
+    accessId="<your access id>"
+    accesskey="<your access Key>"
+    endpoint="<http://service.odps.aliyun.com/api>"
+    project="<your project>"
+    result_table_name="<your table name>"
+    #partition_spec="<your partition spec>"
+    #overwrite = false
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
new file mode 100644
index 000000000..da14d0680
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/BasicTypeToOdpsTypeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+
+public class BasicTypeToOdpsTypeTest {
+
+    private static void testType(String fieldName, SeaTunnelDataType<?> 
seaTunnelDataType, OdpsType odpsType, Object object) throws SQLException {
+        SeaTunnelRowType typeInfo = new SeaTunnelRowType(new String[]{
+            fieldName
+        }, new SeaTunnelDataType<?>[]{
+            seaTunnelDataType
+        });
+
+        ArrayRecord record = new ArrayRecord(new Column[]{
+            new Column(fieldName, odpsType)
+        });
+        record.set(fieldName, object);
+
+        SeaTunnelRow seaTunnelRow = 
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
+        Record tRecord = 
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, typeInfo);
+
+        for (int i = 0; i < tRecord.getColumns().length; i++) {
+            Assertions.assertEquals(record.get(i), tRecord.get(i));
+        }
+    }
+
+    @SneakyThrows
+    @Test
+    void testSTRING_TYPE_2_STRING() {
+        testType("STRING_TYPE_2_STRING", BasicType.STRING_TYPE, 
OdpsType.STRING, "hello");
+    }
+
+    @SneakyThrows
+    @Test
+    void testBOOLEAN_TYPE_2_BOOLEAN() {
+        testType("BOOLEAN_TYPE_2_BOOLEAN", BasicType.BOOLEAN_TYPE, 
OdpsType.BOOLEAN, Boolean.TRUE);
+    }
+
+    @SneakyThrows
+    @Test
+    void testSHORT_TYPE_2_SMALLINT() {
+        testType("SHORT_TYPE_2_SMALLINT", BasicType.SHORT_TYPE, 
OdpsType.SMALLINT, Short.MAX_VALUE);
+    }
+
+    @SneakyThrows
+    @Test
+    void testLONG_TYPE_2_BIGINT() {
+        testType("LONG_TYPE_2_BIGINT", BasicType.LONG_TYPE, OdpsType.BIGINT, 
Long.MAX_VALUE);
+    }
+
+    @SneakyThrows
+    @Test
+    void testFLOAT_TYPE_2_FLOAT_TYPE() {
+        testType("FLOAT_TYPE_2_FLOAT_TYPE", BasicType.FLOAT_TYPE, 
OdpsType.FLOAT, Float.MAX_VALUE);
+    }
+
+    @SneakyThrows
+    @Test
+    void testDOUBLE_TYPE_2_DOUBLE() {
+        testType("DOUBLE_TYPE_2_DOUBLE", BasicType.DOUBLE_TYPE, 
OdpsType.DOUBLE, Double.MAX_VALUE);
+    }
+
+    @SneakyThrows
+    @Test
+    void testVOID_TYPE_2_VOID() {
+        testType("VOID_TYPE_2_VOID", BasicType.VOID_TYPE, OdpsType.VOID, null);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
new file mode 100644
index 000000000..53627c2f6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/MaxcomputeSourceFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MaxcomputeSourceFactoryTest {
+    @Test
+    void optionRule() {
+        Assertions.assertNotNull((new MaxcomputeSourceFactory()).optionRule());
+        Assertions.assertNotNull((new MaxcomputeSinkFactory()).optionRule());
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 7a7a7d3af..3a0ba4cd4 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -65,6 +65,7 @@
         <module>connector-slack</module>
         <module>connector-rabbitmq</module>
         <module>connector-openmldb</module>
+        <module>connector-maxcompute</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 5e8134f9d..c588db021 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -407,6 +407,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-maxcompute</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-cdc-mysql</artifactId>
@@ -575,4 +581,4 @@
             </build>
         </profile>
     </profiles>
-</project>
+</project>
\ No newline at end of file

Reply via email to