[ 
https://issues.apache.org/jira/browse/FLINK-32714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-32714:
--------------------------------------

    Assignee: He Wang

> JDBC: Add dialect for OceanBase database
> ----------------------------------------
>
>                 Key: FLINK-32714
>                 URL: https://issues.apache.org/jira/browse/FLINK-32714
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: He Wang
>            Assignee: He Wang
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>             Fix For: jdbc-3.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> OceanBase is a distributed relational database. The community edition is 
> open source at [https://github.com/oceanbase/oceanbase].
> The enterprise edition of OceanBase is compatible with MySQL and Oracle, 
> which means we can reuse almost all of the existing dialect rules. 
> Unlike with other databases, the compatibility mode must be specified up 
> front so that the connector can determine which dialect to use. A startup 
> option such as 'compatible-mode' is therefore needed.
> A dialect implementation for OceanBase could look like this: 
> {code:java}
> package org.apache.flink.connector.jdbc.databases.oceanbase;
> import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
> import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
> import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
> import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
> import org.apache.flink.table.types.logical.LogicalTypeRoot;
> import org.apache.flink.table.types.logical.RowType;
> import javax.annotation.Nonnull;
> import java.util.Optional;
> import java.util.Set;
> /** JDBC dialect for OceanBase. */
> public class OceanBaseDialect extends AbstractDialect {
>     private static final long serialVersionUID = 1L;
>     private final AbstractDialect dialect;
>     public OceanBaseDialect(@Nonnull String compatibleMode) {
>         switch (compatibleMode.toLowerCase()) {
>             case "mysql":
>                 this.dialect = new MySqlDialect();
>                 break;
>             case "oracle":
>                 this.dialect = new OracleDialect();
>                 break;
>             default:
>                 throw new IllegalArgumentException(
>                         "Unsupported compatible mode: " + compatibleMode);
>         }
>     }
>     @Override
>     public String dialectName() {
>         return "OceanBase";
>     }
>     @Override
>     public Optional<String> defaultDriverName() {
>         return Optional.of("com.oceanbase.jdbc.Driver");
>     }
>     @Override
>     public Set<LogicalTypeRoot> supportedTypes() {
>         return dialect.supportedTypes();
>     }
>     @Override
>     public JdbcRowConverter getRowConverter(RowType rowType) {
>         return dialect.getRowConverter(rowType);
>     }
>     @Override
>     public String getLimitClause(long limit) {
>         return dialect.getLimitClause(limit);
>     }
>     @Override
>     public String quoteIdentifier(String identifier) {
>         return dialect.quoteIdentifier(identifier);
>     }
>     @Override
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] conditionFields) {
>         return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
>     }
>     @Override
>     public Optional<Range> timestampPrecisionRange() {
>         return dialect.timestampPrecisionRange();
>     }
>     @Override
>     public Optional<Range> decimalPrecisionRange() {
>         return dialect.decimalPrecisionRange();
>     }
>     @Override
>     public String appendDefaultUrlProperties(String url) {
>         return dialect.appendDefaultUrlProperties(url);
>     }
> }
>  {code}
>  
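
To illustrate why the compatibility mode has to be known before any SQL is generated, here is a minimal standalone sketch of the same delegation idea. It does not use the Flink connector classes: `SimpleDialect`, `MySqlLike`, `OracleLike`, `forMode` and `selectWithLimit` are hypothetical names introduced only for this example, and the quoting and limit rules are simplified stand-ins for what the real MySQL and Oracle dialects would return.

```java
import java.util.Locale;

/** Standalone sketch of mode-based dialect delegation; not the Flink connector API. */
public class CompatibleModeSketch {

    /** Hypothetical, trimmed-down stand-in for a JDBC dialect. */
    interface SimpleDialect {
        String quoteIdentifier(String identifier);

        String getLimitClause(long limit);
    }

    /** Simplified MySQL-style rules: backtick quoting and LIMIT. */
    static final class MySqlLike implements SimpleDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return "`" + identifier + "`";
        }

        @Override
        public String getLimitClause(long limit) {
            return "LIMIT " + limit;
        }
    }

    /** Simplified Oracle-style rules: unquoted identifiers and FETCH FIRST. */
    static final class OracleLike implements SimpleDialect {
        @Override
        public String quoteIdentifier(String identifier) {
            return identifier;
        }

        @Override
        public String getLimitClause(long limit) {
            return "FETCH FIRST " + limit + " ROWS ONLY";
        }
    }

    /** Picks the delegate from the value of an assumed 'compatible-mode' option. */
    static SimpleDialect forMode(String compatibleMode) {
        if (compatibleMode == null) {
            throw new IllegalArgumentException("compatible mode must be provided");
        }
        // Locale.ROOT keeps the comparison independent of the JVM default locale.
        switch (compatibleMode.trim().toLowerCase(Locale.ROOT)) {
            case "mysql":
                return new MySqlLike();
            case "oracle":
                return new OracleLike();
            default:
                throw new IllegalArgumentException(
                        "Unsupported compatible mode: " + compatibleMode);
        }
    }

    /** Builds a limited SELECT using whichever rules the delegate provides. */
    static String selectWithLimit(SimpleDialect dialect, String table, long limit) {
        return "SELECT * FROM "
                + dialect.quoteIdentifier(table)
                + " "
                + dialect.getLimitClause(limit);
    }

    public static void main(String[] args) {
        System.out.println(selectWithLimit(forMode("MySQL"), "orders", 10));
        System.out.println(selectWithLimit(forMode("oracle"), "orders", 10));
    }
}
```

The same table and limit produce different statements depending on the mode, which is why the option cannot be inferred after the fact. An unknown mode fails fast with an `IllegalArgumentException`, mirroring the constructor in the proposal above.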



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
