Hi Xuyang,

Thank you for the information regarding the bug fix.

I will proceed with the method of joining input_table and udtf first. Thank
you for the suggestion.
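
A minimal, untested sketch of that workaround, based on the query quoted
below, might look like the following. The view name `words` is a
hypothetical placeholder, and the sketch assumes that the `proctime`
attribute of input_table is carried through the lateral join so that it can
still be referenced in DESCRIPTOR.

```sql
-- Step 1: join input_table with the UDTF first, so the window TVF
-- no longer sits next to the LATERAL TABLE call in the plan.
-- `words` is a hypothetical view name chosen for illustration.
CREATE TEMPORARY VIEW words AS
SELECT
    word,
    proctime  -- assumed to remain a processing-time attribute here
FROM
    input_table,
    LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word);

-- Step 2: use the joined result as the input of the window TVF.
SELECT
    window_start,
    word,
    COUNT(*) AS `count`
FROM
    TABLE(
      TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '10' SECOND))
GROUP BY
    window_start,
    window_end,
    word;
```

Whether this actually avoids the MatchError on 1.18.1 would need to be
confirmed by running EXPLAIN on the second statement.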

Best regards, Norihiro Fuke.

On Mon, Jul 15, 2024 at 10:43, Xuyang <xyzhong...@163.com> wrote:

> Hi, this is a bug fixed in
> https://github.com/apache/flink/pull/25075/files#diff-4ee2dd065d2b45fb64cacd5977bec6126396cc3b56e72addfe434701ac301efeL405
> <https://github.com/apache/flink/pull/25075>.
>
>
> You can try to join input_table and udtf first, and then use it as the
> input of window tvf to bypass this bug.
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-07-09 10:35:04, "Norihiro FUKE" <n.fuke.ou....@gmail.com> wrote:
>
> Hi, community
>
> I encountered a scala.MatchError when trying to obtain the table plan for
> the following query in Flink 1.18.1.
>
> The input data is read from Kafka, and the query is intended to perform a
> typical WordCount operation. The query is as follows. SPLIT_STRING is a
> Table Function UDF that splits sentences into words by spaces.
>
> ```
> SELECT
>     window_start,
>     word,
>     COUNT(*) AS `count`
> FROM
>     TABLE(
>       TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
>       LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
> GROUP BY
>     window_start,
>     window_end,
>     word
> ```
>
> The error message received is:
>
> ```
> [ERR] scala.MatchError: rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1}) (of class org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
> ```
>
> I believe the issue lies in the existNeighbourWindowTableFunc method in
> flink-table-planner/WindowUtil.scala, which does not handle one of the
> node types (FlinkLogicalCorrelate) it encounters when traversing the plan.
> (This method was added in FLINK-32578.) I suspect the node comes from the
> LATERAL clause. In this query the unhandled node was
> FlinkLogicalCorrelate, but I think there might be other unhandled node
> types as well.
>
> I have two questions regarding this:
>
>    1. Is it expected behavior for scala.MatchError to occur in this
>    case? I suspect this might be an unreported bug.
>    2. In the code comments of the PR mentioned in the FLINK-32578 ticket,
>    I found the terms "standard form" and "relax form." I searched for "relax
>    form" in the Flink documentation but could not find any reference to it.
>    What do these terms mean? As a workaround for this issue, I considered
>    using the WITH clause, but I am uncertain whether that is a universal
>    solution.
>
> Thank you for your assistance.
>
>
