This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a73bb3e71 [Feature][Connector-V2][JDBC] support sqlite Source & Sink 
(#3089)
a73bb3e71 is described below

commit a73bb3e714025be9f287ef2d8816b63914d33554
Author: nuts.jian <[email protected]>
AuthorDate: Thu Nov 24 16:31:50 2022 +0800

    [Feature][Connector-V2][JDBC] support sqlite Source & Sink (#3089)
---
 docs/en/connector-v2/sink/Jdbc.md                  |   2 +
 docs/en/connector-v2/source/Jdbc.md                |   2 +
 pom.xml                                            |   6 -
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  14 +-
 .../seatunnel/jdbc/config/JdbcConfig.java          |   1 +
 .../internal/dialect/sqlite/SqliteDialect.java     |  62 ++++++
 .../dialect/sqlite/SqliteDialectFactory.java       |  40 ++++
 .../dialect/sqlite/SqliteJdbcRowConverter.java     |  29 +++
 .../internal/dialect/sqlite/SqliteTypeMapper.java  | 172 +++++++++++++++++
 .../internal/options/JdbcConnectionOptions.java    |   1 +
 .../connector-jdbc-flink-e2e/pom.xml               |   5 +
 .../seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java  | 180 +++++++++++++++++
 .../test/resources/jdbc/init_sql/sqlite_init.conf  | 212 +++++++++++++++++++++
 .../jdbc/jdbc_sqlite_source_and_sink.conf          |  51 +++++
 .../jdbc/jdbc_sqlite_source_and_sink_datatype.conf | 147 ++++++++++++++
 .../connector-jdbc-spark-e2e/pom.xml               |   5 +
 .../seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java  | 179 +++++++++++++++++
 .../test/resources/jdbc/init_sql/sqlite_init.conf  | 212 +++++++++++++++++++++
 .../jdbc/jdbc_sqlite_source_and_sink.conf          |  57 ++++++
 .../jdbc/jdbc_sqlite_source_and_sink_datatype.conf | 147 ++++++++++++++
 20 files changed, 1516 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index f145cc220..af586d5ec 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -128,6 +128,7 @@ there are some reference value for params above.
 | Phoenix    | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /          
                                        | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                        |
 | SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                    | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                            |
 | Oracle     | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                            |
+| sqlite     | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                | /          
                                        | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                            |
 | GBase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
 | StarRocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
@@ -199,4 +200,5 @@ sink {
 
 ### next version
 
+- [Feature] Support Sqlite JDBC Sink 
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
 - [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 20bb0bb1e..e6c1c08a9 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -104,6 +104,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver        | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF     | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                        |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver        | 
jdbc:sqlserver://localhost:1433                                        | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                            |
 | oracle     | oracle.jdbc.OracleDriver                            | 
jdbc:oracle:thin:@localhost:1521/xepdb1                                | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                            |
+| sqlite     | org.sqlite.JDBC                                     | 
jdbc:sqlite:test.db                                                    | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                            |
 | gbase8a    | com.gbase.jdbc.Driver                               | 
jdbc:gbase://e2e_gbase8aDb:5258/test                                   | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
 | starrocks  | com.mysql.cj.jdbc.Driver                            | 
jdbc:mysql://localhost:3306/test                                       | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
 | db2        | com.ibm.db2.jcc.DB2Driver                           | 
jdbc:db2://localhost:50000/testdb                                      | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
@@ -156,5 +157,6 @@ parallel:
 ### next version
 
 - [BugFix] Fix jdbc split bug 
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
+- [Feature] Support Sqlite JDBC Source 
([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
 - [Feature] Support Tablestore Source 
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
 - [Feature] Support JDBC Fetch Size Config 
([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
diff --git a/pom.xml b/pom.xml
index 832e199ec..0517ad23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -841,12 +841,6 @@
                     </dependency>
                 </dependencies>
             </plugin>
-
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-            </plugin>
-
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2ef11b89a..b7f40e417 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -36,7 +36,9 @@
         <sqlserver.version>9.2.1.jre8</sqlserver.version>
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
         <oracle.version>12.2.0.1</oracle.version>
+        <sqlite.version>3.39.3.0</sqlite.version>
         <db2.version>db2jcc4</db2.version>
+        <sqlite.version>3.39.3.0</sqlite.version>
         <tablestore.version>5.13.9</tablestore.version>
     </properties>
 
@@ -78,20 +80,24 @@
                 <version>${oracle.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <dependency>
                 <groupId>com.ibm.db2.jcc</groupId>
                 <artifactId>db2jcc</artifactId>
                 <version>${db2.version}</version>
                 <scope>provided</scope>
             </dependency>
-
             <dependency>
                 <groupId>com.aliyun.openservices</groupId>
                 <artifactId>tablestore-jdbc</artifactId>
                 <version>${tablestore.version}</version>
                 <scope>provided</scope>
             </dependency>
-
         </dependencies>
     </dependencyManagement>
 
@@ -129,6 +135,10 @@
             <groupId>com.oracle.database.jdbc</groupId>
             <artifactId>ojdbc8</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.ibm.db2.jcc</groupId>
             <artifactId>db2jcc</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
index 12eeac7aa..3db3bdc5e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -111,6 +111,7 @@ public class JdbcConfig implements Serializable {
                 jdbcOptions.transactionTimeoutSec = 
config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC.key());
             }
         }
+
         return jdbcOptions;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
new file mode 100644
index 000000000..bf361dc8c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class SqliteDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Sqlite";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new SqliteJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new SqliteTypeMapper();
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(String tableName, String[] 
fieldNames, String[] uniqueKeyFields) {
+        String updateClause = Arrays.stream(fieldNames)
+                .map(fieldName -> quoteIdentifier(fieldName) + "=VALUES(" + 
quoteIdentifier(fieldName) + ")")
+                .collect(Collectors.joining(", "));
+
+        String conflictFields = Arrays.stream(uniqueKeyFields)
+                .map(this::quoteIdentifier)
+                .collect(Collectors.joining(","));
+
+        String upsertSQL = getInsertIntoStatement(tableName, fieldNames) + " 
ON CONFLICT(" + conflictFields + ") DO UPDATE SET " + updateClause;
+        return Optional.of(upsertSQL);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
new file mode 100644
index 000000000..e9639ec0c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link SqliteDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class SqliteDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:sqlite:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new SqliteDialect();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
new file mode 100644
index 000000000..1e56c5d43
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+public class SqliteJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "Sqlite";
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
new file mode 100644
index 000000000..87681aa7e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqliteTypeMapper implements JdbcDialectTypeMapper {
+
+    // ============================data types=====================
+
+    private static final String SQLITE_UNKNOWN = "UNKNOWN";
+    private static final String SQLITE_BIT = "BIT";
+    private static final String SQLITE_BOOLEAN = "BOOLEAN";
+
+    // -------------------------integer----------------------------
+    private static final String SQLITE_TINYINT = "TINYINT";
+    private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String SQLITE_SMALLINT = "SMALLINT";
+    private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SQLITE_MEDIUMINT = "MEDIUMINT";
+    private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT 
UNSIGNED";
+    private static final String SQLITE_INT = "INT";
+    private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String SQLITE_INTEGER = "INTEGER";
+    private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String SQLITE_BIGINT = "BIGINT";
+    private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String SQLITE_DECIMAL = "DECIMAL";
+    private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String SQLITE_FLOAT = "FLOAT";
+    private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String SQLITE_DOUBLE = "DOUBLE";
+    private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String SQLITE_NUMERIC = "NUMERIC";
+    private static final String SQLITE_REAL = "REAL";
+
+    // -------------------------text----------------------------
+    private static final String SQLITE_CHAR = "CHAR";
+    private static final String SQLITE_CHARACTER = "CHARACTER";
+    private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER";
+    private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER";
+    private static final String SQLITE_NCHAR = "NCHAR";
+    private static final String SQLITE_VARCHAR = "VARCHAR";
+    private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR";
+    private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR";
+    private static final String SQLITE_NVARCHAR = "NVARCHAR";
+    private static final String SQLITE_TINYTEXT = "TINYTEXT";
+    private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String SQLITE_TEXT = "TEXT";
+    private static final String SQLITE_LONGTEXT = "LONGTEXT";
+    private static final String SQLITE_JSON = "JSON";
+    private static final String SQLITE_CLOB = "CLOB";
+
+    // ------------------------------time(text)-------------------------
+    private static final String SQLITE_DATE = "DATE";
+    private static final String SQLITE_DATETIME = "DATETIME";
+    private static final String SQLITE_TIME = "TIME";
+    private static final String SQLITE_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String SQLITE_TINYBLOB = "TINYBLOB";
+    private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String SQLITE_BLOB = "BLOB";
+    private static final String SQLITE_LONGBLOB = "LONGBLOB";
+    private static final String SQLITE_BINARY = "BINARY";
+    private static final String SQLITE_VARBINARY = "VARBINARY";
+    private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        String columnTypeName = 
metadata.getColumnTypeName(colIndex).toUpperCase().trim();
+        switch (columnTypeName) {
+            case SQLITE_BIT:
+            case SQLITE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case SQLITE_TINYINT:
+            case SQLITE_TINYINT_UNSIGNED:
+            case SQLITE_SMALLINT:
+            case SQLITE_SMALLINT_UNSIGNED:
+                return BasicType.SHORT_TYPE;
+            case SQLITE_MEDIUMINT:
+            case SQLITE_MEDIUMINT_UNSIGNED:
+            case SQLITE_INT:
+            case SQLITE_INTEGER:
+                return BasicType.INT_TYPE;
+            case SQLITE_INT_UNSIGNED:
+            case SQLITE_INTEGER_UNSIGNED:
+            case SQLITE_BIGINT:
+            case SQLITE_BIGINT_UNSIGNED:
+            case SQLITE_NUMERIC:
+                return BasicType.LONG_TYPE;
+            case SQLITE_DECIMAL:
+            case SQLITE_DECIMAL_UNSIGNED:
+            case SQLITE_DOUBLE:
+            case SQLITE_DOUBLE_PRECISION:
+            case SQLITE_REAL:
+                return BasicType.DOUBLE_TYPE;
+            case SQLITE_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case SQLITE_FLOAT_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", 
SQLITE_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case SQLITE_DOUBLE_UNSIGNED:
+                log.warn("{} will probably cause value overflow.", 
SQLITE_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case SQLITE_CHARACTER:
+            case SQLITE_VARYING_CHARACTER:
+            case SQLITE_NATIVE_CHARACTER:
+            case SQLITE_NVARCHAR:
+            case SQLITE_NCHAR:
+            case SQLITE_LONGNVARCHAR:
+            case SQLITE_LONGVARCHAR:
+            case SQLITE_CLOB:
+            case SQLITE_CHAR:
+            case SQLITE_TINYTEXT:
+            case SQLITE_MEDIUMTEXT:
+            case SQLITE_TEXT:
+            case SQLITE_VARCHAR:
+            case SQLITE_JSON:
+            case SQLITE_LONGTEXT:
+
+            case SQLITE_DATE:
+            case SQLITE_TIME:
+            case SQLITE_DATETIME:
+            case SQLITE_TIMESTAMP:
+                return BasicType.STRING_TYPE;
+
+            case SQLITE_TINYBLOB:
+            case SQLITE_MEDIUMBLOB:
+            case SQLITE_BLOB:
+            case SQLITE_LONGBLOB:
+            case SQLITE_VARBINARY:
+            case SQLITE_BINARY:
+            case SQLITE_LONGVARBINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            //Doesn't support yet
+            case SQLITE_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support SQLite type '%s' on column 
'%s'  yet.",
+                                columnTypeName, jdbcColumnName));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
index 6487c7140..c10cbf96e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
@@ -39,6 +39,7 @@ public class JdbcConnectionOptions
     public int maxRetries = DEFAULT_MAX_RETRIES;
     public String username;
     public String password;
+    public String query;
 
     public boolean autoCommit = JdbcConfig.AUTO_COMMIT.defaultValue();
 
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index f6220d9b0..c2e7bad81 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -108,6 +108,11 @@
             <artifactId>mssql-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
new file mode 100644
index 000000000..4206134c2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = 
generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = 
"https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";;
+
+    private void initTestDb() throws Exception {
+        URI resource = 
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", 
"sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", 
"check_type_sink_table_sql");
+        tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString();
+        Connection connection = null;
+        try {
+            Class.forName("org.sqlite.JDBC");
+            connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + 
"/test.db", "", "");
+            Statement statement = connection.createStatement();
+            statement.execute("drop table if exists source");
+            statement.execute("drop table if exists sink");
+            statement.execute("drop table if exists type_source_table");
+            statement.execute("drop table if exists type_sink_table");
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            
statement.execute(config.getString("insert_type_source_table_sql"));
+
+            String sql = "insert into source(age, name) values(?, ?)";
+            connection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+                for (List<Object> row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            connection.commit();
+        } catch (Exception e) {
+            if (null != connection) {
+                try {
+                    connection.rollback();
+                } catch (SQLException ex) {
+                    ex.printStackTrace();
+                }
+            }
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        
taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new 
File(tmpdir + "/test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = 
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", 
"sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", 
"check_type_sink_table_sql");
+
+        try (Connection connection = 
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = 
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+            resultSet.next();
+            Assertions.assertEquals(resultSet.getInt(1), 2);
+        }
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, 
InterruptedException, SQLException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        
taskManager.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), new 
File(tmpdir + "/test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List<Object>> result = new ArrayList<>();
+        try (Connection connection = 
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+            Assertions.assertIterableEquals(TEST_DATASET, result);
+        }
+    }
+
+    @AfterEach
+    public void closeResource() throws SQLException, IOException {
+        // remove the temp test.db file
+        Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+    }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+
+        Container.ExecResult mkdirCommands1 = 
jobManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite");
+        Assertions.assertEquals(0, mkdirCommands1.getExitCode());
+        Container.ExecResult mkdirCommands2 = 
taskManager.execInContainer("bash", "-c", "mkdir -p " + "/sqlite");
+        Assertions.assertEquals(0, mkdirCommands2.getExitCode());
+        jobManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite");
+        taskManager.execInContainer("bash", "-c", "chmod 777 -R /sqlite");
+        try {
+            initTestDb();
+            // copy db file to container, dist file path in container is 
/tmp/seatunnel/data/test.db
+            jobManager.copyFileToContainer(MountableFile.forHostPath(tmpdir + 
"/test.db"), "/sqlite/test.db");
+            taskManager.copyFileToContainer(MountableFile.forHostPath(tmpdir + 
"/test.db"), "/sqlite/test.db");
+            jobManager.execInContainer("bash", "-c", "chmod 777 
/sqlite/test.db");
+            taskManager.execInContainer("bash", "-c", "chmod 777 
/sqlite/test.db");
+        } catch (Exception e) {
+            log.error("init test.db and copy test.db to container error", e);
+            Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
new file mode 100644
index 000000000..aeef3d5fa
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY 
AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, 
`name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `timestamp`
+)
+VALUES
+       (
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               'a',
+               'a',
+               'a',
+               'a',
+               'a',
+               '{}',
+               '2022-09-22',
+               '2022-09-22 15:07:44',
+       '2022-09-22 15:07:55'
+       )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+       count( 1 )
+FROM
+       ( SELECT * FROM type_source_table UNION ALL SELECT * FROM 
type_sink_table ) a
+GROUP BY
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `timestamp`
+"""
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
new file mode 100644
index 000000000..a3273d98d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    Jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/sqlite/test.db"
+        user = ""
+        password = ""
+        query = "select age, name from source"
+    }
+}
+
+transform {
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    Jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/sqlite/test.db"
+        user = ""
+        password = ""
+        query = "insert into sink(age, name) values(?, ?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
new file mode 100644
index 000000000..68525fc8c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:sqlite:/sqlite/test.db"
+        driver = org.sqlite.JDBC
+        user = ""
+        password = ""
+        query = """ SELECT
+                `binary`,
+                `blob`,
+                `long_varbinary`,
+                `longblob`,
+                `tinyblob`,
+                `varbinary`,
+                `tinyint`,
+                `tinyint_unsigned`,
+                `smallint`,
+                `smallint_unsigned`,
+                `mediumint`,
+                `mediumint_unsigned`,
+                `int`,
+                `int_unsigned`,
+                `integer`,
+                `integer_unsigned`,
+                `bigint`,
+                `bigint_unsigned`,
+                `numeric`,
+                `decimal`,
+                `float`,
+                `double`,
+                `double_precision`,
+                `longtext`,
+                `mediumtext`,
+                `text`,
+                `tinytext`,
+                `varchar`,
+                `json`,
+                `date`,
+                `datetime`,
+                `timestamp`
+
+                FROM `type_source_table`
+                """
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:sqlite:/sqlite/test.db"
+        driver = org.sqlite.JDBC
+        user = ""
+        password = ""
+        query = """ INSERT INTO `type_sink_table` (
+                       `binary`,
+                       `blob`,
+                       `long_varbinary`,
+                       `longblob`,
+                       `tinyblob`,
+                       `varbinary`,
+                       `tinyint`,
+                       `tinyint_unsigned`,
+                       `smallint`,
+                       `smallint_unsigned`,
+                       `mediumint`,
+                       `mediumint_unsigned`,
+                       `int`,
+                       `int_unsigned`,
+                       `integer`,
+                       `integer_unsigned`,
+                       `bigint`,
+                       `bigint_unsigned`,
+                       `numeric`,
+                       `decimal`,
+                       `float`,
+                       `double`,
+                       `double_precision`,
+                       `longtext`,
+                       `mediumtext`,
+                       `text`,
+                       `tinytext`,
+                       `varchar`,
+                       `json`,
+                       `date`,
+                       `datetime`,
+                       `timestamp`
+                 )
+                 VALUES
+                       (
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                           ?
+                       )"""
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index ee7629f14..8cd754f05 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -96,6 +96,11 @@
             <groupId>com.microsoft.sqlserver</groupId>
             <artifactId>mssql-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
new file mode 100644
index 000000000..c9d2815dc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = 
generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = 
"https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";;
+
+    private void initTestDb() throws Exception {
+        URI resource = 
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", 
"sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", 
"check_type_sink_table_sql");
+        tmpdir = Paths.get(System.getProperty("java.io.tmpdir")).toString();
+        Connection connection = null;
+        try {
+            Class.forName("org.sqlite.JDBC");
+            connection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + 
"/test.db", "", "");
+            Statement statement = connection.createStatement();
+            statement.execute("drop table if exists source");
+            statement.execute("drop table if exists sink");
+            statement.execute("drop table if exists type_source_table");
+            statement.execute("drop table if exists type_sink_table");
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            
statement.execute(config.getString("insert_type_source_table_sql"));
+
+            String sql = "insert into source(age, name) values(?, ?)";
+            connection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+                for (List<Object> row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            connection.commit();
+        } catch (Exception e) {
+            if (null != connection) {
+                try {
+                    connection.rollback();
+                } catch (SQLException ex) {
+                    ex.printStackTrace();
+                }
+            }
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), 
new File(tmpdir + "/test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = 
Objects.requireNonNull(JdbcSqliteIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", 
"sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", 
"check_type_sink_table_sql");
+
+        try (Connection connection = 
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = 
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+            resultSet.next();
+            Assertions.assertEquals(resultSet.getInt(1), 2);
+        }
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, 
InterruptedException, SQLException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        master.copyFileFromContainer(Paths.get("/sqlite/test.db").toString(), 
new File(tmpdir + "/test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List<Object>> result = new ArrayList<>();
+        try (Connection connection = 
DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "/test.db", "", "")) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+            Assertions.assertIterableEquals(TEST_DATASET, result);
+        }
+    }
+
+    @AfterEach
+    public void closeResource() throws SQLException, IOException {
+        // remove the temp test.db
+        Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+    }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws 
IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", 
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd 
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+
+        Container.ExecResult mkdirCommands = container.execInContainer("bash", 
"-c", "mkdir -p " + "/sqlite");
+        Assertions.assertEquals(0, mkdirCommands.getExitCode());
+
+        Container.ExecResult chmodCommands = container.execInContainer("bash", 
"-c", "chmod 777 -R /sqlite");
+        Assertions.assertEquals(0, chmodCommands.getExitCode());
+
+        try {
+            initTestDb();
+            // copy db file to container, dist file path in container is 
/tmp/seatunnel/data/test.db
+            container.copyFileToContainer(MountableFile.forHostPath(tmpdir + 
"/test.db"), "/sqlite/test.db");
+            container.execInContainer("bash", "-c", "chmod 777 
/sqlite/test.db");
+        } catch (Exception e) {
+            log.error("init test.db and copy test.db to container error", e);
+            Files.deleteIfExists(new File(tmpdir + "/test.db").toPath());
+        }
+    }
+
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
new file mode 100644
index 000000000..aeef3d5fa
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/sqlite_init.conf
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """ CREATE TABLE source ( `id` INTEGER PRIMARY KEY 
AUTOINCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, 
`name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `timestamp`
+)
+VALUES
+       (
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               'a',
+               'a',
+               'a',
+               'a',
+               'a',
+               '{}',
+               '2022-09-22',
+               '2022-09-22 15:07:44',
+       '2022-09-22 15:07:55'
+       )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+       count( 1 )
+FROM
+       ( SELECT * FROM type_source_table UNION ALL SELECT * FROM 
type_sink_table ) a
+GROUP BY
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `timestamp`
+"""
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
new file mode 100644
index 000000000..d67a5677a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/sqlite/test.db"
+        user = ""
+        password = ""
+        query = "select age, name from source"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/sqlite/test.db"
+        user = ""
+        password = ""
+        query = "insert into sink(age, name) values(?, ?)"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
new file mode 100644
index 000000000..68525fc8c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink_datatype.conf
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:sqlite:/sqlite/test.db"
+        driver = org.sqlite.JDBC
+        user = ""
+        password = ""
+        query = """ SELECT
+                `binary`,
+                `blob`,
+                `long_varbinary`,
+                `longblob`,
+                `tinyblob`,
+                `varbinary`,
+                `tinyint`,
+                `tinyint_unsigned`,
+                `smallint`,
+                `smallint_unsigned`,
+                `mediumint`,
+                `mediumint_unsigned`,
+                `int`,
+                `int_unsigned`,
+                `integer`,
+                `integer_unsigned`,
+                `bigint`,
+                `bigint_unsigned`,
+                `numeric`,
+                `decimal`,
+                `float`,
+                `double`,
+                `double_precision`,
+                `longtext`,
+                `mediumtext`,
+                `text`,
+                `tinytext`,
+                `varchar`,
+                `json`,
+                `date`,
+                `datetime`,
+                `timestamp`
+
+                FROM `type_source_table`
+                """
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:sqlite:/sqlite/test.db"
+        driver = org.sqlite.JDBC
+        user = ""
+        password = ""
+        query = """ INSERT INTO `type_sink_table` (
+                       `binary`,
+                       `blob`,
+                       `long_varbinary`,
+                       `longblob`,
+                       `tinyblob`,
+                       `varbinary`,
+                       `tinyint`,
+                       `tinyint_unsigned`,
+                       `smallint`,
+                       `smallint_unsigned`,
+                       `mediumint`,
+                       `mediumint_unsigned`,
+                       `int`,
+                       `int_unsigned`,
+                       `integer`,
+                       `integer_unsigned`,
+                       `bigint`,
+                       `bigint_unsigned`,
+                       `numeric`,
+                       `decimal`,
+                       `float`,
+                       `double`,
+                       `double_precision`,
+                       `longtext`,
+                       `mediumtext`,
+                       `text`,
+                       `tinytext`,
+                       `varchar`,
+                       `json`,
+                       `date`,
+                       `datetime`,
+                       `timestamp`
+                 )
+                 VALUES
+                       (
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                           ?
+                       )"""
+    }
+}
\ No newline at end of file

Reply via email to