This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 11a6ba785 [FLINK-39197][jdbc&mysql] Fix NPE when finding chunk end
(#4296)
11a6ba785 is described below
commit 11a6ba7854cd3528f351b88e951905e255af3b9c
Author: Chengbing Liu <[email protected]>
AuthorDate: Mon Mar 30 14:54:34 2026 +0800
[FLINK-39197][jdbc&mysql] Fix NPE when finding chunk end (#4296)
Co-authored-by: Chengbing Liu <[email protected]>
---
.../assigner/splitter/JdbcSourceChunkSplitter.java | 7 +-
.../splitter/JdbcSourceChunkSplitterTest.java | 100 +++++++++++++++++++++
.../mysql/source/assigners/MySqlChunkSplitter.java | 6 +-
.../source/assigners/MySqlChunkSplitterTest.java | 56 ++++++++++++
4 files changed, 167 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 545db56f8..8bbc93588 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import
org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
@@ -507,7 +508,8 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
return splits;
}
- private Object nextChunkEnd(
+ @VisibleForTesting
+ Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
@@ -518,6 +520,9 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
+ if (chunkEnd == null) {
+ return null;
+ }
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java
new file mode 100644
index 000000000..a52edc636
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
+
+import org.apache.flink.table.api.DataTypes;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+
+/** Tests for {@link JdbcSourceChunkSplitter}. */
+class JdbcSourceChunkSplitterTest {
+
+ /**
+ * Guards against regressions where {@code queryNextChunkMax} may return
{@code null} (e.g. when
+ * the max row was removed after MIN/MAX was determined). {@code
nextChunkEnd} must handle this
+ * gracefully and return null without throwing exceptions.
+ */
+ @Test
+ void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
+ // given a splitter whose queryNextChunkMax always returns null
+ JdbcSourceChunkSplitter splitter = new
TestingJdbcSourceChunkSplitter(null);
+
+ TableId tableId = TableId.parse("catalog.db.table");
+ Column splitColumn =
+
Column.editor().name("id").type("INT").jdbcType(java.sql.Types.INTEGER).create();
+
+ Object previousChunkEnd = 10;
+ Object max = 100;
+ int chunkSize = 5;
+
+ // when queryNextChunkMax returns null, nextChunkEnd should also
return null
+ Object result =
+ splitter.nextChunkEnd(null, previousChunkEnd, tableId,
splitColumn, max, chunkSize);
+
+ Assertions.assertThat(result).isNull();
+ }
+
+ /** Minimal testing implementation that stubs out JDBC interactions. */
+ private static class TestingJdbcSourceChunkSplitter extends
JdbcSourceChunkSplitter {
+
+ @Nullable private final Object nextChunkMaxResult;
+
+ TestingJdbcSourceChunkSplitter(@Nullable Object nextChunkMaxResult) {
+ super(null, null, null, null, null);
+ this.nextChunkMaxResult = nextChunkMaxResult;
+ }
+
+ @Override
+ protected Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ Column splitColumn,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return nextChunkMaxResult;
+ }
+
+ @Override
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId
tableId)
+ throws SQLException {
+ return 0L;
+ }
+
+ @Override
+ protected Object queryMin(
+ JdbcConnection jdbc, TableId tableId, Column splitColumn,
Object excludedLowerBound)
+ throws SQLException {
+ return null;
+ }
+
+ @Override
+ protected org.apache.flink.table.types.DataType fromDbzColumn(Column
splitColumn) {
+ // The concrete type is irrelevant for this test; just return a
simple numeric type.
+ return DataTypes.BIGINT();
+ }
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index 4821eaba2..b7efb9ab8 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -322,7 +322,8 @@ public class MySqlChunkSplitter implements ChunkSplitter {
return splits;
}
- private Object nextChunkEnd(
+ @VisibleForTesting
+ Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
@@ -334,6 +335,9 @@ public class MySqlChunkSplitter implements ChunkSplitter {
Object chunkEnd =
StatementUtils.queryNextChunkMax(
jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
+ if (chunkEnd == null) {
+ return null;
+ }
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 33f50333a..0e4f369bb 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -21,11 +21,16 @@ import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.sql.SQLException;
import java.time.ZoneId;
+import java.util.Collections;
import java.util.List;
/** Tests for {@link
org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter}. */
@@ -85,4 +90,55 @@ class MySqlChunkSplitterTest {
ChunkRange.of(2147483637, 2147483647),
ChunkRange.of(2147483647, null));
}
+
+ @Test
+ void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
+ // given a MySqlChunkSplitter instance
+ MySqlSourceConfig sourceConfig =
+ new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList("")
+ .tableList("")
+ .hostname("")
+ .username("")
+ .password("")
+ .serverTimeZone(ZoneId.of("UTC").toString())
+ .assignUnboundedChunkFirst(false)
+ .createConfig(0);
+ MySqlChunkSplitter splitter = new MySqlChunkSplitter(null,
sourceConfig);
+
+ // and a JdbcConnection whose prepareQueryAndMap always returns null,
+ // so that StatementUtils.queryNextChunkMax(... ) returns null
+ JdbcConfiguration jdbcConfiguration =
+
JdbcConfiguration.adapt(Configuration.from(Collections.emptyMap()));
+ JdbcConnection jdbc =
+ new JdbcConnection(
+ jdbcConfiguration,
+ config -> {
+ throw new SQLException("Connection not used in
test");
+ },
+ "`",
+ "`") {
+ @Override
+ public <T> T prepareQueryAndMap(
+ String query,
+ StatementPreparer statementPreparer,
+ ResultSetMapper<T> mapper)
+ throws SQLException {
+ return null;
+ }
+ };
+
+ TableId tableId = new TableId("catalog", "db", "tab");
+ Object previousChunkEnd = 10;
+ Object max = 100;
+ int chunkSize = 5;
+
+ Object result =
+ splitter.nextChunkEnd(jdbc, previousChunkEnd, tableId, "id",
max, chunkSize);
+
+ // when queryNextChunkMax returns null, nextChunkEnd should also
return null
+ // instead of propagating the null further and causing errors
+ Assertions.assertThat(result).isNull();
+ }
}