Re:咨询Flink 1.19文档中关于iterate操作

2024-05-20 文章 Xuyang
Hi, 

目前Iterate api在1.19版本上废弃了,不再支持,具体可以参考[1][2]。Flip[1]中提供了另一种替代的办法[3]




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream

[2] https://issues.apache.org/jira/browse/FLINK-33144

[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300




--

Best!
Xuyang





在 2024-05-20 22:39:37,""  写道:
>尊敬的Flink开发团队:
>
>您好!
>
>我目前正在学习如何使用Apache Flink的DataStream API实现迭代算法,例如图的单源最短路径。在Flink 
>1.18版本的文档中,我注意到有关于iterate操作的介绍,具体请见:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations
>
>但是,我发现Flink 
>1.19版本的文档中不再提及iterate操作。这让我有些困惑。不知道在最新版本中,这是否意味着iterate操作不再被支持?如果是这样的话,请问我该如何在数据流上进行迭代计算?
>
>非常感谢您的时间和帮助,期待您的回复。
>
>谢谢!
>
>李智诚


咨询Flink 1.19文档中关于iterate操作

2024-05-20 文章 www
尊敬的Flink开发团队:

您好!

我目前正在学习如何使用Apache Flink的DataStream API实现迭代算法,例如图的单源最短路径。在Flink 
1.18版本的文档中,我注意到有关于iterate操作的介绍,具体请见:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations

但是,我发现Flink 
1.19版本的文档中不再提及iterate操作。这让我有些困惑。不知道在最新版本中,这是否意味着iterate操作不再被支持?如果是这样的话,请问我该如何在数据流上进行迭代计算?

非常感谢您的时间和帮助,期待您的回复。

谢谢!

李智诚

Re: flinksql 经过优化后,group by字段少了

2024-05-20 文章 Lincoln Lee
Hi,

可以尝试下 1.17 或更新的版本, 这个问题在 flink 1.17.0 中已修复[1]。
批处理中做这个 remove 优化是符合语义的,而在流中不能直接裁剪,
对于相关时间函数的说明文档[2]中也进行了更新

[1] https://issues.apache.org/jira/browse/FLINK-30006
[2]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e5%86%85%e7%bd%ae%e5%87%bd%e6%95%b0%e7%9a%84%e7%a1%ae%e5%ae%9a%e6%80%a7


Best,
Lincoln Lee


℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 22:07写道:

> 当前用的是 flink 1.16 版本,这个issue虽然合并到了 calcite-1.22.0 中,但是在之后一段时间内,又被新的 pr (
> https://github.com/apache/calcite/pull/1735/files)合并了。
> 所以,当前flink中是仍然存在这个问题。
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org>;
> 发送时间: 2024年5月20日(星期一) 中午12:51
> 收件人: "user-zh"
> 主题: Re: flinksql 经过优化后,group by字段少了
>
>
>
> 你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11
> 版本开始就已经用的是这个 calcite 版本了。
>
> 所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个
> issue 来报一个 bug。
>
> PS:
> 上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。
>
> [1] https://issues.apache.org/jira/browse/CALCITE-3531
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道:
> >
> > 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite
> 中修复了,https://github.com/apache/calcite/pull/1602/files
> > >; 但是,flink 中引用的
> calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。
> >
> >
> >
> >
> > -- 原始邮件 --
> >
> 发件人:   
> "user-zh"   
>  > 发送时间: 2024年5月20日(星期一) 上午10:32
> > 收件人: "user-zh" >
> > 主题: Re: flinksql 经过优化后,group by字段少了
> >
> >
> >
> > 看起来像是因为 "dt = cast(CURRENT_DATE  as string)" 推导 dt
> 这个字段是个常量,进而被优化掉了。
> >
> > 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
> >
> > ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道:
> > >
> > > create view tmp_view as
> > > SELECT
> > >     dt, -- 2
> > >     uid, -- 0
> > >     uname, -- 1
> > >     uage -- 3
> > > from
> > >     kafkaTable
> > > where dt = cast(CURRENT_DATE  as string);
> > >
> > > insert into printSinkTable
> > > select
> > >     dt, uid, uname,
> sum(uage)
> > > from tmp_view
> > > group by
> > >     dt,
> > >     uid,
> > >     uname;
> > >
> > >
> > >
> > > sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname
> 三个字段进行聚合求和操作。
> > > 但是,经过优化后,生成的 物理结构如下:
> > > == Optimized Execution Plan ==
> > >
> Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt,
> uid, uname, EXPR$3])
> > > +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid,
> uname, EXPR$3])
> > > &nbsp; &nbsp;+- GroupAggregate(groupBy=[uid,
> uname], select=[uid, uname, SUM(uage) AS EXPR$3])
> > > &nbsp; &nbsp; &nbsp; +-
> Exchange(distribution=[hash[uid, uname]])
> > > &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp;+- Calc(select=[uid, uname, uage], where=[(dt =
> CAST(CURRENT_DATE()))])
> > > &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; +- TableSourceScan(table=[[default_catalog,
> default_database, kafkaTable]], fields=[uid, uname, dt, uage])
> > >
> > >
> > >
> > > 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li