This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7ce5a7c6e1e [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source 7ce5a7c6e1e is described below commit 7ce5a7c6e1eab6823094a94bc0bca30d0ee618f1 Author: Juntao Hu <maybach...@gmail.com> AuthorDate: Thu Apr 21 19:38:01 2022 +0800 [FLINK-22984][python] Don't pushdown Calc containing Python UDFs into table source This closes #19551. --- ...WatermarkIntoTableSourceScanAcrossCalcRule.java | 7 ++++++- .../PushWatermarkIntoTableSourceScanRuleTest.java | 22 ++++++++++++++++++++++ .../utils/JavaUserDefinedScalarFunctions.java | 21 +++++++++++++++++++++ .../PushWatermarkIntoTableSourceScanRuleTest.xml | 21 +++++++++++++++++++++ 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java index 18539f08e20..d7311cc9ab1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java @@ -37,6 +37,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall; + /** * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the * {@link FlinkLogicalTableSourceScan}. The rule will first look for the computed column in the @@ -62,7 +64,10 @@ public class PushWatermarkIntoTableSourceScanAcrossCalcRule @Override public boolean matches(RelOptRuleCall call) { FlinkLogicalTableSourceScan scan = call.rel(2); - return supportsWatermarkPushDown(scan); + FlinkLogicalCalc calc = call.rel(1); + return supportsWatermarkPushDown(scan) + && calc.getProgram().getExprList().stream() + .noneMatch(rexNode -> containsPythonCall(rexNode, null)); } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java index cdef53073e2..1c3ead462be 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgr import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -253,4 +254,25 @@ public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase { util.tableEnv().executeSql(ddl); util.verifyRelPlan("select a, c from MyTable"); } + + @Test + public void testWatermarkWithPythonFunctionInComputedColumn() { + util.tableEnv() + .createTemporaryFunction( + "parse_ts", + new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction()); + String ddl = + "CREATE TABLE MyTable(" + + " a INT,\n" + + " b AS parse_ts(a),\n" + + " WATERMARK FOR b AS b\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'bounded' = 'false',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + util.verifyRelPlan("SELECT * FROM MyTable"); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java index 7403e32d473..73c52d16e4d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java @@ -29,6 +29,7 @@ import org.apache.flink.table.functions.python.PythonFunctionKind; import org.apache.flink.types.Row; import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.Random; import java.util.TimeZone; @@ -293,4 +294,24 @@ public class JavaUserDefinedScalarFunctions { return PythonFunctionKind.PANDAS; } } + + /** A Python UDF that returns current timestamp with any input. */ + public static class PythonTimestampScalarFunction extends ScalarFunction + implements PythonFunction { + + @DataTypeHint("TIMESTAMP(3)") + public LocalDateTime eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... o) { + return LocalDateTime.now(); + } + + @Override + public byte[] getSerializedPythonFunction() { + return new byte[0]; + } + + @Override + public PythonEnv getPythonEnv() { + return null; + } + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml index ae25b30e846..88816818254 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.xml @@ -191,6 +191,27 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3]) FlinkLogicalCalc(select=[a, b, c, d]) +- FlinkLogicalCalc(select=[a, b, c, Reinterpret(func(c, a)) AS d]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[func(func(func(c, a), a), a)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testWatermarkWithPythonFunctionInComputedColumn"> + <Resource name="sql"> + <![CDATA[SELECT * FROM MyTable]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1]) ++- LogicalWatermarkAssigner(rowtime=[b], watermark=[$1]) + +- LogicalProject(a=[$0], b=[parse_ts($0)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +FlinkLogicalCalc(select=[a, b]) ++- FlinkLogicalWatermarkAssigner(rowtime=[b], watermark=[$1]) + +- FlinkLogicalCalc(select=[a, parse_ts(a) AS b]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a]) ]]> </Resource> </TestCase>