This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 786bd159 [FLINK-32714] Add dialect for OceanBase database. This closes #72
786bd159 is described below

commit 786bd15951887a98f3a4962ba95e4e99b7214d7a
Author: He Wang <wanghe...@qq.com>
AuthorDate: Fri Mar 1 16:21:17 2024 +0800

    [FLINK-32714] Add dialect for OceanBase database. This closes #72
---
 docs/content.zh/docs/connectors/table/jdbc.md      |  83 +++++++++++--
 docs/content/docs/connectors/table/jdbc.md         |  70 ++++++++++-
 flink-connector-jdbc/pom.xml                       |   8 ++
 .../flink/connector/jdbc/catalog/JdbcCatalog.java  |  17 ++-
 .../connector/jdbc/catalog/JdbcCatalogUtils.java   |   5 +-
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   |   5 +-
 .../catalog/factory/JdbcCatalogFactoryOptions.java |   3 +
 .../oceanbase/dialect/OceanBaseDialect.java        | 121 +++++++++++++++++++
 .../oceanbase/dialect/OceanBaseDialectFactory.java |  46 +++++++
 .../oceanbase/dialect/OceanBaseRowConverter.java   | 134 +++++++++++++++++++++
 .../connector/jdbc/dialect/JdbcDialectFactory.java |  15 +++
 .../connector/jdbc/dialect/JdbcDialectLoader.java  |   9 +-
 .../options/InternalJdbcConnectionOptions.java     |   9 +-
 .../connector/jdbc/table/JdbcConnectorOptions.java |   6 +
 .../jdbc/table/JdbcDynamicTableFactory.java        |  24 +++-
 ...flink.connector.jdbc.dialect.JdbcDialectFactory |   1 +
 .../oceanbase/OceanBaseMysqlTestBase.java          |  42 +++++++
 .../oceanbase/OceanBaseOracleTestBase.java         |  44 +++++++
 .../oceanbase/dialect/OceanBaseDialectTest.java    |  52 ++++++++
 .../dialect/OceanBaseMysqlDialectTypeTest.java     |  77 ++++++++++++
 .../dialect/OceanBaseOracleDialectTypeTest.java    |  71 +++++++++++
 .../OceanBaseMySqlDynamicTableSinkITCase.java      |  90 ++++++++++++++
 .../OceanBaseMySqlDynamicTableSourceITCase.java    |  74 ++++++++++++
 .../OceanBaseOracleDynamicTableSinkITCase.java     | 121 +++++++++++++++++++
 .../OceanBaseOracleDynamicTableSourceITCase.java   |  91 ++++++++++++++
 .../oceanbase/table/OceanBaseTableRow.java         |  48 ++++++++
 .../catalog/factory/JdbcCatalogFactoryTest.java    |   3 +-
 .../jdbc/dialect/JdbcDialectTypeTest.java          |   4 +-
 .../jdbc/table/JdbcDynamicTableSinkITCase.java     |   9 +-
 .../connector/jdbc/table/JdbcOutputFormatTest.java |  35 ++++++
 .../databases/oceanbase/OceanBaseContainer.java    |  74 ++++++++++++
 .../databases/oceanbase/OceanBaseDatabase.java     |  72 +++++++++++
 .../databases/oceanbase/OceanBaseImages.java       |  27 +++++
 .../databases/oceanbase/OceanBaseMetadata.java     |  85 +++++++++++++
 .../databases/oceanbase/OceanBaseTestDatabase.java |  25 ++++
 35 files changed, 1567 insertions(+), 33 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/jdbc.md 
b/docs/content.zh/docs/connectors/table/jdbc.md
index 20447e16..cf4e6384 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -58,7 +58,7 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "
 | CrateDB    | `io.crate`                 | `crate-jdbc`           | 
[下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/)                       
                                            |
 | Db2        | `com.ibm.db2.jcc`          | `db2jcc`               | 
[下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows)
                           |
 | Trino      | `io.trino`                 | `trino-jdbc`           | 
[下载](https://repo1.maven.org/maven2/io/trino/trino-jdbc/)                       
                                            |
-
+| OceanBase  | `com.oceanbase`            | `oceanbase-client`     | 
[下载](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/)            
                                            |
 
 当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref 
"docs/dev/configuration/overview" >}})了解在集群上执行时如何连接它们。
 
@@ -141,6 +141,13 @@ ON myTopic.key = MyUserTable.id;
       <td>String</td>
       <td>用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。</td>
     </tr>
+    <tr>
+      <td><h5>compatible-mode</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>数据库的兼容模式。</td>
+    </tr>
     <tr>
       <td><h5>username</h5></td>
       <td>可选</td>
@@ -654,7 +661,7 @@ SELECT * FROM `custom_schema.test_table2`;
 
 数据类型映射
 ----------------
-Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB, Derby、Db2、 
SQL Server 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 
Flink 中定义 JDBC 表更加简单。
+Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB, Derby、Db2、 
SQL Server、OceanBase 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 
数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
 
 <table class="table table-bordered">
     <thead>
@@ -666,6 +673,8 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
        <th class="text-left"><a 
href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL
 Server type</a></th>
        <th class="text-left"><a 
href="https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types">Db2</a></th>
        <th class="text-left"><a 
href="https://trino.io/docs/current/language/types.html">Trino type</a></th>
+        <th class="text-left"><a 
href="https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000222199">OceanBase
 MySQL mode type</a></th>
+        <th class="text-left"><a 
href="https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000222012">OceanBase
 Oracle mode type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" 
>}}">Flink SQL type</a></th>
       </tr>
     </thead>
@@ -679,6 +688,8 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td></td>
       <td><code>TINYINT</code></td>
       <td><code>TINYINT</code></td>
+      <td></td>
+      <td><code>TINYINT</code></td>
     </tr>
     <tr>
       <td>
@@ -696,6 +707,11 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
+      <td>
+        <code>SMALLINT</code><br>
+        <code>TINYINT UNSIGNED</code></td>
+      <td></td>
+      <td><code>SMALLINT</code></td>
     </tr>
     <tr>
       <td>
@@ -711,6 +727,12 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         <code>INT</code></td>
       <td><code>INT</code></td>
       <td><code>INTEGER</code></td>
+      <td><code>INTEGER</code></td>
+      <td>
+        <code>INT</code><br>
+        <code>MEDIUMINT</code><br>
+        <code>SMALLINT UNSIGNED</code></td>
+      <td></td>
       <td><code>INT</code></td>
     </tr>
     <tr>
@@ -727,6 +749,10 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>BIGINT</code></td>
       <td></td>
       <td><code>BIGINT</code></td>
+      <td>
+        <code>BIGINT</code><br>
+        <code>INT UNSIGNED</code></td>
+      <td></td>
       <td><code>BIGINT</code></td>
     </tr>
    <tr>
@@ -734,17 +760,12 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td></td>
       <td></td>
       <td></td>
+      <td></td> 
       <td></td>
       <td></td>
-      <td><code>DECIMAL(20, 0)</code></td>
-    </tr>
-    <tr>
-      <td><code>BIGINT</code></td>
-      <td></td>
-      <td><code>BIGINT</code></td>
+      <td><code>BIGINT UNSIGNED</code></td>
       <td></td>
-      <td><code>BIGINT</code></td>
-      <td><code>BIGINT</code></td>
+      <td><code>DECIMAL(20, 0)</code></td>
     </tr>
     <tr>
       <td><code>FLOAT</code></td>
@@ -760,6 +781,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>REAL</code></td>
       <td><code>FLOAT</code></td>
       <td><code>FLOAT</code></td>
+      <td>
+        <code>BINARY_FLOAT</code></td>
+      <td><code>FLOAT</code></td>
     </tr>
     <tr>
       <td>
@@ -775,6 +799,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>FLOAT</code></td>
       <td><code>DOUBLE</code></td>
       <td><code>DOUBLE</code></td>
+      <td><code>DOUBLE</code></td>
+      <td><code>BINARY_DOUBLE</code></td>
+      <td><code>DOUBLE</code></td>
     </tr>
     <tr>
       <td>
@@ -796,6 +823,12 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         <code>DECIMAL(p, s)</code>
       </td>
       <td><code>DECIMAL(p, s)</code></td>
+      <td>
+        <code>NUMERIC(p, s)</code><br>
+        <code>DECIMAL(p, s)</code></td>
+      <td>
+        <code>FLOAT(s)</code><br>
+        <code>NUMBER(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
     </tr>
     <tr>
@@ -807,6 +840,11 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>BOOLEAN</code></td>
       <td><code>BIT</code></td>
       <td><code>BOOLEAN</code></td>
+      <td></td>
+      <td>
+        <code>BOOLEAN</code><br>
+        <code>TINYINT(1)</code></td>
+      <td></td>
       <td><code>BOOLEAN</code></td>
     </tr>
     <tr>
@@ -817,6 +855,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
+      <td><code>DATE</code></td>
+      <td><code>DATE</code></td>
+      <td><code>DATE</code></td>
     </tr>
     <tr>
       <td><code>TIME [(p)]</code></td>
@@ -826,6 +867,8 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td><code>TIME(0)</code></td>
       <td><code>TIME</code></td>
       <td><code>TIME_WITHOUT_TIME_ZONE</code></td>
+      <td><code>TIME [(p)]</code></td>
+      <td><code>DATE</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -839,6 +882,8 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       </td>
       <td><code>TIMESTAMP [(p)]</code></td>
       <td><code>TIMESTAMP_WITHOUT_TIME_ZONE</code></td>
+      <td><code>DATETIME [(p)]</code></td>
+      <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -878,6 +923,15 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         <code>CHAR</code>
         <code>VARCHAR</code>
       </td>
+      <td>
+        <code>CHAR(n)</code><br>
+        <code>VARCHAR(n)</code><br>
+        <code>TEXT</code></td>
+      <td>
+        <code>CHAR(n)</code><br>
+        <code>NCHAR(n)</code><br>
+        <code>VARCHAR2(n)</code><br>
+        <code>CLOB</code></td>
       <td><code>STRING</code></td>
     </tr>
     <tr>
@@ -896,6 +950,13 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       </td>
       <td></td>
       <td><code>VARBINARY</code></td>
+      <td>
+        <code>BINARY</code><br>
+        <code>VARBINARY</code><br>
+        <code>BLOB</code></td>
+      <td>
+        <code>RAW(s)</code><br>
+        <code>BLOB</code></td>
       <td><code>BYTES</code></td>
     </tr>
     <tr>
@@ -906,6 +967,8 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td></td>
       <td></td>
       <td><code>ARRAY</code></td>
+      <td></td>
+      <td></td>
       <td><code>ARRAY</code></td>
     </tr>
     </tbody>
diff --git a/docs/content/docs/connectors/table/jdbc.md 
b/docs/content/docs/connectors/table/jdbc.md
index c1c6936a..056f2ab5 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -55,6 +55,7 @@ A driver dependency is also required to connect to a 
specified database. Here ar
 | CrateDB    | `io.crate`                 | `crate-jdbc`           | 
[Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/)                 
                                                  |
 | Db2        | `com.ibm.db2.jcc`          | `db2jcc`               | 
[Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows)
                           | 
 | Trino      | `io.trino`                 | `trino-jdbc`           | 
[Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/)                 
                                                  |
+| OceanBase  | `com.oceanbase`            | `oceanbase-client`     | 
[Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/)      
                                                  |
 
 
 JDBC connector and drivers are not part of Flink's binary distribution. See 
how to link with them for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
@@ -139,6 +140,14 @@ Connector Options
       <td>String</td>
       <td>The class name of the JDBC driver to use to connect to this URL, if 
not set, it will automatically be derived from the URL.</td>
     </tr>
+    <tr>
+      <td><h5>compatible-mode</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The compatible mode of database.</td>
+    </tr>
     <tr>
       <td><h5>username</h5></td>
       <td>optional</td>
@@ -647,7 +656,7 @@ SELECT * FROM `custom_schema.test_table2`;
 
 Data Type Mapping
 ----------------
-Flink supports connect to several databases which uses dialect like MySQL, 
Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2. The Derby dialect usually 
used for testing purpose. The field data type mappings from relational 
databases data types to Flink SQL data types are listed in the following table, 
the mapping table can help define JDBC table in Flink easily.
+Flink supports connect to several databases which uses dialect like MySQL, 
Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby 
dialect usually used for testing purpose. The field data type mappings from 
relational databases data types to Flink SQL data types are listed in the 
following table, the mapping table can help define JDBC table in Flink easily.
 
 <table class="table table-bordered">
     <thead>
@@ -659,6 +668,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
        <th class="text-left"><a 
href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL
 Server type</a></th>
        <th class="text-left"><a 
href="https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types">Db2</a></th>
        <th class="text-left"><a 
href="https://trino.io/docs/current/language/types.html">Trino type</a></th>
+        <th class="text-left"><a 
href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000001106898">OceanBase
 MySQL mode type</a></th>
+        <th class="text-left"><a 
href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000001107076">OceanBase
 Oracle mode type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" 
>}}">Flink SQL type</a></th>
       </tr>
     </thead>
@@ -672,6 +683,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td></td>
       <td><code>TINYINT</code></td>
       <td><code>TINYINT</code></td>
+      <td></td>
+      <td><code>TINYINT</code></td>
     </tr>
     <tr>
       <td>
@@ -689,6 +702,10 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
+      <td>
+        <code>SMALLINT</code><br>
+        <code>TINYINT UNSIGNED</code></td>
+      <td></td>
       <td><code>SMALLINT</code></td>
     </tr>
     <tr>
@@ -706,6 +723,11 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>INT</code></td>
       <td><code>INTEGER</code></td>
       <td><code>INTEGER</code></td>
+      <td>
+        <code>INT</code><br>
+        <code>MEDIUMINT</code><br>
+        <code>SMALLINT UNSIGNED</code></td>
+      <td></td>
       <td><code>INT</code></td>
     </tr>
     <tr>
@@ -722,6 +744,10 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>BIGINT</code></td>
       <td></td>
       <td><code>BIGINT</code></td>
+      <td>
+        <code>BIGINT</code><br>
+        <code>INT UNSIGNED</code></td>
+      <td></td>
       <td><code>BIGINT</code></td>
     </tr>
    <tr>
@@ -732,6 +758,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td></td> 
       <td></td>
       <td></td>
+      <td><code>BIGINT UNSIGNED</code></td>
+      <td></td>
       <td><code>DECIMAL(20, 0)</code></td>
     </tr>
     <tr>
@@ -748,6 +776,9 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>REAL</code></td>
       <td><code>FLOAT</code></td>
       <td><code>FLOAT</code></td>
+      <td>
+        <code>BINARY_FLOAT</code></td>
+      <td><code>FLOAT</code></td>
     </tr>
     <tr>
       <td>
@@ -763,6 +794,9 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>FLOAT</code></td>
       <td><code>DOUBLE</code></td>
       <td><code>DOUBLE</code></td>
+      <td><code>DOUBLE</code></td>
+      <td><code>BINARY_DOUBLE</code></td>
+      <td><code>DOUBLE</code></td>
     </tr>
     <tr>
       <td>
@@ -784,6 +818,12 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
         <code>DECIMAL(p, s)</code>
       </td>
       <td><code>DECIMAL(p, s)</code></td>
+      <td>
+        <code>NUMERIC(p, s)</code><br>
+        <code>DECIMAL(p, s)</code></td>
+      <td>
+        <code>FLOAT(s)</code><br>
+        <code>NUMBER(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
     </tr>
     <tr>
@@ -796,6 +836,10 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>BIT</code></td>
       <td><code>BOOLEAN</code></td>
       <td></td>
+      <td>
+        <code>BOOLEAN</code><br>
+        <code>TINYINT(1)</code></td>
+      <td></td>
       <td><code>BOOLEAN</code></td>
     </tr>
     <tr>
@@ -807,6 +851,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
+      <td><code>DATE</code></td>
+      <td><code>DATE</code></td>
     </tr>
     <tr>
       <td><code>TIME [(p)]</code></td>
@@ -816,6 +862,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td><code>TIME(0)</code></td>
       <td><code>TIME</code></td>
       <td><code>TIME_WITHOUT_TIME_ZONE</code></td>
+      <td><code>TIME [(p)]</code></td>
+      <td><code>DATE</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -829,6 +877,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       </td>
       <td><code>TIMESTAMP [(p)]</code></td>
       <td><code>TIMESTAMP_WITHOUT_TIME_ZONE</code></td>
+      <td><code>DATETIME [(p)]</code></td>
+      <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -868,6 +918,15 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
         <code>CHAR</code>
         <code>VARCHAR</code>
       </td>
+      <td>
+        <code>CHAR(n)</code><br>
+        <code>VARCHAR(n)</code><br>
+        <code>TEXT</code></td>
+      <td>
+        <code>CHAR(n)</code><br>
+        <code>NCHAR(n)</code><br>
+        <code>VARCHAR2(n)</code><br>
+        <code>CLOB</code></td>
       <td><code>STRING</code></td>
     </tr>
     <tr>
@@ -886,6 +945,13 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       </td>
       <td></td>
       <td><code>VARBINARY</code></td>
+      <td>
+        <code>BINARY</code><br>
+        <code>VARBINARY</code><br>
+        <code>BLOB</code></td>
+      <td>
+        <code>RAW(s)</code><br>
+        <code>BLOB</code></td>
       <td><code>BYTES</code></td>
     </tr>
     <tr>
@@ -896,6 +962,8 @@ Flink supports connect to several databases which uses 
dialect like MySQL, Oracl
       <td></td>
       <td></td>
       <td><code>ARRAY</code></td>
+      <td></td>
+      <td></td>
       <td><code>ARRAY</code></td>
     </tr>
     </tbody>
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 11cbacb5..f85861fe 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -242,6 +242,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- OceanBase tests -->
+               <dependency>
+                       <groupId>com.oceanbase</groupId>
+                       <artifactId>oceanbase-client</artifactId>
+                       <version>2.4.8</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- ArchUit test dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
index 8bab2be9..3f6e28fa 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
@@ -40,7 +40,7 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
      * Creates a JdbcCatalog.
      *
      * @deprecated please use {@link JdbcCatalog#JdbcCatalog(ClassLoader, 
String, String, String,
-     *     String, String)} instead.
+     *     String, String, String)} instead.
      */
     public JdbcCatalog(
             String catalogName,
@@ -54,7 +54,8 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
                 defaultDatabase,
                 username,
                 pwd,
-                baseUrl);
+                baseUrl,
+                null);
     }
 
     /**
@@ -66,6 +67,7 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
      * @param username the username used to connect the database
      * @param pwd the password used to connect the database
      * @param baseUrl the base URL of the database, e.g. 
jdbc:mysql://localhost:3306
+     * @param compatibleMode the compatible mode of the database
      */
     public JdbcCatalog(
             ClassLoader userClassLoader,
@@ -73,12 +75,19 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
             String defaultDatabase,
             String username,
             String pwd,
-            String baseUrl) {
+            String baseUrl,
+            String compatibleMode) {
         super(userClassLoader, catalogName, defaultDatabase, username, pwd, 
baseUrl);
 
         internal =
                 JdbcCatalogUtils.createCatalog(
-                        userClassLoader, catalogName, defaultDatabase, 
username, pwd, baseUrl);
+                        userClassLoader,
+                        catalogName,
+                        defaultDatabase,
+                        username,
+                        pwd,
+                        baseUrl,
+                        compatibleMode);
     }
 
     // ------ databases -----
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index 84ac0abd..09d4d924 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -48,8 +48,9 @@ public class JdbcCatalogUtils {
             String defaultDatabase,
             String username,
             String pwd,
-            String baseUrl) {
-        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader);
+            String baseUrl,
+            String compatibleMode) {
+        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, compatibleMode, 
userClassLoader);
 
         if (dialect instanceof PostgresDialect) {
             return new PostgresCatalog(
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 9677744d..15225c4f 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL;
+import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.COMPATIBLE_MODE;
 import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE;
 import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD;
 import static 
org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME;
@@ -60,6 +61,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(PROPERTY_VERSION);
+        options.add(COMPATIBLE_MODE);
         return options;
     }
 
@@ -75,6 +77,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
                 helper.getOptions().get(DEFAULT_DATABASE),
                 helper.getOptions().get(USERNAME),
                 helper.getOptions().get(PASSWORD),
-                helper.getOptions().get(BASE_URL));
+                helper.getOptions().get(BASE_URL),
+                helper.getOptions().get(COMPATIBLE_MODE));
     }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
index 0cdde626..ab1ce130 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryOptions.java
@@ -44,5 +44,8 @@ public class JdbcCatalogFactoryOptions {
     public static final ConfigOption<String> BASE_URL =
             ConfigOptions.key("base-url").stringType().noDefaultValue();
 
+    public static final ConfigOption<String> COMPATIBLE_MODE =
+            ConfigOptions.key("compatible-mode").stringType().noDefaultValue();
+
     private JdbcCatalogFactoryOptions() {}
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
new file mode 100644
index 00000000..885d3392
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialect.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
+import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nonnull;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for OceanBase. */
+@Internal
+public class OceanBaseDialect extends AbstractDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    private final AbstractDialect dialect;
+
+    public OceanBaseDialect(@Nonnull String compatibleMode) {
+        switch (compatibleMode.toLowerCase()) {
+            case "mysql":
+                this.dialect = new MySqlDialect();
+                break;
+            case "oracle":
+                this.dialect = new OracleDialect();
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported compatible mode: " + compatibleMode);
+        }
+    }
+
+    @Override
+    public String dialectName() {
+        return "OceanBase";
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.oceanbase.jdbc.Driver");
+    }
+
+    @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        return EnumSet.of(
+                LogicalTypeRoot.CHAR,
+                LogicalTypeRoot.VARCHAR,
+                LogicalTypeRoot.BOOLEAN,
+                LogicalTypeRoot.VARBINARY,
+                LogicalTypeRoot.DECIMAL,
+                LogicalTypeRoot.TINYINT,
+                LogicalTypeRoot.SMALLINT,
+                LogicalTypeRoot.INTEGER,
+                LogicalTypeRoot.BIGINT,
+                LogicalTypeRoot.FLOAT,
+                LogicalTypeRoot.DOUBLE,
+                LogicalTypeRoot.DATE,
+                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new OceanBaseRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return dialect.getLimitClause(limit);
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return dialect.quoteIdentifier(identifier);
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] conditionFields) {
+        return dialect.getUpsertStatement(tableName, fieldNames, 
conditionFields);
+    }
+
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return dialect.timestampPrecisionRange();
+    }
+
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return dialect.decimalPrecisionRange();
+    }
+
+    @Override
+    public String appendDefaultUrlProperties(String url) {
+        return dialect.appendDefaultUrlProperties(url);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
new file mode 100644
index 00000000..46152379
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+import javax.annotation.Nonnull;
+
+/** Factory for {@link OceanBaseDialect}. */
+@Internal
+public class OceanBaseDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:oceanbase:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        throw new UnsupportedOperationException(
+                "Can't create JdbcDialect without compatible mode for 
OceanBase");
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode) {
+        return new OceanBaseDialect(compatibleMode);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
new file mode 100644
index 00000000..95981217
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseRowConverter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Blob;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
+ * OceanBase.
+ */
+@Internal
+public class OceanBaseRowConverter extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String converterName() {
+        return "OceanBase";
+    }
+
+    public OceanBaseRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    public JdbcDeserializationConverter createInternalConverter(LogicalType 
type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                return val ->
+                        val instanceof Number
+                                ? ((Number) val).intValue() == 1
+                                : Boolean.parseBoolean(val.toString());
+            case FLOAT:
+                return val -> val instanceof Number ? ((Number) 
val).floatValue() : val;
+            case DOUBLE:
+                return val -> val instanceof Number ? ((Number) 
val).doubleValue() : val;
+            case TINYINT:
+                return val -> val instanceof Number ? ((Number) 
val).byteValue() : val;
+            case SMALLINT:
+                return val -> val instanceof Number ? ((Number) 
val).shortValue() : val;
+            case INTEGER:
+                return val -> val instanceof Number ? ((Number) 
val).intValue() : val;
+            case BIGINT:
+                return val -> val instanceof Number ? ((Number) 
val).longValue() : val;
+            case DECIMAL:
+                final int precision = ((DecimalType) type).getPrecision();
+                final int scale = ((DecimalType) type).getScale();
+                // using decimal(20, 0) to support db type bigint unsigned, 
user should define
+                // decimal(20, 0) in SQL,
+                // but other precision like decimal(30, 0) can work too from 
lenient consideration.
+                return val ->
+                        val instanceof BigInteger
+                                ? DecimalData.fromBigDecimal(
+                                        new BigDecimal((BigInteger) val, 0), 
precision, scale)
+                                : val instanceof BigDecimal
+                                        ? DecimalData.fromBigDecimal(
+                                                (BigDecimal) val, precision, 
scale)
+                                        : DecimalData.fromBigDecimal(
+                                                new 
BigDecimal(val.toString()), precision, scale);
+            case DATE:
+                return val ->
+                        val instanceof Date
+                                ? (int) (((Date) 
val).toLocalDate().toEpochDay())
+                                : val instanceof Timestamp
+                                        ? (int)
+                                                (((Timestamp) val)
+                                                        .toLocalDateTime()
+                                                        .toLocalDate()
+                                                        .toEpochDay())
+                                        : val;
+            case TIME_WITHOUT_TIME_ZONE:
+                return val ->
+                        val instanceof Time
+                                ? (int) (((Time) 
val).toLocalTime().toNanoOfDay() / 1_000_000L)
+                                : val instanceof Timestamp
+                                        ? (int)
+                                                (((Timestamp) val)
+                                                                
.toLocalDateTime()
+                                                                .toLocalTime()
+                                                                .toNanoOfDay()
+                                                        / 1_000_000L)
+                                        : val;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return val ->
+                        val instanceof LocalDateTime
+                                ? 
TimestampData.fromLocalDateTime((LocalDateTime) val)
+                                : val instanceof Timestamp
+                                        ? 
TimestampData.fromTimestamp((Timestamp) val)
+                                        : val;
+            case CHAR:
+            case VARCHAR:
+                return val -> StringData.fromString(val.toString());
+            case BINARY:
+            case VARBINARY:
+            case RAW:
+                return val ->
+                        val instanceof Blob
+                                ? ((Blob) val).getBytes(1, (int) ((Blob) 
val).length())
+                                : val instanceof byte[] ? val : 
val.toString().getBytes();
+            default:
+                return super.createInternalConverter(type);
+        }
+    }
+}
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
index 682b61cd..84f4bd78 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.jdbc.dialect;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
 
 /**
  * A factory to create a specific {@link JdbcDialect}. This factory is used 
with Java's Service
@@ -46,4 +47,18 @@ public interface JdbcDialectFactory {
 
     /** @return Creates a new instance of the {@link JdbcDialect}. */
     JdbcDialect create();
+
+    /**
+     * Creates a new instance of the {@link JdbcDialect} based on compatible 
mode.
+     *
+     * @param compatibleMode the compatible mode of database
+     * @return a new instance of {@link JdbcDialect}
+     */
+    default JdbcDialect create(String compatibleMode) {
+        if (StringUtils.isNullOrWhitespaceOnly(compatibleMode)) {
+            return create();
+        }
+        throw new UnsupportedOperationException(
+                "Not supported option 'compatible-mode' with value: " + 
compatibleMode);
+    }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
index 6dfad9b0..4e9b9ba0 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
@@ -37,16 +37,21 @@ public final class JdbcDialectLoader {
 
     private JdbcDialectLoader() {}
 
+    public static JdbcDialect load(String url, ClassLoader classLoader) {
+        return load(url, null, classLoader);
+    }
+
     /**
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
      * @param url A database URL.
+     * @param compatibleMode the compatible mode of database
      * @param classLoader the classloader used to load the factory
      * @throws IllegalStateException if the loader cannot find exactly one 
dialect that can
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url, ClassLoader classLoader) {
+    public static JdbcDialect load(String url, String compatibleMode, 
ClassLoader classLoader) {
         List<JdbcDialectFactory> foundFactories = 
discoverFactories(classLoader);
 
         if (foundFactories.isEmpty()) {
@@ -87,7 +92,7 @@ public final class JdbcDialectLoader {
                                     .collect(Collectors.joining("\n"))));
         }
 
-        return matchingFactories.get(0).create();
+        return matchingFactories.get(0).create(compatibleMode);
     }
 
     private static List<JdbcDialectFactory> discoverFactories(ClassLoader 
classLoader) {
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
index 6d9aeca5..8158cd8c 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/InternalJdbcConnectionOptions.java
@@ -109,6 +109,7 @@ public class InternalJdbcConnectionOptions extends 
JdbcConnectionOptions {
         private String dbURL;
         private String tableName;
         private String driverName;
+        private String compatibleMode;
         private String username;
         private String password;
         private JdbcDialect dialect;
@@ -161,6 +162,12 @@ public class InternalJdbcConnectionOptions extends 
JdbcConnectionOptions {
             return this;
         }
 
+        /** optional, compatible mode. */
+        public Builder setCompatibleMode(String compatibleMode) {
+            this.compatibleMode = compatibleMode;
+            return this;
+        }
+
         /** required, JDBC DB url. */
         public Builder setDBUrl(String dbURL) {
             this.dbURL = dbURL;
@@ -188,7 +195,7 @@ public class InternalJdbcConnectionOptions extends 
JdbcConnectionOptions {
                 if (classLoader == null) {
                     classLoader = 
Thread.currentThread().getContextClassLoader();
                 }
-                this.dialect = JdbcDialectLoader.load(dbURL, classLoader);
+                this.dialect = JdbcDialectLoader.load(dbURL, compatibleMode, 
classLoader);
             }
             if (this.driverName == null) {
                 Optional<String> optional = dialect.defaultDriverName();
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
index 4e61a128..0b39df45 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java
@@ -62,6 +62,12 @@ public class JdbcConnectorOptions {
                             "The class name of the JDBC driver to use to 
connect to this URL. "
                                     + "If not set, it will automatically be 
derived from the URL.");
 
+    public static final ConfigOption<String> COMPATIBLE_MODE =
+            ConfigOptions.key("compatible-mode")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The compatible mode of database.");
+
     public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT =
             ConfigOptions.key("connection.max-retry-timeout")
                     .durationType()
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 3434f603..3eecfa15 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.COMPATIBLE_MODE;
 import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
 import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
@@ -89,7 +90,10 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         helper.validate();
         validateConfigOptions(config, context.getClassLoader());
         validateDataTypeWithJdbcDialect(
-                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
+                context.getPhysicalRowDataType(),
+                config.get(URL),
+                config.get(COMPATIBLE_MODE),
+                context.getClassLoader());
         InternalJdbcConnectionOptions jdbcOptions =
                 getJdbcOptions(config, context.getClassLoader());
 
@@ -112,7 +116,10 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         helper.validate();
         validateConfigOptions(config, context.getClassLoader());
         validateDataTypeWithJdbcDialect(
-                context.getPhysicalRowDataType(), config.get(URL), 
context.getClassLoader());
+                context.getPhysicalRowDataType(),
+                config.get(URL),
+                config.get(COMPATIBLE_MODE),
+                context.getClassLoader());
         return new JdbcDynamicTableSource(
                 getJdbcOptions(helper.getOptions(), context.getClassLoader()),
                 getJdbcReadOptions(helper.getOptions()),
@@ -122,8 +129,8 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
     }
 
     private static void validateDataTypeWithJdbcDialect(
-            DataType dataType, String url, ClassLoader classLoader) {
-        final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
+            DataType dataType, String url, String compatibleMode, ClassLoader 
classLoader) {
+        final JdbcDialect dialect = JdbcDialectLoader.load(url, 
compatibleMode, classLoader);
         dialect.validate((RowType) dataType.getLogicalType());
     }
 
@@ -135,7 +142,9 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                         .setClassLoader(classLoader)
                         .setDBUrl(url)
                         .setTableName(readableConfig.get(TABLE_NAME))
-                        .setDialect(JdbcDialectLoader.load(url, classLoader))
+                        .setDialect(
+                                JdbcDialectLoader.load(
+                                        url, 
readableConfig.get(COMPATIBLE_MODE), classLoader))
                         
.setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
                         .setConnectionCheckTimeoutSeconds(
                                 (int) 
readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
@@ -143,6 +152,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
         readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
         readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+        
readableConfig.getOptional(COMPATIBLE_MODE).ifPresent(builder::setCompatibleMode);
         return builder.build();
     }
 
@@ -223,6 +233,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> optionalOptions = new HashSet<>();
         optionalOptions.add(DRIVER);
+        optionalOptions.add(COMPATIBLE_MODE);
         optionalOptions.add(USERNAME);
         optionalOptions.add(PASSWORD);
         optionalOptions.add(SCAN_PARTITION_COLUMN);
@@ -257,6 +268,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                         USERNAME,
                         PASSWORD,
                         DRIVER,
+                        COMPATIBLE_MODE,
                         SINK_BUFFER_FLUSH_MAX_ROWS,
                         SINK_BUFFER_FLUSH_INTERVAL,
                         SINK_MAX_RETRIES,
@@ -268,7 +280,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
 
     private void validateConfigOptions(ReadableConfig config, ClassLoader 
classLoader) {
         String jdbcUrl = config.get(URL);
-        JdbcDialectLoader.load(jdbcUrl, classLoader);
+        JdbcDialectLoader.load(jdbcUrl, config.get(COMPATIBLE_MODE), 
classLoader);
 
         checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
 
diff --git 
a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
 
b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index 4032c6f3..9a3cf26c 100644
--- 
a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ 
b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -15,6 +15,7 @@
 
 org.apache.flink.connector.jdbc.databases.derby.dialect.DerbyDialectFactory
 org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialectFactory
+org.apache.flink.connector.jdbc.databases.oceanbase.dialect.OceanBaseDialectFactory
 
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactory
 org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
 
org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
new file mode 100644
index 00000000..4cac1d60
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseMysqlTestBase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase;
+
+import 
org.apache.flink.connector.jdbc.databases.oceanbase.table.OceanBaseTableRow;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import 
org.apache.flink.connector.jdbc.testutils.databases.oceanbase.OceanBaseDatabase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for OceanBase Mysql mode testing. */
+@ExtendWith(OceanBaseDatabase.class)
+public interface OceanBaseMysqlTestBase extends DatabaseTest {
+
+    default TableRow tableRow(String name, TableField... fields) {
+        return new OceanBaseTableRow("mysql", name, fields);
+    }
+
+    @Override
+    default DatabaseMetadata getMetadata() {
+        return OceanBaseDatabase.getMetadata();
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
new file mode 100644
index 00000000..31aad559
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/OceanBaseOracleTestBase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase;
+
+import 
org.apache.flink.connector.jdbc.databases.oceanbase.table.OceanBaseTableRow;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import 
org.apache.flink.connector.jdbc.testutils.databases.oceanbase.OceanBaseTestDatabase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for OceanBase Oracle mode testing. */
+@ExtendWith(OceanBaseTestDatabase.class)
+public interface OceanBaseOracleTestBase extends DatabaseTest {
+
+    default TableRow tableRow(String name, TableField... fields) {
+        return new OceanBaseTableRow("oracle", name, fields);
+    }
+
+    @Override
+    default DatabaseMetadata getMetadata() {
+        // OceanBase Oracle mode is only available in OceanBase Enterprise 
Edition, which
+        // does not provide docker image, so here use OceanBaseTestDatabase to 
test locally.
+        return OceanBaseTestDatabase.getMetadata();
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectTest.java
new file mode 100644
index 00000000..7bab63b7
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseDialectTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OceanBaseDialect}. */
+public class OceanBaseDialectTest {
+
+    @Test
+    void testMysqlAppendDefaultUrlProperties() {
+        OceanBaseDialect dialect = new OceanBaseDialect("mysql");
+        String jdbcUrl = "jdbc:oceanbase://localhost:2883/foo";
+
+        assertThat(dialect.appendDefaultUrlProperties(jdbcUrl))
+                .isEqualTo(jdbcUrl + "?rewriteBatchedStatements=true");
+
+        assertThat(dialect.appendDefaultUrlProperties(jdbcUrl + "?foo=bar"))
+                .isEqualTo(jdbcUrl + "?foo=bar&rewriteBatchedStatements=true");
+
+        assertThat(
+                        dialect.appendDefaultUrlProperties(
+                                jdbcUrl + 
"?foo=bar&rewriteBatchedStatements=false"))
+                .isEqualTo(jdbcUrl + 
"?foo=bar&rewriteBatchedStatements=false");
+    }
+
+    @Test
+    void testOracleAppendDefaultUrlProperties() {
+        OceanBaseDialect dialect = new OceanBaseDialect("oracle");
+        String jdbcUrl = "jdbc:oceanbase://localhost:2883/foo";
+
+        
assertThat(dialect.appendDefaultUrlProperties(jdbcUrl)).isEqualTo(jdbcUrl);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseMysqlDialectTypeTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseMysqlDialectTypeTest.java
new file mode 100644
index 00000000..675280c4
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseMysqlDialectTypeTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The OceanBase MySql mode params for {@link JdbcDialectTypeTest}. */
+public class OceanBaseMysqlDialectTypeTest extends JdbcDialectTypeTest {
+
+    public OceanBaseMysqlDialectTypeTest() {
+        ddlFormat =
+                "CREATE TABLE T (f0 %s)"
+                        + " WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='jdbc:%s:memory:test',"
+                        + "  'table-name'='myTable',"
+                        + "  'compatible-mode'='mysql'"
+                        + ")";
+    }
+
+    @Override
+    protected String testDialect() {
+        return "oceanbase";
+    }
+
+    @Override
+    protected List<TestItem> testData() {
+        return Arrays.asList(
+                createTestItem("CHAR"),
+                createTestItem("VARCHAR"),
+                createTestItem("BOOLEAN"),
+                createTestItem("TINYINT"),
+                createTestItem("SMALLINT"),
+                createTestItem("INTEGER"),
+                createTestItem("BIGINT"),
+                createTestItem("FLOAT"),
+                createTestItem("DOUBLE"),
+                createTestItem("DECIMAL(10, 4)"),
+                createTestItem("DECIMAL(38, 18)"),
+                createTestItem("DATE"),
+                createTestItem("TIME"),
+                createTestItem("TIMESTAMP(3)"),
+                createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+                createTestItem("VARBINARY"),
+
+                // Not valid data
+                createTestItem("BINARY", "The OceanBase dialect doesn't 
support type: BINARY(1)."),
+                createTestItem(
+                        "VARBINARY(10)",
+                        "The OceanBase dialect doesn't support type: 
VARBINARY(10)."),
+                createTestItem(
+                        "TIMESTAMP(9) WITHOUT TIME ZONE",
+                        "The precision of field 'f0' is out of the TIMESTAMP 
precision range [0, 6] supported by OceanBase dialect."),
+                createTestItem(
+                        "TIMESTAMP_LTZ(3)",
+                        "The OceanBase dialect doesn't support type: 
TIMESTAMP_LTZ(3)."));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseOracleDialectTypeTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseOracleDialectTypeTest.java
new file mode 100644
index 00000000..f58d7014
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/dialect/OceanBaseOracleDialectTypeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The OceanBase Oracle mode params for {@link JdbcDialectTypeTest}. */
+public class OceanBaseOracleDialectTypeTest extends JdbcDialectTypeTest {
+
+    public OceanBaseOracleDialectTypeTest() {
+        ddlFormat =
+                "CREATE TABLE T (f0 %s)"
+                        + " WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='jdbc:%s:memory:test',"
+                        + "  'table-name'='myTable',"
+                        + "  'compatible-mode'='oracle'"
+                        + ")";
+    }
+
+    @Override
+    protected String testDialect() {
+        return "oceanbase";
+    }
+
+    @Override
+    protected List<TestItem> testData() {
+        return Arrays.asList(
+                createTestItem("CHAR"),
+                createTestItem("VARCHAR"),
+                createTestItem("BOOLEAN"),
+                createTestItem("TINYINT"),
+                createTestItem("SMALLINT"),
+                createTestItem("INTEGER"),
+                createTestItem("BIGINT"),
+                createTestItem("FLOAT"),
+                createTestItem("DOUBLE"),
+                createTestItem("DECIMAL(10, 4)"),
+                createTestItem("DECIMAL(38, 18)"),
+                createTestItem("DATE"),
+                createTestItem("TIME"),
+                createTestItem("TIMESTAMP(3)"),
+                createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+                createTestItem("VARBINARY"),
+
+                // Not valid data
+                createTestItem("BINARY", "The OceanBase dialect doesn't support type: BINARY(1)."),
+                createTestItem(
+                        "VARBINARY(10)",
+                        "The OceanBase dialect doesn't support type: VARBINARY(10)."));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
new file mode 100644
index 00000000..67f55152
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSinkITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.table;
+
+import org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+
+import java.util.Map;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** The Table Sink ITCase for OceanBase MySql mode. */
+public class OceanBaseMySqlDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
+        implements OceanBaseMysqlTestBase {
+
+    @Override
+    protected Map<String, String> getOptions() {
+        Map<String, String> options = super.getOptions();
+        options.put("compatible-mode", "mysql");
+        return options;
+    }
+
+    @Override
+    protected TableRow createUpsertOutputTable() {
+        return tableRow(
+                "dynamicSinkForUpsert",
+                pkField("cnt", DataTypes.BIGINT().notNull()),
+                field("lencnt", DataTypes.BIGINT().notNull()),
+                pkField("cTag", DataTypes.INT().notNull()),
+                field("ts", dbType("DATETIME(3)"), DataTypes.TIMESTAMP()));
+    }
+
+    @Override
+    protected TableRow createAppendOutputTable() {
+        return tableRow(
+                "dynamicSinkForAppend",
+                field("id", DataTypes.INT().notNull()),
+                field("num", DataTypes.BIGINT().notNull()),
+                field("ts", dbType("DATETIME(3)"), DataTypes.TIMESTAMP()));
+    }
+
+    @Override
+    protected TableRow createBatchOutputTable() {
+        return tableRow(
+                "dynamicSinkForBatch",
+                field("NAME", DataTypes.VARCHAR(20).notNull()),
+                field("SCORE", DataTypes.BIGINT().notNull()));
+    }
+
+    @Override
+    protected TableRow createRealOutputTable() {
+        return tableRow("REAL_TABLE", field("real_data", dbType("REAL"), 
DataTypes.FLOAT()));
+    }
+
+    @Override
+    protected TableRow createCheckpointOutputTable() {
+        return tableRow("checkpointTable", field("id", 
DataTypes.BIGINT().notNull()));
+    }
+
+    @Override
+    protected TableRow createUserOutputTable() {
+        return tableRow(
+                "USER_TABLE",
+                pkField("user_id", DataTypes.VARCHAR(20).notNull()),
+                field("user_name", DataTypes.VARCHAR(20).notNull()),
+                field("email", DataTypes.VARCHAR(255)),
+                field("balance", DataTypes.DECIMAL(18, 2)),
+                field("balance2", DataTypes.DECIMAL(18, 2)));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
new file mode 100644
index 00000000..441b1715
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseMySqlDynamicTableSourceITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.table;
+
+import 
org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseMysqlTestBase;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** The Table Source ITCase for OceanBase MySql mode. */
+public class OceanBaseMySqlDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
+        implements OceanBaseMysqlTestBase {
+
+    @Override
+    protected TableRow createInputTable() {
+        return tableRow(
+                "jdbcDynamicTableSource",
+                pkField("id", DataTypes.BIGINT().notNull()),
+                field("decimal_col", DataTypes.DECIMAL(10, 4)),
+                field("timestamp6_col", DataTypes.TIMESTAMP(6)),
+                // other fields
+                field("real_col", dbType("REAL"), DataTypes.DOUBLE()),
+                field("double_col", DataTypes.DOUBLE()),
+                field("time_col", dbType("TIME"), DataTypes.TIME()),
+                field("timestamp9_col", DataTypes.TIMESTAMP(6)));
+    }
+
+    protected List<Row> getTestData() {
+        return Arrays.asList(
+                Row.of(
+                        1L,
+                        BigDecimal.valueOf(100.1234),
+                        LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+                        1.175E-37D,
+                        1.79769E308D,
+                        LocalTime.parse("15:35"),
+                        LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                Row.of(
+                        2L,
+                        BigDecimal.valueOf(101.1234),
+                        LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+                        -1.175E-37D,
+                        -1.79769E308,
+                        LocalTime.parse("15:36:01"),
+                        LocalDateTime.parse("2020-01-01T15:36:01.123456")));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
new file mode 100644
index 00000000..e9e06fa4
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSinkITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.table;
+
+import 
org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSinkITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Disabled;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+
+/** The Table Sink ITCase for OceanBase Oracle mode. */
+@Disabled
+public class OceanBaseOracleDynamicTableSinkITCase extends 
JdbcDynamicTableSinkITCase
+        implements OceanBaseOracleTestBase {
+
+    @Override
+    protected Map<String, String> getOptions() {
+        Map<String, String> options = super.getOptions();
+        options.put("compatible-mode", "oracle");
+        return options;
+    }
+
+    @Override
+    protected TableRow createUpsertOutputTable() {
+        return tableRow(
+                "dynamicSinkForUpsert",
+                pkField("cnt", dbType("NUMBER"), DataTypes.BIGINT().notNull()),
+                field("lencnt", dbType("NUMBER"), 
DataTypes.BIGINT().notNull()),
+                pkField("cTag", DataTypes.INT().notNull()),
+                field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+    }
+
+    @Override
+    protected TableRow createAppendOutputTable() {
+        return tableRow(
+                "dynamicSinkForAppend",
+                field("id", DataTypes.INT().notNull()),
+                field("num", dbType("NUMBER"), DataTypes.BIGINT().notNull()),
+                field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+    }
+
+    @Override
+    protected TableRow createBatchOutputTable() {
+        return tableRow(
+                "dynamicSinkForBatch",
+                field("NAME", DataTypes.VARCHAR(20).notNull()),
+                field("SCORE", dbType("NUMBER"), 
DataTypes.BIGINT().notNull()));
+    }
+
+    @Override
+    protected TableRow createRealOutputTable() {
+        return tableRow("REAL_TABLE", field("real_data", dbType("REAL"), 
DataTypes.FLOAT()));
+    }
+
+    @Override
+    protected TableRow createCheckpointOutputTable() {
+        return tableRow(
+                "checkpointTable", field("id", dbType("NUMBER"), 
DataTypes.BIGINT().notNull()));
+    }
+
+    @Override
+    protected TableRow createUserOutputTable() {
+        return tableRow(
+                "USER_TABLE",
+                pkField("user_id", DataTypes.VARCHAR(20).notNull()),
+                field("user_name", DataTypes.VARCHAR(20).notNull()),
+                field("email", DataTypes.VARCHAR(255)),
+                field("balance", dbType("NUMBER"), DataTypes.DECIMAL(18, 2)),
+                field("balance2", dbType("NUMBER"), DataTypes.DECIMAL(18, 2)));
+    }
+
+    @Override
+    protected List<Row> testUserData() {
+        return Arrays.asList(
+                Row.of(
+                        "user1",
+                        "Tom",
+                        "tom...@gmail.com",
+                        new BigDecimal("8.1"),
+                        new BigDecimal("16.2")),
+                Row.of(
+                        "user3",
+                        "Bailey",
+                        "bai...@qq.com",
+                        new BigDecimal("9.99"),
+                        new BigDecimal("19.98")),
+                Row.of(
+                        "user4",
+                        "Tina",
+                        "t...@gmail.com",
+                        new BigDecimal("11.3"),
+                        new BigDecimal("22.6")));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
new file mode 100644
index 00000000..a1a6fce5
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseOracleDynamicTableSourceITCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.table;
+
+import 
org.apache.flink.connector.jdbc.databases.oceanbase.OceanBaseOracleTestBase;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Disabled;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+
+/** The Table Source ITCase for OceanBase Oracle mode. */
+@Disabled
+public class OceanBaseOracleDynamicTableSourceITCase extends 
JdbcDynamicTableSourceITCase
+        implements OceanBaseOracleTestBase {
+
+    @Override
+    protected TableRow createInputTable() {
+        return tableRow(
+                "jdbDynamicTableSource",
+                field("id", dbType("INTEGER"), DataTypes.BIGINT().notNull()),
+                field("decimal_col", DataTypes.DECIMAL(10, 4)),
+                field("timestamp6_col", dbType("TIMESTAMP"), 
DataTypes.TIMESTAMP(6)),
+                // other fields
+                field("float_col", dbType("FLOAT"), DataTypes.FLOAT()),
+                field("binary_float_col", dbType("BINARY_FLOAT"), 
DataTypes.FLOAT()),
+                field("binary_double_col", dbType("BINARY_DOUBLE"), 
DataTypes.DOUBLE()),
+                field("char_col", dbType("CHAR"), DataTypes.CHAR(1)),
+                field("nchar_col", dbType("NCHAR(3)"), DataTypes.VARCHAR(3)),
+                field("varchar2_col", dbType("VARCHAR2(30)"), 
DataTypes.VARCHAR(30)),
+                field("date_col", dbType("DATE"), DataTypes.DATE()),
+                field("timestamp9_col", dbType("TIMESTAMP(9)"), 
DataTypes.TIMESTAMP(9)),
+                field("clob_col", dbType("CLOB"), DataTypes.STRING()));
+    }
+
+    protected List<Row> getTestData() {
+        return Arrays.asList(
+                Row.of(
+                        1L,
+                        BigDecimal.valueOf(100.1234),
+                        LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+                        1.12345F,
+                        1.175E-10F,
+                        1.79769E+40D,
+                        "a",
+                        "abc",
+                        "abcdef",
+                        LocalDate.parse("1997-01-01"),
+                        LocalDateTime.parse("2020-01-01T15:35:00.123456789"),
+                        "Hello World"),
+                Row.of(
+                        2L,
+                        BigDecimal.valueOf(101.1234),
+                        LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+                        1.12345F,
+                        1.175E-10F,
+                        1.79769E+40D,
+                        "a",
+                        "abc",
+                        "abcdef",
+                        LocalDate.parse("1997-01-02"),
+                        LocalDateTime.parse("2020-01-01T15:36:01.123456789"),
+                        "Hey Leonard"));
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseTableRow.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseTableRow.java
new file mode 100644
index 00000000..29f79b35
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oceanbase/table/OceanBaseTableRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.oceanbase.table;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** TableRow for OceanBase. */
+public class OceanBaseTableRow extends TableRow {
+
+    private final String compatibleMode;
+
+    public OceanBaseTableRow(String compatibleMode, String name, TableField[] 
fields) {
+        super(name, fields);
+        this.compatibleMode = compatibleMode;
+    }
+
+    @Override
+    public String getCreateQueryForFlink(
+            DatabaseMetadata metadata,
+            String newName,
+            List<String> newFields,
+            List<String> withParams) {
+        List<String> params = new ArrayList<>(withParams);
+        params.add("'compatible-mode'='" + compatibleMode + "'");
+        return super.getCreateQueryForFlink(metadata, newName, newFields, 
params);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/factory/JdbcCatalogFactoryTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/factory/JdbcCatalogFactoryTest.java
index b4d8aace..cef7e3b9 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/factory/JdbcCatalogFactoryTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/factory/JdbcCatalogFactoryTest.java
@@ -57,7 +57,8 @@ class JdbcCatalogFactoryTest implements PostgresTestBase {
                         PostgresCatalog.DEFAULT_DATABASE,
                         getMetadata().getUsername(),
                         getMetadata().getPassword(),
-                        baseUrl);
+                        baseUrl,
+                        null);
     }
 
     @Test
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java
index 7ff4d6c4..d73c61e4 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java
@@ -37,7 +37,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class JdbcDialectTypeTest {
 
-    private static final String DDL_FORMAT =
+    protected String ddlFormat =
             "CREATE TABLE T (f0 %s)"
                     + " WITH ("
                     + "  'connector'='jdbc',"
@@ -91,7 +91,7 @@ public class JdbcDialectTypeTest {
     @ParameterizedTest
     @MethodSource("testData")
     void testDataTypeValidate(TestItem testItem) {
-        String sqlDDL = String.format(DDL_FORMAT, testItem.dataTypeExpr, testItem.dialect);
+        String sqlDDL = String.format(ddlFormat, testItem.dataTypeExpr, testItem.dialect);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index c7f46deb..2d4735c6 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -361,8 +361,7 @@ public abstract class JdbcDynamicTableSinkITCase extends 
AbstractTestBase implem
                 .containsExactlyInAnyOrderElementsOf(testUserData());
     }
 
-    @Test
-    void testFlushBufferWhenCheckpoint() throws Exception {
+    protected Map<String, String> getOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "jdbc");
         options.put("url", getMetadata().getJdbcUrl());
@@ -370,10 +369,14 @@ public abstract class JdbcDynamicTableSinkITCase extends 
AbstractTestBase implem
         options.put("password", getMetadata().getPassword());
         options.put("table-name", checkpointOutputTable.getTableName());
         options.put("sink.buffer-flush.interval", "0");
+        return options;
+    }
 
+    @Test
+    void testFlushBufferWhenCheckpoint() throws Exception {
         ResolvedSchema schema = checkpointOutputTable.getTableResolvedSchema();
 
-        DynamicTableSink tableSink = createTableSink(schema, options);
+        DynamicTableSink tableSink = createTableSink(schema, getOptions());
 
         SinkRuntimeProviderContext context = new 
SinkRuntimeProviderContext(false);
         SinkFunctionProvider sinkProvider =
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
index 0fc68f66..fff4ad7a 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
@@ -152,6 +152,41 @@ class JdbcOutputFormatTest extends JdbcDataTestBase {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testInvalidCompatibleMode() {
+        assertThatThrownBy(
+                        () -> {
+                            InternalJdbcConnectionOptions jdbcOptions =
+                                    InternalJdbcConnectionOptions.builder()
+                                            
.setDriverName(getMetadata().getDriverClass())
+                                            
.setDBUrl(getMetadata().getJdbcUrl())
+                                            .setTableName(INPUT_TABLE)
+                                            
.setCompatibleMode("invalidCompatibleMode")
+                                            .build();
+                            JdbcDmlOptions dmlOptions =
+                                    JdbcDmlOptions.builder()
+                                            
.withTableName(jdbcOptions.getTableName())
+                                            
.withDialect(jdbcOptions.getDialect())
+                                            .withFieldNames(fieldNames)
+                                            .build();
+
+                            outputFormat =
+                                    new JdbcOutputFormatBuilder()
+                                            .setJdbcOptions(jdbcOptions)
+                                            .setFieldDataTypes(fieldDataTypes)
+                                            .setJdbcDmlOptions(dmlOptions)
+                                            .setJdbcExecutionOptions(
+                                                    
JdbcExecutionOptions.builder().build())
+                                            .build();
+
+                            JdbcOutputSerializer<RowData> serializer =
+                                    JdbcOutputSerializer.of(
+                                            
getSerializer(TypeInformation.of(RowData.class), true));
+                            outputFormat.open(serializer);
+                        })
+                .isInstanceOf(UnsupportedOperationException.class);
+    }
+
     @Test
     void testIncompatibleTypes() {
         assertThatThrownBy(
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
new file mode 100644
index 00000000..ba31718e
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseContainer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
+
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/** {@link JdbcDatabaseContainer} for OceanBase. */
+public class OceanBaseContainer extends JdbcDatabaseContainer<OceanBaseContainer> {
+
+    public static final Integer SQL_PORT = 2881;
+
+    public OceanBaseContainer(String dockerImageName) {
+        this(DockerImageName.parse(dockerImageName));
+
+        addExposedPort(SQL_PORT);
+    }
+
+    public OceanBaseContainer(DockerImageName dockerImageName) {
+        super(dockerImageName);
+    }
+
+    @Override
+    public String getDriverClassName() {
+        return "com.oceanbase.jdbc.Driver";
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl("test");
+    }
+
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:oceanbase://"
+                + getHost()
+                + ":"
+                + getMappedPort(SQL_PORT)
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    @Override
+    public String getUsername() {
+        return "root@test";
+    }
+
+    @Override
+    public String getPassword() {
+        return "";
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
new file mode 100644
index 00000000..50001f65
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseDatabase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+/** OceanBase database for testing. */
+public class OceanBaseDatabase extends DatabaseExtension implements 
OceanBaseImages {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseDatabase.class);
+
+    private static final OceanBaseContainer CONTAINER =
+            new OceanBaseContainer(OCEANBASE_CE_4)
+                    .withEnv("MODE", "slim")
+                    .withEnv("FASTBOOT", "true")
+                    .withEnv("OB_DATAFILE_SIZE", "1G")
+                    .withEnv("OB_LOG_DISK_SIZE", "4G")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    private static OceanBaseMetadata metadata;
+
+    public static OceanBaseMetadata getMetadata() {
+        if (!CONTAINER.isRunning()) {
+            throw new FlinkRuntimeException("Container is stopped.");
+        }
+        if (metadata == null) {
+            metadata = new OceanBaseMetadata(CONTAINER);
+        }
+        return metadata;
+    }
+
+    @Override
+    protected DatabaseMetadata startDatabase() throws Exception {
+        CONTAINER.start();
+        try (Connection connection = getMetadata().getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("SET GLOBAL time_zone = '+00:00'");
+        }
+        return getMetadata();
+    }
+
+    @Override
+    protected void stopDatabase() throws Exception {
+        CONTAINER.stop();
+        metadata = null;
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java
new file mode 100644
index 00000000..70194da4
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseImages.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
+
/** OceanBase docker images used by the test fixtures. */
public interface OceanBaseImages {

    /** OceanBase Community Edition 4.x image; used by {@code OceanBaseDatabase}. */
    String OCEANBASE_CE_4 = "oceanbase/oceanbase-ce:4.2.1_bp3";

    /** OceanBase Community Edition 3.x image; presumably for tests against the older server line — not referenced in this chunk. */
    String OCEANBASE_CE_3 = "oceanbase/oceanbase-ce:3.1.4";
}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java
new file mode 100644
index 00000000..b9df95e5
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseMetadata.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import javax.sql.XADataSource;
+
+/** OceanBase metadata. */
+public class OceanBaseMetadata implements DatabaseMetadata {
+
+    private final String username;
+    private final String password;
+    private final String url;
+    private final String driver;
+    private final String version;
+
+    public OceanBaseMetadata(OceanBaseContainer container) {
+        this.username = container.getUsername();
+        this.password = container.getPassword();
+        this.url = container.getJdbcUrl();
+        this.driver = container.getDriverClassName();
+        this.version = container.getDockerImageName();
+    }
+
+    public OceanBaseMetadata(
+            String username, String password, String url, String driver, 
String version) {
+        this.username = username;
+        this.password = password;
+        this.url = url;
+        this.driver = driver;
+        this.version = version;
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return this.url;
+    }
+
+    @Override
+    public String getJdbcUrlWithCredentials() {
+        return String.format("%s?user=%s&password=%s", getJdbcUrl(), 
getUsername(), getPassword());
+    }
+
+    @Override
+    public String getUsername() {
+        return this.username;
+    }
+
+    @Override
+    public String getPassword() {
+        return this.password;
+    }
+
+    @Override
+    public XADataSource buildXaDataSource() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getDriverClass() {
+        return this.driver;
+    }
+
+    @Override
+    public String getVersion() {
+        return this.version;
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseTestDatabase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseTestDatabase.java
new file mode 100644
index 00000000..bf4353fb
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/oceanbase/OceanBaseTestDatabase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.oceanbase;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+/** OceanBase database for locally testing. */
+public class OceanBaseTestDatabase extends DatabaseExtension {
+
+    public static OceanBaseMetadata getMetadata() {
+        return new OceanBaseMetadata(
+                System.getenv("test.oceanbase.username"),
+                System.getenv("test.oceanbase.password"),
+                System.getenv("test.oceanbase.url"),
+                "com.oceanbase.jdbc.Driver",
+                "test");
+    }
+
+    @Override
+    protected DatabaseMetadata startDatabase() throws Exception {
+        return getMetadata();
+    }
+
+    @Override
+    protected void stopDatabase() throws Exception {}
+}

Reply via email to