This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ce5a7c6e1e [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
7ce5a7c6e1e is described below

commit 7ce5a7c6e1eab6823094a94bc0bca30d0ee618f1
Author: Juntao Hu <maybach...@gmail.com>
AuthorDate: Thu Apr 21 19:38:01 2022 +0800

    [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source
    
    This closes #19551.
---
 ...WatermarkIntoTableSourceScanAcrossCalcRule.java |  7 ++++++-
 .../PushWatermarkIntoTableSourceScanRuleTest.java  | 22 ++++++++++++++++++++++
 .../utils/JavaUserDefinedScalarFunctions.java      | 21 +++++++++++++++++++++
 .../PushWatermarkIntoTableSourceScanRuleTest.xml   | 21 +++++++++++++++++++++
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
index 18539f08e20..d7311cc9ab1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
@@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall;
+
 /**
  * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the
  * {@link FlinkLogicalTableSourceScan}. The rule will first look for the computed column in the
@@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule
     @Override
     public boolean matches(RelOptRuleCall call) {
         FlinkLogicalTableSourceScan scan = call.rel(2);
-        return supportsWatermarkPushDown(scan);
+        FlinkLogicalCalc calc = call.rel(1);
+        return supportsWatermarkPushDown(scan)
+                && calc.getProgram().getExprList().stream()
+                        .noneMatch(rexNode -> containsPythonCall(rexNode, null));
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
index cdef53073e2..1c3ead462be 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr
 import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
 import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
 import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
         util.tableEnv().executeSql(ddl);
         util.verifyRelPlan("select a, c from MyTable");
     }
+
+    @Test
+    public void testWatermarkWithPythonFunctionInComputedColumn() {
+        util.tableEnv()
+                .createTemporaryFunction(
+                        "parse_ts",
+                        new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
+        String ddl =
+                "CREATE TABLE MyTable("
+                        + "  a INT,\n"
+                        + "  b AS parse_ts(a),\n"
+                        + "  WATERMARK FOR b AS b\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'enable-watermark-push-down' = 'true',\n"
+                        + " 'bounded' = 'false',\n"
+                        + " 'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        util.verifyRelPlan("SELECT * FROM MyTable");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index 7403e32d473..73c52d16e4d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.python.PythonFunctionKind;
 import org.apache.flink.types.Row;
 
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.util.Random;
 import java.util.TimeZone;
 
@@ -293,4 +294,24 @@ public class JavaUserDefinedScalarFunctions {
             return PythonFunctionKind.PANDAS;
         }
     }
+
+    /** A Python UDF that returns current timestamp with any input. */
+    public static class PythonTimestampScalarFunction extends ScalarFunction
+            implements PythonFunction {
+
+        @DataTypeHint("TIMESTAMP(3)")
+        public LocalDateTime eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... o) {
+            return LocalDateTime.now();
+        }
+
+        @Override
+        public byte[] getSerializedPythonFunction() {
+            return new byte[0];
+        }
+
+        @Override
+        public PythonEnv getPythonEnv() {
+            return null;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
index ae25b30e846..88816818254 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml
@@ -191,6 +191,27 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
 FlinkLogicalCalc(select=[a, b, c, d])
 +- FlinkLogicalCalc(select=[a, b, c, Reinterpret(func(c, a)) AS d])
   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[func(func(func(c, a), a), a)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWatermarkWithPythonFunctionInComputedColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalWatermarkAssigner(rowtime=[b], watermark=[$1])
+   +- LogicalProject(a=[$0], b=[parse_ts($0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, b])
++- FlinkLogicalWatermarkAssigner(rowtime=[b], watermark=[$1])
+   +- FlinkLogicalCalc(select=[a, parse_ts(a) AS b])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a])
 ]]>
     </Resource>
   </TestCase>

Reply via email to