This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new b408850714c [FLINK-38640][table-planner] Fix NPE in 
DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys (#27204) (#27215)
b408850714c is described below

commit b408850714c607fa9ca6edf1d0469168eaa9a4b8
Author: Xuyang <[email protected]>
AuthorDate: Mon Nov 10 19:28:53 2025 +0800

    [FLINK-38640][table-planner] Fix NPE in 
DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys (#27204) (#27215)
---
 .../table/planner/plan/utils/DeltaJoinUtil.java    |  7 +-
 .../planner/plan/utils/DeltaJoinUtilTest.java      | 83 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index e96d5be5324..d433ba5ed9d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.catalog.Index;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -335,12 +336,14 @@ public class DeltaJoinUtil {
         return isFilterOnOneSetOfUpsertKeys(nonEquiCond.get(), upsertKeys);
     }
 
-    private static boolean isFilterOnOneSetOfUpsertKeys(
+    @VisibleForTesting
+    protected static boolean isFilterOnOneSetOfUpsertKeys(
             RexNode filter, @Nullable Set<ImmutableBitSet> upsertKeys) {
         ImmutableBitSet fieldRefIndices =
                 ImmutableBitSet.of(
                         
RexNodeExtractor.extractRefInputFields(Collections.singletonList(filter)));
-        return upsertKeys.stream().anyMatch(uk -> 
uk.contains(fieldRefIndices));
+        return upsertKeys != null
+                && upsertKeys.stream().anyMatch(uk -> 
uk.contains(fieldRefIndices));
     }
 
     private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin 
join) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java
new file mode 100644
index 00000000000..9a5d8fde202
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.isFilterOnOneSetOfUpsertKeys;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeltaJoinUtil}. */
+class DeltaJoinUtilTest {
+
+    @Test
+    void testIsFilterOnOneSetOfUpsertKeys() {
+        FlinkTypeFactory typeFactory =
+                new FlinkTypeFactory(
+                        Thread.currentThread().getContextClassLoader(), 
FlinkTypeSystem.INSTANCE);
+        // input schema:
+        // a string,
+        // b bigint,
+        // c bigint
+        List<RelDataType> allFieldTypes =
+                Stream.of(DataTypes.VARCHAR(100), DataTypes.BIGINT(), 
DataTypes.BIGINT())
+                        .map(TypeConversions::fromDataToLogicalType)
+                        .map(typeFactory::createFieldTypeFromLogicalType)
+                        .collect(Collectors.toList());
+
+        RexBuilder rexBuilder = new RexBuilder(typeFactory);
+
+        // a = 'jim'
+        RexNode filter =
+                rexBuilder.makeCall(
+                        SqlStdOperatorTable.EQUALS,
+                        rexBuilder.makeInputRef(allFieldTypes.get(0), 0),
+                        rexBuilder.makeLiteral("jim", allFieldTypes.get(0)));
+
+        assertThat(isFilterOnOneSetOfUpsertKeys(filter, 
Set.of(ImmutableBitSet.of(0)))).isTrue();
+        assertThat(isFilterOnOneSetOfUpsertKeys(filter, 
Set.of(ImmutableBitSet.of(2)))).isFalse();
+        assertThat(isFilterOnOneSetOfUpsertKeys(filter, 
Set.of(ImmutableBitSet.of(0, 1)))).isTrue();
+        assertThat(isFilterOnOneSetOfUpsertKeys(filter, 
Set.of(ImmutableBitSet.of(1, 2))))
+                .isFalse();
+        assertThat(
+                        isFilterOnOneSetOfUpsertKeys(
+                                filter, Set.of(ImmutableBitSet.of(1), 
ImmutableBitSet.of(2))))
+                .isFalse();
+        assertThat(
+                        isFilterOnOneSetOfUpsertKeys(
+                                filter, Set.of(ImmutableBitSet.of(1), 
ImmutableBitSet.of(0))))
+                .isTrue();
+        assertThat(isFilterOnOneSetOfUpsertKeys(filter, null)).isFalse();
+    }
+}

Reply via email to