This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 34105c7 [FLINK-17135][python][tests] Fix the test testPandasFunctionMixedWithGeneralPythonFunction to make it more stable 34105c7 is described below commit 34105c708b518f1fc5cc83f62bf10143ff662d13 Author: Dian Fu <dia...@apache.org> AuthorDate: Thu Apr 16 16:51:10 2020 +0800 [FLINK-17135][python][tests] Fix the test testPandasFunctionMixedWithGeneralPythonFunction to make it more stable There are two caches in RelDataTypeFactoryImpl: KEY2TYPE_CACHE and DATATYPE_CACHE. KEY2TYPE_CACHE caches the mapping of Key (consisting of field names and field types, etc.) to RelDataType and can be used for the canonization of row types. DATATYPE_CACHE caches the RelDataType instances. PythonCalcSplitRule will split a Calc RelNode which contains both non-vectorized Python UDF and vectorized Python UDF into two Calc RelNodes. For the test case testPandasFunctionMixedWithGeneralPythonFunction, the output type of the bottom Calc consists of two fields (f0: INTEGER, f1: INTEGER), let's call it row_type_0. This row type is already available in the cache (generated by other test cases, held in variable KEY2TYPE_CACHE) and so it will hit the cache when constructing this row type for the bottom Calc. However, during debugging, I found that the INTEGER type referenced by row_type_0 is already cleaned up from the cache DATATYPE_CACHE. Then when constructing the RexProgram for the top Calc, it creates another INTEGER type and failure happens. To work around this problem, we adjust the test case a bit to make the output row type of the bottom Calc consist of three fields instead of two fields to make the cache hit fail. 
--- .../table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml | 8 ++++---- .../planner/plan/rules/logical/PythonCalcSplitRuleTest.scala | 2 +- .../org/apache/flink/table/plan/PythonCalcSplitRuleTest.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml index 4a25947..ed6e1a4 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml @@ -238,18 +238,18 @@ FlinkLogicalCalc(select=[pandasFunc1(a, b) AS EXPR$0]) </TestCase> <TestCase name="testPandasFunctionMixedWithGeneralPythonFunction"> <Resource name="sql"> - <![CDATA[SELECT pandasFunc1(a, b), pyFunc1(a, c) + 1 FROM MyTable]]> + <![CDATA[SELECT pandasFunc1(a, b), pyFunc1(a, c) + 1, a + 1 FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[pandasFunc1($0, $1)], EXPR$1=[+(pyFunc1($0, $2), 1)]) +LogicalProject(EXPR$0=[pandasFunc1($0, $1)], EXPR$1=[+(pyFunc1($0, $2), 1)], EXPR$2=[+($0, 1)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[f0 AS EXPR$0, +(f1, 1) AS EXPR$1]) -+- FlinkLogicalCalc(select=[f0, pyFunc1(a, c) AS f1]) +FlinkLogicalCalc(select=[f0 AS EXPR$0, +(f1, 1) AS EXPR$1, +(a, 1) AS EXPR$2]) ++- FlinkLogicalCalc(select=[a, f0, pyFunc1(a, c) AS f1]) +- FlinkLogicalCalc(select=[a, c, pandasFunc1(a, b) AS f0]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: 
[TestTableSource(a, b, c, d)]]], fields=[a, b, c, d]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala index cf027b0..260c9b9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala @@ -169,7 +169,7 @@ class PythonCalcSplitRuleTest extends TableTestBase { @Test def testPandasFunctionMixedWithGeneralPythonFunction(): Unit = { - val sqlQuery = "SELECT pandasFunc1(a, b), pyFunc1(a, c) + 1 FROM MyTable" + val sqlQuery = "SELECT pandasFunc1(a, b), pyFunc1(a, c) + 1, a + 1 FROM MyTable" util.verifyPlan(sqlQuery) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/PythonCalcSplitRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/PythonCalcSplitRuleTest.scala index 953f67f..68a93a2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/PythonCalcSplitRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/PythonCalcSplitRuleTest.scala @@ -440,7 +440,7 @@ class PythonCalcSplitRuleTest extends TableTestBase { util.tableEnv.registerFunction("pyFunc1", new PythonScalarFunction("pyFunc1")) util.tableEnv.registerFunction("pandasFunc1", new PandasScalarFunction("pandasFunc1")) - val resultTable = table.select("pandasFunc1(a, b), pyFunc1(a, c) + 1") + val resultTable = table.select("pandasFunc1(a, b), pyFunc1(a, c) + 1, a + 1") val expected = unaryNode( "DataStreamCalc", @@ -451,9 +451,9 @@ class PythonCalcSplitRuleTest extends TableTestBase { streamTableNode(table), 
term("select", "a", "c", "pandasFunc1(a, b) AS f0") ), - term("select", "f0", "pyFunc1(a, c) AS f1") + term("select", "a", "f0", "pyFunc1(a, c) AS f1") ), - term("select", "f0 AS _c0", "+(f1, 1) AS _c1") + term("select", "f0 AS _c0", "+(f1, 1) AS _c1", "+(a, 1) AS _c2") ) util.verifyTable(resultTable, expected)