This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c687050d88 [Feature][CDC] Support for preferring numeric fields as 
split keys (#5384)
c687050d88 is described below

commit c687050d882873c93a414ca81e6b548ee3c29a94
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Sep 27 10:00:32 2023 +0800

    [Feature][CDC] Support for preferring numeric fields as split keys (#5384)
    
    * [Feature][CDC] Support for preferring numeric fields as split keys
    
    ---------
    
    Co-authored-by: zhouyao <[email protected]>
---
 .../connector-cdc/connector-cdc-base/pom.xml       |  10 +-
 .../splitter/AbstractJdbcSourceChunkSplitter.java  |  44 +++-
 .../jdbc/source/JdbcSourceChunkSplitterTest.java   | 245 +++++++++++++++++++++
 3 files changed, 296 insertions(+), 3 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index 9d813003fa..baefc91758 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -30,6 +30,7 @@
 
     <properties>
         <hikaricp.version>4.0.3</hikaricp.version>
+        <junit.vserion>4.13.2</junit.vserion>
     </properties>
 
     <dependencyManagement>
@@ -37,7 +38,7 @@
             <dependency>
                 <groupId>com.zaxxer</groupId>
                 <artifactId>HikariCP</artifactId>
-                <version>4.0.3</version>
+                <version>${hikaricp.version}</version>
             </dependency>
 
             <dependency>
@@ -97,6 +98,13 @@
             <artifactId>seatunnel-format-compatible-debezium-json</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.vserion}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index e99e7dab4b..dc1d977358 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -309,6 +309,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
     }
 
     // 
------------------------------------------------------------------------------------------
+
     /** Returns the distribution factor of the given table. */
     @SuppressWarnings("MagicNumber")
     protected double calculateDistributionFactor(
@@ -356,6 +357,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
             throws SQLException {
         Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+        Column splitColumn = null;
         if (primaryKey.isPresent()) {
             List<String> pkColumns = primaryKey.get().getColumnNames();
 
@@ -363,7 +365,10 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             for (String pkColumn : pkColumns) {
                 Column column = table.columnWithName(pkColumn);
                 if (isEvenlySplitColumn(column)) {
-                    return column;
+                    splitColumn = columnComparable(splitColumn, column);
+                    if (sqlTypePriority(splitColumn) == 1) {
+                        return splitColumn;
+                    }
                 }
             }
         }
@@ -377,11 +382,17 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
                 for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : 
uniqueKeyColumns) {
                     Column column = 
table.columnWithName(uniqueKeyColumn.getColumnName());
                     if (isEvenlySplitColumn(column)) {
-                        return column;
+                        splitColumn = columnComparable(splitColumn, column);
+                        if (sqlTypePriority(splitColumn) == 1) {
+                            return splitColumn;
+                        }
                     }
                 }
             }
         }
+        if (splitColumn != null) {
+            return splitColumn;
+        }
 
         throw new UnsupportedOperationException(
                 String.format(
@@ -410,4 +421,33 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             log.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
         }
     }
+
+    private int sqlTypePriority(Column splitColumn) {
+        switch (fromDbzColumn(splitColumn).getSqlType()) {
+            case TINYINT:
+                return 1;
+            case SMALLINT:
+                return 2;
+            case INT:
+                return 3;
+            case BIGINT:
+                return 4;
+            case DECIMAL:
+                return 5;
+            case STRING:
+                return 6;
+            default:
+                return Integer.MAX_VALUE;
+        }
+    }
+
+    private Column columnComparable(Column then, Column other) {
+        if (then == null) {
+            return other;
+        }
+        if (sqlTypePriority(then) > sqlTypePriority(other)) {
+            return other;
+        }
+        return then;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
new file mode 100644
index 0000000000..86500f248f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdbc.source;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+public class JdbcSourceChunkSplitterTest {
+
+    @Test
+    public void splitColumnTest() throws SQLException {
+        TestJdbcSourceChunkSplitter testJdbcSourceChunkSplitter =
+                new TestJdbcSourceChunkSplitter(null, new TestSourceDialect());
+        Column splitColumn =
+                testJdbcSourceChunkSplitter.getSplitColumn(
+                        null, new TestSourceDialect(), new TableId("", "", 
""));
+        Assertions.assertEquals(splitColumn.typeName(), "tinyint");
+    }
+
+    private class TestJdbcSourceChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
+
+        public TestJdbcSourceChunkSplitter(
+                JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
+            super(sourceConfig, dialect);
+        }
+
+        @Override
+        public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
+                throws SQLException {
+            return new Object[0];
+        }
+
+        @Override
+        public Object queryMin(
+                JdbcConnection jdbc, TableId tableId, String columnName, 
Object excludedLowerBound)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Object[] sampleDataFromColumn(
+                JdbcConnection jdbc, TableId tableId, String columnName, int 
samplingRate)
+                throws SQLException {
+            return new Object[0];
+        }
+
+        @Override
+        public Object queryNextChunkMax(
+                JdbcConnection jdbc,
+                TableId tableId,
+                String columnName,
+                int chunkSize,
+                Object includedLowerBound)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId 
tableId)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        public String buildSplitScanQuery(
+                TableId tableId,
+                SeaTunnelRowType splitKeyType,
+                boolean isFirstSplit,
+                boolean isLastSplit) {
+            return null;
+        }
+
+        @Override
+        public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
+            String typeName = splitColumn.typeName();
+            switch (typeName) {
+                case "varchar":
+                    return BasicType.STRING_TYPE;
+                case "tinyint":
+                    return BasicType.BYTE_TYPE;
+                case "smallint":
+                    return BasicType.SHORT_TYPE;
+                case "int":
+                    return BasicType.INT_TYPE;
+                case "bigint":
+                    return BasicType.LONG_TYPE;
+                case "decimal":
+                    return new DecimalType(20, 0);
+                default:
+                    return BasicType.STRING_TYPE;
+            }
+        }
+
+        @Override
+        public Column getSplitColumn(
+                JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
+                throws SQLException {
+            return super.getSplitColumn(jdbc, dialect, tableId);
+        }
+    }
+
+    private class TestSourceDialect implements JdbcDataSourceDialect {
+
+        @Override
+        public String getName() {
+            return null;
+        }
+
+        @Override
+        public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig 
sourceConfig) {
+            return false;
+        }
+
+        @Override
+        public ChunkSplitter createChunkSplitter(JdbcSourceConfig 
sourceConfig) {
+            return null;
+        }
+
+        @Override
+        public List<TableId> discoverDataCollections(JdbcSourceConfig 
sourceConfig) {
+            return null;
+        }
+
+        @Override
+        public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+            return null;
+        }
+
+        @Override
+        public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
+
+            Table table =
+                    Table.editor()
+                            .tableId(tableId)
+                            .addColumns(
+                                    Column.editor()
+                                            .name("string_col")
+                                            .jdbcType(Types.VARCHAR)
+                                            .type("varchar")
+                                            .create(),
+                                    Column.editor()
+                                            .name("smallint")
+                                            .jdbcType(Types.SMALLINT)
+                                            .type("smallint")
+                                            .create(),
+                                    Column.editor()
+                                            .name("int")
+                                            .jdbcType(Types.INTEGER)
+                                            .type("int")
+                                            .create(),
+                                    Column.editor()
+                                            .name("decimal")
+                                            .jdbcType(Types.DECIMAL)
+                                            .type("decimal")
+                                            .create(),
+                                    Column.editor()
+                                            .name("tinyint_col")
+                                            .jdbcType(Types.TINYINT)
+                                            .type("tinyint")
+                                            .create(),
+                                    Column.editor()
+                                            .name("bigint_col")
+                                            .jdbcType(Types.BIGINT)
+                                            .type("bigint")
+                                            .create())
+                            .create();
+            return new 
TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
+        }
+
+        @Override
+        public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase 
sourceSplitBase) {
+            return null;
+        }
+
+        @Override
+        public JdbcSourceFetchTaskContext createFetchTaskContext(
+                SourceSplitBase sourceSplitBase, JdbcSourceConfig 
taskSourceConfig) {
+            return null;
+        }
+
+        @Override
+        public Optional<PrimaryKey> getPrimaryKey(JdbcConnection 
jdbcConnection, TableId tableId)
+                throws SQLException {
+            return Optional.of(
+                    PrimaryKey.of(
+                            "pkName",
+                            Arrays.asList(
+                                    "string_col",
+                                    "smallint",
+                                    "int",
+                                    "decimal",
+                                    "tinyint_col",
+                                    "bigint_col")));
+        }
+
+        @Override
+        public List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId tableId)
+                throws SQLException {
+            return new ArrayList<ConstraintKey>();
+        }
+    }
+}

Reply via email to