This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b22bc62  [FLINK-21592][table-planner-blink] RemoveSingleAggregateRule fails due to nullability mismatch (#15082)
b22bc62 is described below

commit b22bc62ae59d3ccaef95507897c7725970e4e5c3
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Apr 14 17:51:43 2021 +0800

    [FLINK-21592][table-planner-blink] RemoveSingleAggregateRule fails due to nullability mismatch (#15082)
---
 .../apache/calcite/sql2rel/RelDecorrelator.java    | 22 ++++++----
 .../logical/RemoveSingleAggregateRuleTest.xml      | 50 ++++++++++++++++++++++
 .../logical/RemoveSingleAggregateRuleTest.scala    | 45 +++++++++++++++++++
 3 files changed, 109 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 4181625..07893cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -118,11 +118,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-/**
- * Copied to fix CALCITE-4333, should be removed for the next Calcite upgrade.
- *
- * <p>Changes: Line 671 ~ Line 681, Line 430 ~ Line 441.
- */
+/** Copied to fix calcite issues. */
 public class RelDecorrelator implements ReflectiveVisitor {
     // ~ Static fields/initializers ---------------------------------------------
 
@@ -439,6 +435,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
             return null;
         }
 
+        // BEGIN FLINK MODIFICATION
+        // Reason: to de-correlate sort rel when its parent is not a correlate
+        // Should be removed after CALCITE-4333 is fixed
         final RelNode newInput = frame.r;
 
         Mappings.TargetMapping mapping =
@@ -452,6 +451,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
         final int offset = rel.offset == null ? -1 : RexLiteral.intValue(rel.offset);
         final int fetch = rel.fetch == null ? -1 : RexLiteral.intValue(rel.fetch);
+        // END FLINK MODIFICATION
 
         final RelNode newSort =
                 relBuilder
@@ -685,6 +685,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     public Frame getInvoke(RelNode r, RelNode parent) {
         final Frame frame = dispatcher.invoke(r);
+        // BEGIN FLINK MODIFICATION
+        // Reason: to de-correlate sort rel when its parent is not a correlate
+        // Should be removed after CALCITE-4333 is fixed
         if (frame != null && parent instanceof Correlate && r instanceof Sort) {
             Sort sort = (Sort) r;
             // Can not decorrelate if the sort has per-correlate-key attributes like
@@ -696,6 +699,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
                 return null;
             }
         }
+        // END FLINK MODIFICATION
         if (frame != null) {
             map.put(r, frame);
         }
@@ -1869,13 +1873,15 @@ public class RelDecorrelator implements ReflectiveVisitor {
                 return;
             }
 
-            // singleAggRel produces a nullable type, so create the new
-            // projection that casts proj expr to a nullable type.
+            // BEGIN FLINK MODIFICATION
+            // Reason: fix the nullability mismatch issue
             final RelBuilder relBuilder = call.builder();
+            final boolean nullable = singleAggregate.getAggCallList().get(0).getType().isNullable();
             final RelDataType type =
                     relBuilder
                             .getTypeFactory()
-                            .createTypeWithNullability(projExprs.get(0).getType(), true);
+                            .createTypeWithNullability(projExprs.get(0).getType(), nullable);
+            // END FLINK MODIFICATION
             final RexNode cast = relBuilder.getRexBuilder().makeCast(type, projExprs.get(0));
             relBuilder.push(aggregate).project(cast);
             call.transformTo(relBuilder.build());
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml
new file mode 100644
index 0000000..05ccc23
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+       <TestCase name="testRemoveSingleAggregateRule">
+               <Resource name="sql">
+                       <![CDATA[select (select count(x)-1 from foo where foo.y=bar.i) from bar]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[$SCALAR_QUERY({
+LogicalProject(EXPR$0=[-($0, 1)])
+  LogicalAggregate(group=[{}], agg#0=[COUNT($0)])
+    LogicalProject(x=[$0])
+      LogicalFilter(condition=[=($1, $cor0.i)])
+        LogicalTableScan(table=[[default_catalog, default_database, foo, source: [TestTableSource(x, y)]]])
+})])
++- LogicalTableScan(table=[[default_catalog, default_database, bar, source: [TestTableSource(i, s)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=[-(CASE(IS NULL($f1), 0:BIGINT, $f1), 1) AS EXPR$0])
++- HashJoin(joinType=[LeftOuterJoin], where=[=(i, y)], select=[i, y, $f1], build=[right])
+   :- Exchange(distribution=[hash[i]])
+   :  +- Calc(select=[i])
+   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, bar, source: [TestTableSource(i, s)]]], fields=[i, s])
+   +- HashAggregate(isMerge=[true], groupBy=[y], select=[y, Final_COUNT(count$0) AS $f1])
+      +- Exchange(distribution=[hash[y]])
+         +- LocalHashAggregate(groupBy=[y], select=[y, Partial_COUNT(x) AS count$0])
+            +- Calc(select=[x, y], where=[IS NOT NULL(y)])
+               +- LegacyTableSourceScan(table=[[default_catalog, default_database, foo, source: [TestTableSource(x, y)]]], fields=[x, y])
+]]>
+               </Resource>
+       </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala
new file mode 100644
index 0000000..cdde94f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.{Before, Test}
+
+/**
+ * Test for RemoveSingleAggregateRule.
+ */
+class RemoveSingleAggregateRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Int, Int)]("foo", 'x, 'y)
+    util.addTableSource[(Int, String)]("bar", 'i, 's)
+  }
+
+  @Test
+  def testRemoveSingleAggregateRule(): Unit = {
+    util.verifyRelPlan("select (select count(x)-1 from foo where foo.y=bar.i) from bar")
+  }
+
+}

Reply via email to