This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a6ba785 [FLINK-39197][jdbc&mysql] Fix NPE when finding chunk end (#4296)
11a6ba785 is described below

commit 11a6ba7854cd3528f351b88e951905e255af3b9c
Author: Chengbing Liu <[email protected]>
AuthorDate: Mon Mar 30 14:54:34 2026 +0800

    [FLINK-39197][jdbc&mysql] Fix NPE when finding chunk end (#4296)
    
    Co-authored-by: Chengbing Liu <[email protected]>
---
 .../assigner/splitter/JdbcSourceChunkSplitter.java |   7 +-
 .../splitter/JdbcSourceChunkSplitterTest.java      | 100 +++++++++++++++++++++
 .../mysql/source/assigners/MySqlChunkSplitter.java |   6 +-
 .../source/assigners/MySqlChunkSplitterTest.java   |  56 ++++++++++++
 4 files changed, 167 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 545db56f8..8bbc93588 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
@@ -507,7 +508,8 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
         return splits;
     }
 
-    private Object nextChunkEnd(
+    @VisibleForTesting
+    Object nextChunkEnd(
             JdbcConnection jdbc,
             Object previousChunkEnd,
             TableId tableId,
@@ -518,6 +520,9 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
         // chunk end might be null when max values are removed
         Object chunkEnd =
                 queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
+        if (chunkEnd == null) {
+            return null;
+        }
         if (Objects.equals(previousChunkEnd, chunkEnd)) {
             // we don't allow equal chunk start and end,
             // should query the next one larger than chunkEnd
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java
new file mode 100644
index 000000000..a52edc636
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
+
+import org.apache.flink.table.api.DataTypes;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+
+/** Tests for {@link JdbcSourceChunkSplitter}. */
+class JdbcSourceChunkSplitterTest {
+
+    /**
+     * Guards against regressions where {@code queryNextChunkMax} may return {@code null} (e.g. when
+     * the max row was removed after MIN/MAX was determined). {@code nextChunkEnd} must handle this
+     * gracefully and return null without throwing exceptions.
+     */
+    @Test
+    void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
+        // given a splitter whose queryNextChunkMax always returns null
+        JdbcSourceChunkSplitter splitter = new TestingJdbcSourceChunkSplitter(null);
+
+        TableId tableId = TableId.parse("catalog.db.table");
+        Column splitColumn =
+                Column.editor().name("id").type("INT").jdbcType(java.sql.Types.INTEGER).create();
+
+        Object previousChunkEnd = 10;
+        Object max = 100;
+        int chunkSize = 5;
+
+        // when queryNextChunkMax returns null, nextChunkEnd should also return null
+        Object result =
+                splitter.nextChunkEnd(null, previousChunkEnd, tableId, splitColumn, max, chunkSize);
+
+        Assertions.assertThat(result).isNull();
+    }
+
+    /** Minimal testing implementation that stubs out JDBC interactions. */
+    private static class TestingJdbcSourceChunkSplitter extends JdbcSourceChunkSplitter {
+
+        @Nullable private final Object nextChunkMaxResult;
+
+        TestingJdbcSourceChunkSplitter(@Nullable Object nextChunkMaxResult) {
+            super(null, null, null, null, null);
+            this.nextChunkMaxResult = nextChunkMaxResult;
+        }
+
+        @Override
+        protected Object queryNextChunkMax(
+                JdbcConnection jdbc,
+                TableId tableId,
+                Column splitColumn,
+                int chunkSize,
+                Object includedLowerBound)
+                throws SQLException {
+            return nextChunkMaxResult;
+        }
+
+        @Override
+        protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+                throws SQLException {
+            return 0L;
+        }
+
+        @Override
+        protected Object queryMin(
+                JdbcConnection jdbc, TableId tableId, Column splitColumn, Object excludedLowerBound)
+                throws SQLException {
+            return null;
+        }
+
+        @Override
+        protected org.apache.flink.table.types.DataType fromDbzColumn(Column splitColumn) {
+            // The concrete type is irrelevant for this test; just return a simple numeric type.
+            return DataTypes.BIGINT();
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index 4821eaba2..b7efb9ab8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -322,7 +322,8 @@ public class MySqlChunkSplitter implements ChunkSplitter {
         return splits;
     }
 
-    private Object nextChunkEnd(
+    @VisibleForTesting
+    Object nextChunkEnd(
             JdbcConnection jdbc,
             Object previousChunkEnd,
             TableId tableId,
@@ -334,6 +335,9 @@ public class MySqlChunkSplitter implements ChunkSplitter {
         Object chunkEnd =
                 StatementUtils.queryNextChunkMax(
                         jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
+        if (chunkEnd == null) {
+            return null;
+        }
         if (Objects.equals(previousChunkEnd, chunkEnd)) {
             // we don't allow equal chunk start and end,
             // should query the next one larger than chunkEnd
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 33f50333a..0e4f369bb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -21,11 +21,16 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 
+import io.debezium.config.Configuration;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.sql.SQLException;
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.List;
 
 /** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter}. */
@@ -85,4 +90,55 @@ class MySqlChunkSplitterTest {
                         ChunkRange.of(2147483637, 2147483647),
                         ChunkRange.of(2147483647, null));
     }
+
+    @Test
+    void testNextChunkEndReturnsNullWhenMaxRowRemoved() throws Exception {
+        // given a MySqlChunkSplitter instance
+        MySqlSourceConfig sourceConfig =
+                new MySqlSourceConfigFactory()
+                        .startupOptions(StartupOptions.initial())
+                        .databaseList("")
+                        .tableList("")
+                        .hostname("")
+                        .username("")
+                        .password("")
+                        .serverTimeZone(ZoneId.of("UTC").toString())
+                        .assignUnboundedChunkFirst(false)
+                        .createConfig(0);
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, sourceConfig);
+
+        // and a JdbcConnection whose prepareQueryAndMap always returns null,
+        // so that StatementUtils.queryNextChunkMax(... ) returns null
+        JdbcConfiguration jdbcConfiguration =
+                JdbcConfiguration.adapt(Configuration.from(Collections.emptyMap()));
+        JdbcConnection jdbc =
+                new JdbcConnection(
+                        jdbcConfiguration,
+                        config -> {
+                            throw new SQLException("Connection not used in test");
+                        },
+                        "`",
+                        "`") {
+                    @Override
+                    public <T> T prepareQueryAndMap(
+                            String query,
+                            StatementPreparer statementPreparer,
+                            ResultSetMapper<T> mapper)
+                            throws SQLException {
+                        return null;
+                    }
+                };
+
+        TableId tableId = new TableId("catalog", "db", "tab");
+        Object previousChunkEnd = 10;
+        Object max = 100;
+        int chunkSize = 5;
+
+        Object result =
+                splitter.nextChunkEnd(jdbc, previousChunkEnd, tableId, "id", max, chunkSize);
+
+        // when queryNextChunkMax returns null, nextChunkEnd should also return null
+        // instead of propagating the null further and causing errors
+        Assertions.assertThat(result).isNull();
+    }
 }

Reply via email to