[ https://issues.apache.org/jira/browse/FLINK-8492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336105#comment-16336105 ]
Fabian Hueske commented on FLINK-8492:
--------------------------------------

OK, I think I found a simple fix.
The problem is in fact a cost modeling issue. The cost function in {{CommonCalc.computeSelfCost()}} does not penalize having multiple calcs. If we add a constant {{1}} to {{compCnt}}, the calcs will be merged because this reduces the cost.

{code}
  private[flink] def computeSelfCost(
      calcProgram: RexProgram,
      planner: RelOptPlanner,
      rowCnt: Double): RelOptCost = {

    // compute number of expressions that do not access a field or literal, i.e. computations,
    // conditions, etc. We only want to account for computations, not for simple projections.
    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
    // in normalization stage. So we should ignore CASTs here in optimization stage.
    val compCnt = calcProgram.getExprList.asScala.toList.count {
      case _: RexInputRef => false
      case _: RexLiteral => false
      case c: RexCall if c.getOperator.getName.equals("CAST") => false
      case _ => true
    } + 1 // <------ THIS IS THE FIX

    val newRowCnt = estimateRowCount(calcProgram, rowCnt)
    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
  }
{code}

> Fix unsupported exception for udtf with multi calc
> --------------------------------------------------
>
>                 Key: FLINK-8492
>                 URL: https://issues.apache.org/jira/browse/FLINK-8492
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> In the following test, an unsupported exception is thrown because
> multiple calcs exist between the correlate and the TableFunctionScan.
> {code:java}
> // code placeholder
> @Test
> def testCrossJoinWithMultiFilter(): Unit = {
>   val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
>   val func0 = new TableFunc0
>   val result = t
>     .join(func0('c) as('d, 'e))
>     .select('c, 'd, 'e)
>     .where('e > 10)
>     .where('e > 20)
>     .select('c, 'd)
>     .toAppendStream[Row]
>   result.addSink(new StreamITCase.StringSink[Row])
>   env.execute()
>   val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
> {code}
> I can see two options to fix this problem:
> # Adapt the Calcite OptRule to merge consecutive calcs.
> # Merge the multiple calcs in the correlate convert rule.
> I prefer the second one: not only is it easy to implement, but I also think
> that the presence or absence of an optimization rule should not affect Flink functionality.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
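
The cost argument in the comment above can be checked with a toy model. This is only a sketch and not Flink's planner code: {{CalcCostSketch}}, {{cpuCost}}, {{chainCost}} and the {{penalty}} parameter are hypothetical names, every calc is assumed to see the same row count (selectivity is ignored), and a merged calc is assumed to carry the sum of the computations of the calcs it replaces.

```scala
// Toy model of the cost argument; NOT Flink's actual planner code.
// Assumption: every calc in the chain sees the same row count.
object CalcCostSketch {

  // CPU cost of a single calc: rows * (computations + fixed per-calc penalty).
  // penalty = 0 mimics the cost function before the fix, penalty = 1 after it.
  def cpuCost(rowCnt: Double, compCnt: Int, penalty: Int): Double =
    rowCnt * (compCnt + penalty)

  // Total CPU cost of a chain of calcs, each given by its computation count.
  def chainCost(rowCnt: Double, compCnts: Seq[Int], penalty: Int): Double =
    compCnts.map(c => cpuCost(rowCnt, c, penalty)).sum

  def main(args: Array[String]): Unit = {
    val rows = 100.0
    val split = Seq(1, 1) // two calcs, one computation each
    val merged = Seq(2)   // one calc holding both computations

    // Without the penalty both plans cost the same, so merging is not preferred.
    println(chainCost(rows, split, 0) == chainCost(rows, merged, 0))
    // With a penalty of 1 the merged plan is strictly cheaper.
    println(chainCost(rows, merged, 1) < chainCost(rows, split, 1))
  }
}
```

Under these assumptions the split and merged plans tie when there is no per-calc penalty, and a calc that only projects fields costs nothing at all, which would explain why the planner has no incentive to merge. A constant per-calc term breaks the tie in favour of the merged plan.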