Copilot commented on code in PR #4117:
URL: https://github.com/apache/flink-cdc/pull/4117#discussion_r2850604900
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Proxy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */
+class StatementUtilsTest {
+
+    @Test
+    void testSetSafeObjectConvertsBigIntegerToBigDecimal() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        // Create a BigInteger value that exceeds Long.MAX_VALUE
+        BigInteger bigIntValue = new BigInteger("9223372036854775808"); // Long.MAX_VALUE + 1
+        BigDecimal expectedBigDecimal = new BigDecimal(bigIntValue);
+
+        // Use the safe method
+        StatementUtils.setSafeObject(psProxy, 1, bigIntValue);
+
+        // Assert that it correctly used setBigDecimal with the converted BigDecimal value
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+        assertThat(invocationDetails.get("value")).isInstanceOf(BigDecimal.class);
+        assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+    }
+
+    @Test
+    void testSetSafeObjectHandlesLargeBigIntegerValues() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        // Test with BIGINT UNSIGNED max value
+        BigInteger maxUnsignedBigInt = new BigInteger("18446744073709551615"); // 2^64 - 1
+        BigDecimal expectedBigDecimal = new BigDecimal(maxUnsignedBigInt);
+
+        StatementUtils.setSafeObject(psProxy, 1, maxUnsignedBigInt);
+
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+        assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+    }
+
+    @Test
+    void testSetSafeObjectHandlesRegularValues() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails);
+
+        // Test with a common Long
+        StatementUtils.setSafeObject(psProxy, 1, 123L);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo(123L);
+        invocationDetails.clear();
+
+        // Test with a String
+        StatementUtils.setSafeObject(psProxy, 2, "test");
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo("test");
+        invocationDetails.clear();
+
+        // Test with an Integer
+        StatementUtils.setSafeObject(psProxy, 3, 456);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo(456);
+        invocationDetails.clear();
+
+        // Test with null
+        StatementUtils.setSafeObject(psProxy, 4, null);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isNull();
+    }

Review Comment:
The test doesn't verify the behavior for BigInteger values within the long range (Long.MIN_VALUE to Long.MAX_VALUE, inclusive). In setSafeObject, such values are converted to long before being bound via setObject. Add a test case that covers this path, for example with BigInteger.valueOf(Long.MAX_VALUE), and check that setObject is called with a long value.

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java:
##########
@@ -76,6 +78,24 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
                 });
     }
+    // PreparedStatement#setObject method will be converted to long type when handling bigint
+    // unsigned, which poses a data overflow issue for values exceeding Long.MAX_VALUE.
+    // Therefore, we need to convert to BigDecimal when the value is outside the long range

Review Comment:
This comment should use Javadoc format for consistency with the other public methods in this class. Consider converting it to a proper Javadoc comment with `@param` and `@throws` tags.
```suggestion
    /**
     * Safely sets an object parameter on a {@link PreparedStatement}, handling unsigned BIGINT
     * values to avoid overflow. {@code PreparedStatement#setObject} converts {@link BigInteger}
     * values to {@code long}, which can overflow for values outside the {@code long} range; this
     * method converts such values to {@link BigDecimal} instead.
     *
     * @param ps the prepared statement on which the parameter is to be set
     * @param parameterIndex the index of the parameter to set
     * @param value the parameter value; {@link BigInteger} values outside the {@code long} range
     *     are converted to {@link BigDecimal}
     * @throws SQLException if setting the parameter on the prepared statement fails
     */
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java:
##########

Review Comment:
The tests should also cover negative BigInteger values. Although a MySQL BIGINT UNSIGNED column cannot hold negative values, setSafeObject has an explicit branch for BigInteger values below Long.MIN_VALUE, and that branch is currently untested. Add a test case with a value such as BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.ONE) and check that it is bound via setBigDecimal.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java:
##########
@@ -76,6 +78,24 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
                 });
     }
+    // PreparedStatement#setObject method will be converted to long type when handling bigint
+    // unsigned, which poses a data overflow issue for values exceeding Long.MAX_VALUE.
+    // Therefore, we need to convert to BigDecimal when the value is outside the long range
+    public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
+            throws SQLException {
+        if (value instanceof BigInteger) {
+            BigInteger bigIntValue = (BigInteger) value;
+            if (bigIntValue.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0
+                    || bigIntValue.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
+                ps.setBigDecimal(parameterIndex, new BigDecimal(bigIntValue));
+            } else {
+                ps.setObject(parameterIndex, bigIntValue.longValueExact());
+            }

Review Comment:
Converting with longValueExact() is safe for BigInteger values within the long range, but it makes the bound type depend on the value's magnitude. In-range values are bound as Long, while out-of-range values are bound as BigDecimal. This could lead to subtle differences in how the JDBC driver and MySQL interpret these values, especially for unsigned columns. Consider always binding BigInteger values as BigDecimal, so that the behavior is the same regardless of magnitude.

```suggestion
    // unsigned, which poses a data overflow issue for values exceeding Long.MAX_VALUE and can
    // also lead to inconsistent behavior between in-range and out-of-range BigInteger values.
    // To ensure consistent handling, always bind BigInteger values as BigDecimal.
    public static void setSafeObject(PreparedStatement ps, int parameterIndex, Object value)
            throws SQLException {
        if (value instanceof BigInteger) {
            BigInteger bigIntValue = (BigInteger) value;
            ps.setBigDecimal(parameterIndex, new BigDecimal(bigIntValue));
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java:
##########
@@ -1447,6 +1448,99 @@ private boolean hasNextData(final CloseableIterator<?> iterator)
         }
     }
+    @Test
+    void testUnsignedBigintPrimaryKeyChunking() throws Exception {
+        customDatabase.createAndInitialize();
+
+        String db = customDatabase.getDatabaseName();
+        String table = "unsigned_bigint_pk";
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            String createSql =
+                    String.format(
+                            "CREATE TABLE %s.%s (\n"
+                                    + " `order_id` BIGINT UNSIGNED NOT NULL,\n"
+                                    + " `desc` VARCHAR(512) NOT NULL,\n"
+                                    + " PRIMARY KEY (`order_id`)\n"
+                                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Insert sample data including values near UNSIGNED BIGINT max
+            String insertSql =
+                    String.format(
+                            "INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
+                                    + "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink'),"
+                                    + "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink'),"
+                                    + "(11, 'flink'),(12, 'flink'),"
+                                    + "(18446744073709551604, 'flink'),(18446744073709551605, 'flink'),"
+                                    + "(18446744073709551606, 'flink'),(18446744073709551607, 'flink'),"
+                                    + "(18446744073709551608, 'flink'),(18446744073709551609, 'flink'),"
+                                    + "(18446744073709551610, 'flink'),(18446744073709551611, 'flink'),"
+                                    + "(18446744073709551612, 'flink'),(18446744073709551613, 'flink'),"
+                                    + "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Drop if exists to be idempotent across runs, then create and insert
+            connection.execute(
+                    String.format(
+                            "DROP TABLE IF EXISTS %s.%s;",
+                            StatementUtils.quote(db), StatementUtils.quote(table)),
+                    createSql,
+                    insertSql);
+            connection.commit();
+        }
+
+        // Build a source reading only the unsigned_bigint_pk table
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
+                        DataTypes.FIELD("desc", DataTypes.STRING()));
+        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+        RowDataDebeziumDeserializeSchema deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
+
+        MySqlSource<RowData> source =
+                MySqlSource.<RowData>builder()
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customDatabase.getUsername())
+                        .password(customDatabase.getPassword())
+                        .serverTimeZone("UTC")
+                        .databaseList(db)
+                        .tableList(db + "." + table)
+                        .deserializer(deserializer)
+                        .startupOptions(StartupOptions.initial())
+                        .chunkKeyColumn(new ObjectPath(db, table), "order_id")
+                        .splitSize(2)
+                        .build();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        try (CloseableIterator<RowData> it =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+                        .executeAndCollect()) {
+            // Expect 24 records as inserted above

Review Comment:
The test comment states "Expect 24 records as inserted above". The count is correct: the insert statement adds 12 small values (1-12) and 12 large values (18446744073709551604-18446744073709551615). However, a more explicit comment would make a failure easier to debug. Consider describing the expected records or using a named constant for the expected count.
```suggestion
            // Expect 24 records: 12 small PKs (1–12) and 12 large PKs
            // (18446744073709551604–18446744073709551615) inserted in unsigned_bigint_pk
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
