This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push:
     new 37921e09e Rebuild website

37921e09e is described below

commit 37921e09e51af2c853eab6fc833d1b590d2aa4dc
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Wed May 10 11:04:13 2023 +0200

    Rebuild website
---
 .../index.html | 28 +++++++++++-----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html b/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html
index b33895e44..6295d8c01 100644
--- a/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html
+++ b/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html
@@ -986,8 +986,8 @@ operations.</p>
 <a class="anchor" href="#setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96">#</a>
 </h3>
 <p>We start by moving
-from <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/ExecutionEnvironment.html">ExecutionEnvironment</a>
-to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html">StreamExecutionEnvironment</a>
+from <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/ExecutionEnvironment.html">ExecutionEnvironment</a>
+to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html">StreamExecutionEnvironment</a>
 .
 Then, as the source in this pipeline is bounded, we can use either the default streaming <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode//">execution mode</a> or the batch mode. In batch mode the tasks of the job can be separated into stages that can be
@@ -999,12 +999,12 @@ allow to run the same pipeline with no change on an unbounded source.</p>
 Using the streaming sources and datasets
 <a class="anchor" href="#using-the-streaming-sources-and-datasets">#</a>
 </h3>
-<p><strong>Sources</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/operators/DataSource.html">DataSource<T></a>
-becomes <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html">DataStreamSource<T></a>
+<p><strong>Sources</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/operators/DataSource.html">DataSource<T></a>
+becomes <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html">DataStreamSource<T></a>
 after the call to <em>env.createInput()</em>.</p>
-<p><strong>Datasets</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html">DataSet<T></a> are
-now <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html">DataStream<T></a>
+<p><strong>Datasets</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html">DataSet<T></a> are
+now <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html">DataStream<T></a>
 and subclasses.</p>
 <h3
 id="migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135"> <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135">Migrating the join operation</a>
@@ -1013,11 +1013,11 @@ and subclasses.</p>
 <p>The DataStream join operator does not yet support aggregations in batch mode ( see <a href="https://issues.apache.org/jira/browse/FLINK-22587">FLINK-22587</a> for details). Basically, the problem is with the trigger of the
-default <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html">GlobalWindow</a>
+default <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html">GlobalWindow</a>
 which never fires so the records are never output. We will workaround this problem by applying a custom <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L246-L280">EndOfStream</a> window. It is a window assigner that assigns all the records to a
-single <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html">TimeWindow</a>
+single <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html">TimeWindow</a>
 . So, like for the GlobalWindow, all the records are assigned to the same window except that this window’s trigger is based on the end of the window (which is set to <em>Long.MAX_VALUE</em>).
 As we are on a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE) and will
@@ -1029,11 +1029,11 @@ function.</p>
 <a class="anchor" href="#migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169">#</a>
 </h3>
 <p>DataStream API has no
-more <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-">groupBy()</a>
+more <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-">groupBy()</a>
 method, we now use
-the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-">keyBy()</a>
+the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-">keyBy()</a>
 method. An aggregation downstream will be applied on elements with the same key exactly as
-a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html">GroupReduceFunction</a>
+a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html">GroupReduceFunction</a>
 would have done on a DataSet except it will not need to materialize the collection of data.
 Indeed, the following operator is a reducer: the summing operation downstream is still done through a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/ReduceFunction.html">ReduceFunction</a>
@@ -1047,7 +1047,7 @@ Also, please note that, as in the join case, we need to specify windowing for th
 <a class="anchor" href="#migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211">#</a>
 </h3>
 <p>The sort of the datastream is done by applying
-a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html">KeyedProcessFunction</a>
+a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html">KeyedProcessFunction</a>
 .</p>
 <p>But, as said above, the DataStream semantics are the ones of a streaming pipeline. The arriving data is thus considered infinite. As such we need to “divide” the data to have output times.
 For that we
@@ -1056,7 +1056,7 @@ meaning that the timer will fire at the end of the batch.</p>
 <p>To sort the data, we store the incoming rows inside a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/state/ListState.html">ListState</a> and sort them at output time, when the timer fires in
-the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-">onTimer()</a>
+the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-">onTimer()</a>
 callback.</p>
 <p>Another thing: to be able to use Flink state, we need to key the datastream beforehand, even if there
@@ -1081,7 +1081,7 @@ an <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/
 . But the resulting code is very similar to the one using the DataSet API. It’s only that <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html#encode-IN-java.io.OutputStream-">Encoder#encode()</a> method writes bytes
-when <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-">TextOutputFormat.TextFormatter#format()</a>
+when <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-">TextOutputFormat.TextFormatter#format()</a>
 wrote Strings.</p>
 <h2 id="conclusion">
 Conclusion