WencongLiu commented on code in PR #23362:
URL: https://github.com/apache/flink/pull/23362#discussion_r1369710115


##########
docs/content/docs/dev/datastream/how_to_migrate_from_dataset_to_datastream.md:
##########
@@ -0,0 +1,660 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+bookToc: false
+aliases:
+  - /dev/how_to_migrate_from_dataset_to_datastream.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How To Migrate From DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are encouraged to migrate from the DataSet API to the DataStream API, Table API, and SQL for their
+data processing requirements. DataSet operators can be implemented with the DataStream API. However, the implementation
+cost varies from operator to operator, and the operators fall into three categories:
+
+1. Operators of the first type have almost the same API usage in DataStream and can be migrated with little effort.
+2. Operators of the second type have different names and API usage in DataStream, which makes the
+job code more complex.
+3. Operators of the third type not only have different names and API usage in DataStream, but also incur additional
+computation and shuffle costs.
+
+The following sections first show how to set up the execution environment and then explain how to implement
+each type of DataSet operator using the DataStream API, highlighting the considerations and challenges specific to each type.
+
+
+## Setting the execution environment
+
+To run a DataSet pipeline with the DataStream API, first replace ExecutionEnvironment with StreamExecutionEnvironment.
+{{< tabs executionenv >}}
+{{< tab "DataSet">}}
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+Because DataSet sources are always bounded, it is recommended to set the execution mode to RuntimeExecutionMode.BATCH so that Flink can apply
+additional optimizations for batch processing.
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Implement the DataSet API by DataStream
+
+### Same API Usage
+
+For the first type of operators, the API usage in DataStream is almost identical to that in DataSet, so
+migrating them is straightforward and requires only minor code changes.
+
+#### Map
+
+{{< tabs mapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### FlatMap
+
+{{< tabs flatmapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Filter
+
+{{< tabs filterfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Union
+
+{{< tabs unionfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input1 = // [...]
+DataSet<String> input2 = // [...]
+DataSet<String> output = input1.union(input2);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input1 = // [...]
+DataStream<String> input2 = // [...]
+DataStream<String> output = input1.union(input2);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Rebalance
+
+{{< tabs rebalancefunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input = // [...]
+DataSet<String> output = input.rebalance();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input = // [...]
+DataStream<String> output = input.rebalance();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Reduce on Grouped DataSet
+
+{{< tabs reducegroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input
+        .groupBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output = input
+        .keyBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Aggregate on Grouped DataSet
+
+{{< tabs aggregategroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input
+        .groupBy(value -> value.f0)
+        // compute sum of the second field
+        // .aggregate(SUM, 1);
+        // compute min of the second field
+        // .aggregate(MIN, 1);
+        // compute max of the second field
+        // .aggregate(MAX, 1);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output = input
+        .keyBy(value -> value.f0)
+        // compute sum of the second field
+        // .sum(1);
+        // compute min of the second field
+        // .min(1);
+        // compute max of the second field
+        // .max(1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+### Different API Usage
+
+In the second type of operators there are differences in both naming 
conventions and API usage between DataStream and DataSet. 
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+#### Project
+
+{{< tabs projectfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple3<Integer, Double, String>> input = // [...]
+DataSet<Tuple2<String, Integer>> out = input.project(2,0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple3<Integer, Double, String>> input = // [...]
+DataStream<Tuple2<String, Integer>> out = input.map(value -> 
Tuple2.of(value.f2, value.f0));
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Distinct
+
+{{< tabs distinctfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> input = // [...]
+DataSet<Integer> output = input.distinct();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Integer> input = // [...]
+DataStream<Integer> output = input
+        .keyBy(value -> value)
+        .reduce((value1, value2) -> value1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Hash-Partition
+
+{{< tabs hashpartitionfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input.partitionByHash(value -> 
value.f0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output =
+        input.partitionCustom(
+                // partition by the hash code of the key; floorMod keeps the index non-negative
+                (key, numSubpartition) -> Math.floorMod(key.hashCode(), numSubpartition),
+                value -> value.f0);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Reduce on Full DataSet
+
+{{< tabs reducefullfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input = // [...]
+DataSet<String> output = input.reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input = // [...]
+DataStream<String> output = input.keyBy(value -> "KEY")
+        .reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Aggregate on Full DataSet
+
+{{< tabs aggregatefullfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<Integer, Integer>> input = // [...]
+DataSet<Tuple2<Integer, Integer>> output = input
+        // compute sum of the second field
+        // .aggregate(SUM, 1);
+        // compute minimum value of the second field
+        // .aggregate(MIN, 1);
+        // compute maximum value of the second field
+        // .aggregate(MAX, 1);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<Integer, Integer>> input = // [...]
+DataStream<Tuple2<Integer, Integer>> output = input
+        .keyBy(value -> "KEY")
+        // compute sum of the second field
+        // .sum(1);
+        // compute minimum value of the second field
+        // .min(1);
+        // compute maximum value of the second field
+        // .max(1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### Additional Computation and Shuffle Cost
+
+For the third type of operators, all records of the DataSet, or all records of an individual subtask, need to be collected
+first and then processed. To collect the records, the DataStream job assigns the same timestamp to every record and uses a fixed-length time
+window to gather them. However, this introduces additional computation cost for timestamp processing. Here is an example
+code snippet:
+```java
+// assign the same timestamp to all records
+void assignSameTimestamp(DataStream<Tuple2<String, Integer>> input){
+        input.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
+        .withTimestampAssigner((event, timestamp) -> 0));
+}
+```
+To collect the records of each subtask, every record is tagged with the ID of the subtask that processes it and then grouped by that ID within the window.
+Tagging the records and the subsequent keyBy operation introduce shuffle costs. Here is an example code
+snippet showing how to assign a subtask ID to each record:
+```java
+// assign subtask ID to all records
+DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> input){

Review Comment:
   Currently the method is transformed into the class "AddSubtaskIDMapFunction".



##########
docs/content/docs/dev/datastream/how_to_migrate_from_dataset_to_datastream.md:
##########
@@ -0,0 +1,660 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+bookToc: false
+aliases:
+  - /dev/how_to_migrate_from_dataset_to_datastream.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How To Migrate From DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are encouraged to migrate from the DataSet API to the DataStream API, Table API, and SQL for their
+data processing requirements. DataSet operators can be implemented with the DataStream API. However, the implementation
+cost varies from operator to operator, and the operators fall into three categories:
+
+1. Operators of the first type have almost the same API usage in DataStream and can be migrated with little effort.
+2. Operators of the second type have different names and API usage in DataStream, which makes the
+job code more complex.
+3. Operators of the third type not only have different names and API usage in DataStream, but also incur additional
+computation and shuffle costs.
+
+The following sections first show how to set up the execution environment and then explain how to implement
+each type of DataSet operator using the DataStream API, highlighting the considerations and challenges specific to each type.
+
+
+## Setting the execution environment
+
+To run a DataSet pipeline with the DataStream API, first replace ExecutionEnvironment with StreamExecutionEnvironment.
+{{< tabs executionenv >}}
+{{< tab "DataSet">}}
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+Because DataSet sources are always bounded, it is recommended to set the execution mode to RuntimeExecutionMode.BATCH so that Flink can apply
+additional optimizations for batch processing.
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Implement the DataSet API by DataStream
+
+### Same API Usage
+
+For the first type of operators, the API usage in DataStream is almost identical to that in DataSet, so
+migrating them is straightforward and requires only minor code changes.
+
+#### Map
+
+{{< tabs mapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### FlatMap
+
+{{< tabs flatmapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Filter
+
+{{< tabs filterfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Union
+
+{{< tabs unionfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input1 = // [...]
+DataSet<String> input2 = // [...]
+DataSet<String> output = input1.union(input2);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input1 = // [...]
+DataStream<String> input2 = // [...]
+DataStream<String> output = input1.union(input2);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Rebalance
+
+{{< tabs rebalancefunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input = // [...]
+DataSet<String> output = input.rebalance();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input = // [...]
+DataStream<String> output = input.rebalance();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Reduce on Grouped DataSet
+
+{{< tabs reducegroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input
+        .groupBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output = input
+        .keyBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Aggregate on Grouped DataSet
+
+{{< tabs aggregategroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input
+        .groupBy(value -> value.f0)
+        // compute sum of the second field
+        // .aggregate(SUM, 1);
+        // compute min of the second field
+        // .aggregate(MIN, 1);
+        // compute max of the second field
+        // .aggregate(MAX, 1);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output = input
+        .keyBy(value -> value.f0)
+        // compute sum of the second field
+        // .sum(1);
+        // compute min of the second field
+        // .min(1);
+        // compute max of the second field
+        // .max(1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+### Different API Usage
+
+In the second type of operators there are differences in both naming 
conventions and API usage between DataStream and DataSet. 
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+#### Project
+
+{{< tabs projectfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple3<Integer, Double, String>> input = // [...]
+DataSet<Tuple2<String, Integer>> out = input.project(2,0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple3<Integer, Double, String>> input = // [...]
+DataStream<Tuple2<String, Integer>> out = input.map(value -> 
Tuple2.of(value.f2, value.f0));
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Distinct
+
+{{< tabs distinctfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> input = // [...]
+DataSet<Integer> output = input.distinct();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Integer> input = // [...]
+DataStream<Integer> output = input
+        .keyBy(value -> value)
+        .reduce((value1, value2) -> value1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Hash-Partition
+
+{{< tabs hashpartitionfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input = // [...]
+DataSet<Tuple2<String, Integer>> output = input.partitionByHash(value -> 
value.f0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+DataStream<Tuple2<String, Integer>> output =
+        input.partitionCustom(
+                // partition by the hash code of the key; floorMod keeps the index non-negative
+                (key, numSubpartition) -> Math.floorMod(key.hashCode(), numSubpartition),
+                value -> value.f0);
+```
+{{< /tab >}}
+{{< /tabs>}}
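
A custom partitioner is expected to return an index between 0 and the number of partitions minus one. Because `hashCode()` can be negative in Java, the sign of the remainder deserves attention. The following plain-Java sketch compares the `%` operator with `Math.floorMod`; the class `PartitionIndexSketch` and its methods are hypothetical names used only for illustration and are not part of Flink.

```java
public class PartitionIndexSketch {

    // Remainder-based index: takes the sign of the hash code, so it can be negative.
    static int remainderIndex(Object key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }

    // floorMod-based index: always in [0, numPartitions) for a positive numPartitions.
    static int floorModIndex(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Integer key = -7; // Integer.hashCode() returns the int value itself
        System.out.println(remainderIndex(key, 4)); // prints -3
        System.out.println(floorModIndex(key, 4));  // prints 1
    }
}
```

Using `Math.floorMod` is one simple way to keep the index in range; masking the sign bit before taking the remainder would be another.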
+
+#### Reduce on Full DataSet
+
+{{< tabs reducefullfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<String> input = // [...]
+DataSet<String> output = input.reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<String> input = // [...]
+DataStream<String> output = input.keyBy(value -> "KEY")
+        .reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Aggregate on Full DataSet
+
+{{< tabs aggregatefullfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<Integer, Integer>> input = // [...]
+DataSet<Tuple2<Integer, Integer>> output = input
+        // compute sum of the second field
+        // .aggregate(SUM, 1);
+        // compute minimum value of the second field
+        // .aggregate(MIN, 1);
+        // compute maximum value of the second field
+        // .aggregate(MAX, 1);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<Integer, Integer>> input = // [...]
+DataStream<Tuple2<Integer, Integer>> output = input
+        .keyBy(value -> "KEY")
+        // compute sum of the second field
+        // .sum(1);
+        // compute minimum value of the second field
+        // .min(1);
+        // compute maximum value of the second field
+        // .max(1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+### Additional Computation and Shuffle Cost
+
+For the third type of operators, all records of the DataSet, or all records of an individual subtask, need to be collected
+first and then processed. To collect the records, the DataStream job assigns the same timestamp to every record and uses a fixed-length time
+window to gather them. However, this introduces additional computation cost for timestamp processing. Here is an example
+code snippet:
+```java
+// assign the same timestamp to all records
+void assignSameTimestamp(DataStream<Tuple2<String, Integer>> input){
+        input.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
+        .withTimestampAssigner((event, timestamp) -> 0));
+}
+```
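
Why a single shared timestamp leads to a single window follows from how tumbling windows are aligned: a window of size `s` that starts at a multiple of `s` covers `[start, start + s)`, so every record stamped `0` falls into `[0, s)`. The plain-Java sketch below illustrates that arithmetic. The helper `windowStart` is a hypothetical name; it assumes a zero window offset and non-negative timestamps and is not Flink's implementation.

```java
public class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp, assuming windows
    // aligned to multiples of windowSizeMs and timestampMs >= 0.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 1000L; // a one-second window
        System.out.println(windowStart(0L, size));    // prints 0
        System.out.println(windowStart(999L, size));  // prints 0
        System.out.println(windowStart(1000L, size)); // prints 1000
    }
}
```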
+To collect the records of each subtask, every record is tagged with the ID of the subtask that processes it and then grouped by that ID within the window.
+Tagging the records and the subsequent keyBy operation introduce shuffle costs. Here is an example code
+snippet showing how to assign a subtask ID to each record:
+```java
+// assign subtask ID to all records
+DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> input){
+        return input.map(new RichMapFunction<Integer, Tuple2<String, Integer>>() {
+            @Override
+            public Tuple2<String, Integer> map(Integer value) {
+                return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+            }
+        });
+}
+```
+
+#### MapPartition/SortPartition
+
+{{< tabs mapsortpartitionfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> input = // [...]
+// MapPartition
+input.mapPartition(new MapPartitionFunction(){
+        // implement user-defined map partition logic
+        });
+// SortPartition
+input.sortPartition(0, Order.ASCENDING);
+input.sortPartition(0, Order.DESCENDING);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Integer> input = // [...]
+// assign subtask ID to all records
+DataStream<Tuple2<String, Integer>> dataStream = assignSubtaskID(input);
+// assign the same timestamp to all records
+assignSameTimestamp(dataStream);
+dataStream.keyBy(value -> value.f0)
+        .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+        .apply(new WindowFunction(){
+        // implement user-defined map partition or sort partition logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### GroupReduce on Grouped DataSet
+
+{{< tabs groupreducefunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<Integer, String>> input = // [...]
+DataSet<Tuple2<Integer, String>> output = input
+        .groupBy(value -> value.f0)
+        .reduceGroup(new GroupReduceFunction(){
+        // implement user-defined group reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input = // [...]
+// assign the same timestamp to all records
+assignSameTimestamp(input);
+DataStream<Tuple2<String, Integer>> output = input
+        .keyBy(value -> value.f0)
+        .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+        .apply(new WindowFunction(){
+        // implement user-defined group reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### GroupReduce on Full DataSet
+
+{{< tabs groupreducefullfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> input = // [...]
+DataSet<Integer> output = input.
+        reduceGroup(new GroupReduceFunction(){
+        // implement user-defined group reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Integer> input = // [...]
+// assign the same timestamp to all records
+assignSameTimestamp(input);
+input.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+        .apply(new AllWindowFunction(){
+        // implement user-defined group reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Join
+
+{{< tabs joinfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> input1 = // [...]
+DataSet<Tuple2<String, Integer>> input2 = // [...]
+input1.join(input2)
+        .where(data -> data.f0)
+        .equalTo(data -> data.f0)
+        .with(new JoinFunction(){
+        // implement user-defined join logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> input1 = // [...]
+DataStream<Tuple2<String, Integer>> input2 = // [...]
+// assign the same timestamp to all records
+assignSameTimestamp(input1);
+assignSameTimestamp(input2);
+input1.join(input2)
+        .where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .window(TumblingEventTimeWindows.of(Time.seconds(1)))
+        .apply(new JoinFunction(){
+        // implement user-defined join logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+The Join operator can also be implemented efficiently with the Table API. A DataStream can be converted to a Table,
+and once the join on the Table is completed, the result can be converted back to
+a DataStream. Here is an example code snippet:
+```java
+DataStream<Tuple2<String, Integer>> input1 = // [...]
+DataStream<Tuple2<String, Integer>> input2 = // [...]
+Table inputTable1 = tableEnv.fromDataStream(input1, $("t1.f0"), $("t1.f1"));
+Table inputTable2 = tableEnv.fromDataStream(input2, $("t2.f0"), $("t2.f1"));
+Table result = inputTable1.join(inputTable2)
+        .where($("t1.f0").isEqual($("t2.f0")))
+        .select($("*"));
+// Convert Table to DataStream
+DataStream<Row> dataStream = tableEnv.toDataStream(result.select($("*")));
+```
+
+#### Cross
+
+To implement the Cross operation between two DataStreams, datastream1 and datastream2, both streams must have the
+same parallelism. Then follow these steps:
+
+1. Broadcast datastream1 to a map operator. This ensures that each subtask of the map operator receives all the records of datastream1.
+The map operator also assigns its subtask ID to each record.
+
+2. Assign a subtask ID to each record in datastream2.
+
+3. Finally, assign the same timestamp to the records of both datastream1 and datastream2 and join them on the subtask ID within a fixed-length
+time window.
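
The steps above produce each pair exactly once because every record of datastream2 lives in exactly one subtask, while datastream1 is fully present in all of them. The plain-Java sketch below simulates this with ordinary lists. The class `CrossSketch`, the method `cross`, and the round-robin distribution of the second input are assumptions made for illustration, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossSketch {

    // Simulates the three steps; 'parallelism' stands in for the number of subtasks.
    static List<int[]> cross(List<Integer> input1, List<Integer> input2, int parallelism) {
        List<int[]> result = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Step 1: after the broadcast, this subtask holds every record of input1.
            List<Integer> broadcastSide = input1;
            // Step 2: this subtask holds only its own share of input2 (round-robin here).
            for (int i = subtask; i < input2.size(); i += parallelism) {
                // Step 3: records that carry the same subtask ID are joined.
                for (Integer left : broadcastSide) {
                    result.add(new int[] {left, input2.get(i)});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> pairs = cross(Arrays.asList(1, 2, 3), Arrays.asList(10, 20), 2);
        System.out.println(pairs.size()); // prints 6, i.e. 3 x 2 pairs
    }
}
```

The result size does not depend on the simulated parallelism, which matches the intent of joining on the subtask ID.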
+
+{{< tabs crossfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> input1 = // [...]
+DataSet<Integer> input2 = // [...]
+// Cross
+input1.cross(input2)
+        .with(new CrossFunction(){
+        // implement user-defined cross logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataSet<Integer> input1 = // [...]
+DataSet<Integer> input2 = // [...]

Review Comment:
   Fixed.


