This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new f6d1d4810 [FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk
UUID-typed columns correctly (#3497)
f6d1d4810 is described below
commit f6d1d4810af34354da262402c3871c7db3d1a414
Author: lipl <[email protected]>
AuthorDate: Tue Aug 6 17:43:09 2024 +0800
[FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk UUID-typed columns
correctly (#3497)
* resolve conflicts
* polish code to trigger ci
---------
Co-authored-by: Kael <[email protected]>
Co-authored-by: gongzhongqiang <[email protected]>
---
.../assigner/splitter/JdbcSourceChunkSplitter.java | 8 +--
.../assigner/splitter/OracleChunkSplitter.java | 4 +-
.../source/dialect/SqlServerChunkSplitter.java | 9 +++
.../sqlserver/source/utils/SqlServerTypeUtils.java | 3 +
.../sqlserver/source/utils/SqlServerUtils.java | 66 +++++++++++++++----
.../source/utils/SQLServerUUIDComparatorTest.java | 75 ++++++++++++++++++++++
6 files changed, 147 insertions(+), 18 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 1d50164cc..508e8cb9b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -214,12 +214,12 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
}
/** ChunkEnd less than or equal to max. */
- protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+ protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column
splitColumn) {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}
/** ChunkEnd greater than or equal to max. */
- protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+ protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column
splitColumn) {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}
@@ -368,7 +368,7 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
int count = 0;
- while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+ while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max,
splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on PostgreSQL server
@@ -397,7 +397,7 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
- if (isChunkEndGeMax(chunkEnd, max)) {
+ if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index adf4c98a9..ffcdb4bb2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -102,7 +102,7 @@ public class OracleChunkSplitter extends
JdbcSourceChunkSplitter {
/** ChunkEnd less than or equal to max. */
@Override
- protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+ protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column
splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@@ -116,7 +116,7 @@ public class OracleChunkSplitter extends
JdbcSourceChunkSplitter {
/** ChunkEnd greater than or equal to max. */
@Override
- protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+ protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column
splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index fb338a0ea..e79d0bcfa 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -64,4 +64,16 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
throws SQLException {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
+
+ /** ChunkEnd less than or equal to max, compared via {@link SqlServerUtils#compare}. */
+ @Override
+ protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+ return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
+ }
+
+ /** ChunkEnd greater than or equal to max, compared via {@link SqlServerUtils#compare}. */
+ @Override
+ protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+ return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
index 3d7163189..8f1853837 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -27,6 +27,9 @@ import java.sql.Types;
/** Utilities for converting from SqlServer types to Flink types. */
public class SqlServerTypeUtils {
+ /** Type name of SQL Server GUID columns, as compared against a split column's type name. */
+ static final String UNIQUEIDENTIFIRER = "uniqueidentifier";
+
/** Returns a corresponding Flink data type from a debezium {@link
Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index b2292a825..e389a5128 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.utils;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
@@ -40,14 +41,17 @@ import org.apache.kafka.connect.source.SourceRecord;
import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
@@ -297,8 +301,7 @@ public class SqlServerUtils {
return buildSelectWithRowLimits(
tableId, limitSize, "*", Optional.ofNullable(condition),
Optional.empty());
} else {
- final String orderBy =
-
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
+ final String orderBy = String.join(", ",
pkRowType.getFieldNames());
return buildSelectWithBoundaryRowLimits(
tableId,
limitSize,
@@ -322,7 +325,7 @@ public class SqlServerUtils {
StringBuilder sql = new StringBuilder();
for (Iterator<String> fieldNamesIt =
pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
- sql.append("MAX(" + fieldNamesIt.next() + ")");
+ sql.append("MAX(").append(fieldNamesIt.next()).append(")");
if (fieldNamesIt.hasNext()) {
sql.append(" , ");
}
@@ -342,12 +345,8 @@ public class SqlServerUtils {
}
sql.append(projection).append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
- if (condition.isPresent()) {
- sql.append(" WHERE ").append(condition.get());
- }
- if (orderBy.isPresent()) {
- sql.append(" ORDER BY ").append(orderBy.get());
- }
+ condition.ifPresent(s -> sql.append(" WHERE ").append(s));
+ orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s));
return sql.toString();
}
@@ -396,11 +395,88 @@ public class SqlServerUtils {
sql.append(projection);
sql.append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
- if (condition.isPresent()) {
- sql.append(" WHERE ").append(condition.get());
- }
+ condition.ifPresent(s -> sql.append(" WHERE ").append(s));
sql.append(" ORDER BY ").append(orderBy);
sql.append(") T");
return sql.toString();
}
+
+ /**
+ * Compares two split-column values using the ordering SQL Server applies to that column.
+ *
+ * <p>Values of a {@code uniqueidentifier} column are compared with {@link
+ * SQLServerUUIDComparator}, because SQL Server does not order GUIDs the way {@link
+ * UUID#compareTo(UUID)} does; all other values fall back to {@link
+ * ObjectUtils#compare(Object, Object)}.
+ *
+ * @param obj1 the first value; for {@code uniqueidentifier} columns its {@code toString()} must
+ * be a UUID string
+ * @param obj2 the second value, with the same requirement as {@code obj1}
+ * @param splitColumn the column both values belong to; its type name selects the ordering
+ * @return a negative integer, zero, or a positive integer as {@code obj1} is less than, equal
+ * to, or greater than {@code obj2}
+ */
+ public static int compare(Object obj1, Object obj2, Column splitColumn) {
+ if (SqlServerTypeUtils.UNIQUEIDENTIFIRER.equalsIgnoreCase(splitColumn.typeName())) {
+ return SQLServerUUIDComparator.INSTANCE.compare(
+ UUID.fromString(obj1.toString()), UUID.fromString(obj2.toString()));
+ }
+ return ObjectUtils.compare(obj1, obj2);
+ }
+
+ /**
+ * Comparator that orders {@link UUID}s the way SQL Server orders {@code uniqueidentifier}
+ * values, which differs from {@link UUID#compareTo(UUID)}.
+ *
+ * <p>SQL Server compares the last six bytes first, then bytes 8-9, then the third, second and
+ * first groups. The .NET reference implementation encodes this as the index order {@code 10,
+ * 11, 12, 13, 14, 15, 8, 9, 6, 7, 4, 5, 0, 1, 2, 3} over {@code Guid.ToByteArray()}, whose
+ * first three groups are stored little-endian. {@link UUID} bytes are big-endian (display
+ * order), so {@code GUID_ORDER} mirrors the indices inside each of the first three groups.
+ *
+ * <p>Reference code: <a
+ * href="https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Data.Common/src/System/Data/SQLTypes/SQLGuid.cs#L113">SQLGuid.cs::CompareTo</a>
+ *
+ * <p>Reference doc: <a
+ * href="https://learn.microsoft.com/uk-ua/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16">Comparing GUID and uniqueidentifier values</a>
+ */
+ static class SQLServerUUIDComparator implements Comparator<UUID> {
+
+ /** The comparator is stateless, so one instance can be shared. */
+ static final SQLServerUUIDComparator INSTANCE = new SQLServerUUIDComparator();
+
+ private static final int SIZE_OF_GUID = 16;
+
+ /**
+ * Indices into the big-endian bytes of a {@link UUID}, from the most to the least significant
+ * byte in SQL Server's ordering.
+ */
+ private static final byte[] GUID_ORDER = {
+ 10, 11, 12, 13, 14, 15, 8, 9, 7, 6, 5, 4, 3, 2, 1, 0
+ };
+
+ @Override
+ public int compare(UUID uuid1, UUID uuid2) {
+ byte[] bytes1 = uuidToBytes(uuid1);
+ byte[] bytes2 = uuidToBytes(uuid2);
+
+ for (int i = 0; i < SIZE_OF_GUID; i++) {
+ // Bytes are compared as unsigned values, as in SqlGuid.CompareTo.
+ int b1 = Byte.toUnsignedInt(bytes1[GUID_ORDER[i]]);
+ int b2 = Byte.toUnsignedInt(bytes2[GUID_ORDER[i]]);
+ if (b1 != b2) {
+ return Integer.compare(b1, b2);
+ }
+ }
+ return 0;
+ }
+
+ /** Returns the 16 bytes of {@code uuid} in big-endian (display) order. */
+ private static byte[] uuidToBytes(UUID uuid) {
+ return ByteBuffer.allocate(SIZE_OF_GUID)
+ .putLong(uuid.getMostSignificantBits())
+ .putLong(uuid.getLeastSignificantBits())
+ .array();
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
new file mode 100644
index 000000000..89a292777
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.source.utils;
+
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. */
+public class SQLServerUUIDComparatorTest {
+
+ private static final UUID GUID_1 = UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE");
+ private static final UUID GUID_2 = UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE");
+ private static final UUID GUID_3 = UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE");
+
+ private final SqlServerUtils.SQLServerUUIDComparator comparator =
+ new SqlServerUtils.SQLServerUUIDComparator();
+
+ @Test
+ public void testComparator() {
+ // Java's generic ordering sorts these values by their leading group.
+ List<UUID> guidList = new ArrayList<>(Arrays.asList(GUID_3, GUID_2, GUID_1));
+ guidList.sort(ObjectUtils::compare);
+ assertEquals(Arrays.asList(GUID_1, GUID_2, GUID_3), guidList);
+
+ // SQL Server evaluates the last six bytes first, so the same values sort differently.
+ List<UUID> sqlGuidList = new ArrayList<>(Arrays.asList(GUID_3, GUID_2, GUID_1));
+ sqlGuidList.sort(comparator);
+ assertEquals(Arrays.asList(GUID_2, GUID_3, GUID_1), sqlGuidList);
+ }
+
+ @Test
+ public void testByteOrderWithinGroups() {
+ // The first three groups are little-endian in .NET's Guid layout, so their right-most
+ // byte is the most significant; bytes 8-9 and the last six bytes are read left to right.
+ assertSortsBefore(
+ "01000000-0000-0000-0000-000000000000", "00000001-0000-0000-0000-000000000000");
+ assertSortsBefore(
+ "00000000-0100-0000-0000-000000000000", "00000000-0001-0000-0000-000000000000");
+ assertSortsBefore(
+ "00000000-0000-0100-0000-000000000000", "00000000-0000-0001-0000-000000000000");
+ assertSortsBefore(
+ "00000000-0000-0000-0001-000000000000", "00000000-0000-0000-0100-000000000000");
+ assertSortsBefore(
+ "00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-010000000000");
+ }
+
+ @Test
+ public void testGroupPrecedence() {
+ // The last six bytes outweigh bytes 8-9, which outweigh the third, second and first groups.
+ assertSortsBefore(
+ "FFFFFFFF-0000-0000-0000-000000000000", "00000000-0001-0000-0000-000000000000");
+ assertSortsBefore(
+ "00000000-FFFF-0000-0000-000000000000", "00000000-0000-0001-0000-000000000000");
+ assertSortsBefore(
+ "00000000-0000-FFFF-0000-000000000000", "00000000-0000-0000-0001-000000000000");
+ assertSortsBefore(
+ "00000000-0000-0000-FFFF-000000000000", "00000000-0000-0000-0000-000000000001");
+ }
+
+ /** Asserts that {@code smaller} sorts strictly before {@code larger} and equals itself. */
+ private void assertSortsBefore(String smaller, String larger) {
+ UUID lo = UUID.fromString(smaller);
+ UUID hi = UUID.fromString(larger);
+ assertTrue(comparator.compare(lo, hi) < 0, smaller + " should sort before " + larger);
+ assertTrue(comparator.compare(hi, lo) > 0, larger + " should sort after " + smaller);
+ assertEquals(0, comparator.compare(lo, UUID.fromString(smaller)));
+ }
+}
+}