Hi Etienne,

Nice blog! Thanks for sharing!

Best regards,
Jing


On Wed, Nov 9, 2022 at 5:49 PM Etienne Chauchot <echauc...@apache.org>
wrote:

> Hi Yun Gao,
>
> FYI I just updated the article after your review:
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
> Le 09/11/2022 à 10:04, Etienne Chauchot a écrit :
>
> Hi Yun Gao,
>
> thanks for your email and your review !
>
> My comments are inline
> Le 08/11/2022 à 06:51, Yun Gao a écrit :
>
> Hi Etienne,
>
> Very thanks for the article! Flink is currently indeed keeping increasing
> the
> ability of unified batch / stream processing with the same api, and its a
> great
> pleasure that more and more users are trying this functionality. But I also
> have some questions regarding some details.
>
> First IMO, as a whole for the long run Flink will have two unified APIs,
> namely Table / SQL
> API and DataStream API. Users could express the computation logic with
> these two APIs
> for both bounded and unbounded data processing.
>
>
> Yes that is what I understood also throughout the discussions and jiras.
> And I also think IMHO that reducing the number of APIs to 2 was the good
> move.
>
>
> Underlying Flink provides two
> execution modes:  the streaming mode works with both bounded and unbounded
> data,
> and it executes in a way of incremental processing based on state; the
> batch mode works
> only with bounded data, and it executes in a ways level-by-level similar
> to the traditional
> batch processing frameworks. Users could switch the execution mode via
> EnvironmentSettings.inBatchMode() for
> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in Flink docs(1) I have enabled the batch mode as I though
> it would be more efficient on my bounded pipeline but as a matter of fact
> the streaming mode seems to be more efficient on my use case. I'll test
> with higher volumes to confirm.
>
>
>
> Specially for DataStream, as implemented in FLIP-140, currently all the
> existing DataStream
> operation supports the batch execution mode in a unified way[1]:  data
> will be sorted for the
> keyBy() edges according to the key, then the following operations like
> reduce() could receive
> all the data belonging to the same key consecutively, then it could
> directly reducing the records
> of the same key without maintaining the intermediate states. In this way
> users could write the
> same code for both streaming and batch processing with the same code.
>
>
> Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream pipeline
> will work with no modification if I plug an unbounded source to it.
>
>
>
> # Regarding the migration of Join / Reduce
>
> First I think Reduce is always supported and users could write
> dataStream.keyBy().reduce(xx)
> directly, and  if batch  execution mode is set, the reduce will not be
> executed in a incremental way,
> instead is acts much  like sort-based  aggregation in the traditional
> batch processing framework.
>
> Regarding Join, although the issue of FLINK-22587 indeed exists: current
> join has to be bound
> to a window and the GlobalWindow does not work properly, but with some
> more try currently
> it does not need users to  re-write the whole join from scratch: Users
> could write a dedicated
> window assigner that assigns all the  records to the same window instance
> and return
> EventTimeTrigger.create() as the default event-time trigger [2]. Then it
> works
>
> source1.join(source2)
>                 .where(a -> a.f0)
>                 .equalTo(b -> b.f0)
>                 .window(new EndOfStreamWindows())
>                 .apply(xxxx);
>
> It does not requires records have event-time attached since the trigger of
> window is only
> relying on the time range of the window and the assignment does not need
> event-time either.
>
> The behavior of the join is also similar to sort-based join if batch mode
> is enabled.
>
> Of course it is not easy to use to let users do the workaround and we'll
> try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that I
> proposed. I tried it and it works perfectly with similar performance.
> Thanks.
>
>
> # Regarding support of Sort / Limit
>
> Currently these two operators are indeed not supported in the DataStream
> API directly. One initial
> though for these two operations are that users may convert the DataStream
> to Table API and use
> Table API for these two operators:
>
> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
> Table tableXX = tableEnv.fromDataStream(dataStream);
> tableXX.orderBy($("a").asc());
>
>
> Yes I knew that workaround but I decided not to use it because I have a
> special SQL based implementation (for comparison reasons) so I did not want
> to mix SQL and DataStream APIs in the same pipeline.
>
>
> How do you think about this option? We are also assessing if the
> combination of DataStream
> API / Table API is sufficient for all the batch users. Any suggestions are
> warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of the 3
> Flink APIs (broader subject than this article), users can easily mix the
> APIs in the same pipeline. If we really want to have these operations in
> the DataStream API maybe wrapping state-based implementations could be good
> if their performance meets our expectations.
>
>
>
> Best,
> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>
>
>
> ------------------------------------------------------------------
> From:liu ron <ron9....@gmail.com> <ron9....@gmail.com>
> Send Time:2022 Nov. 8 (Tue.) 10:21
> To:dev <d...@flink.apache.org> <d...@flink.apache.org>; Etienne Chauchot
> <echauc...@apache.org> <echauc...@apache.org>; user
> <user@flink.apache.org> <user@flink.apache.org>
> Subject:Re: [blog article] Howto migrate a real-life batch pipeline from
> the DataSet API to the DataStream API
>
> Thanks for your post, It looks very good to me, also maybe for developers,
>
> Best,
> Liudalong
>
> yuxia <luoyu...@alumni.sjtu.edu.cn> 于2022年11月8日周二 09:11写道:
> Wow, cool!  Thanks for your work.
> It'll be definitely helpful for the users that want to migrate their batch
> job from DataSet API to DataStream API.
>
> Best regards,
> Yuxia
>
> ----- 原始邮件 -----
> 发件人: "Etienne Chauchot" <echauc...@apache.org>
> 收件人: "dev" <d...@flink.apache.org>, "User" <user@flink.apache.org>
> 发送时间: 星期一, 2022年 11 月 07日 下午 10:29:54
> 主题: [blog article] Howto migrate a real-life batch pipeline from the
> DataSet API to the DataStream API
>
> Hi everyone,
>
> In case some of you are interested, I just posted a blog article about
> migrating a real-life batch pipeline from the DataSet API to the
> DataStream API:
>
>
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
>
>
>

Reply via email to