yunfengzhou-hub commented on code in PR #23362:
URL: https://github.com/apache/flink/pull/23362#discussion_r1363026695


##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,746 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    ExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    ExecutionEnvironment.createLocalEnvironment();
+                    // Create the collection environment
+                    new CollectionEnvironment();
+                    // Create the remote environment
+                    ExecutionEnvironment.createRemoteEnvironment(String host, 
int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    StreamExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    StreamExecutionEnvironment.createLocalEnvironment();
+                    // The collection environment is not supported.
+                    // Create the remote environment
+                    StreamExecutionEnvironment.createRemoteEnvironment(String 
host, int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();

Review Comment:
   It might be better to remove this line or use `StreamExecutionEnvironment 
executionEnvironment = // [...]`.



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,746 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    ExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    ExecutionEnvironment.createLocalEnvironment();
+                    // Create the collection environment
+                    new CollectionEnvironment();
+                    // Create the remote environment
+                    ExecutionEnvironment.createRemoteEnvironment(String host, 
int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    StreamExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    StreamExecutionEnvironment.createLocalEnvironment();
+                    // The collection environment is not supported.
+                    // Create the remote environment
+                    StreamExecutionEnvironment.createRemoteEnvironment(String 
host, int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external 
system, while the DataSet API uses the `DataSource`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataSource<> source = 
ExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String filePath);
+                    // Read data from collection
+                    DataSource<> source = 
ExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataSource<> source = 
ExecutionEnvironment.createInput(InputFormat<> inputFormat)

Review Comment:
   It might be better to use `DataSource<> source = 
ExecutionEnvironment.createInput(inputFormat)` instead of `DataSource<> source 
= ExecutionEnvironment.createInput(InputFormat<> inputFormat)`. Same for other 
codes that have mixed signature and example variables.



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,758 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">// Create the execution 
environment
+            ExecutionEnvironment.getExecutionEnvironment();
+            // Create the local execution environment
+            ExecutionEnvironment.createLocalEnvironment();
+            // Create the collection environment
+            new CollectionEnvironment();
+            // Create the remote environment
+            ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">// Create the execution 
environment
+            StreamExecutionEnvironment.getExecutionEnvironment();
+            // Create the local execution environment
+            StreamExecutionEnvironment.createLocalEnvironment();
+            // The collection environment is not supported.
+            // Create the remote environment
+            StreamExecutionEnvironment.createRemoteEnvironment(host, port, 
jarFile);
+        </code>
+    </td>
+  </tr>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+Sources: The DataStream API uses `DataStreamSource` to read records from 
external system, while the DataSet API uses the
+`DataSource`.
+
+Sinks: The DataStream API uses the implementations of `SinkFunction` and 
`Sink` to write records to external system, while the
+DataSet API uses the `FileOutputFormat`.
+
+If you are looking for pre-defined source and sink connectors of DataStream, 
please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" 
>}})
+
+
+## Implement the DataSet API by DataStream
+
+#### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that 
in DataSet. This means that implementing these 
+DataSet APIs by the DataStream API is relatively straightforward and does not 
require significant modifications or complexity
+in the job code.
+
+### Map
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet.map(new MapFunction(){
+    // implement user-defined map logic
+});
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">dataStream.map(new MapFunction(){
+    // implement user-defined map logic
+});
+        </code>
+    </td>
+  </tr>
+</table>
+
+
+### FlatMap
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet.flatMap(new 
FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">dataStream.flatMap(new 
FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Filter
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet.filter(new 
FilterFunction(){
+    // implement user-defined filter logic
+});
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">dataStream.filter(new 
FilterFunction(){
+    // implement user-defined filter logic
+});
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Union
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet1.union(dataSet2);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">dataStream1.union(dataStream2);
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Rebalance
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet.rebalance();
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">dataStream.rebalance();
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Project
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple3&ltInteger, 
Double, String&gt&gt dataSet = // [...]
+dataSet.project(2,0);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple3&ltInteger, 
Double, String&gt&gt dataStream = // [...]
+dataStream.project(2,0);
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Reduce on Grouped DataSet
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet = // [...]
+dataSet.groupBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream = // [...]
+dataStream.keyBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Aggregate on Grouped DataSet
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet = // [...]
+// compute sum of the second field
+dataSet.groupBy(value -> value.f0).aggregate(SUM, 1);
+// compute min of the second field
+dataSet.groupBy(value -> value.f0).aggregate(MIN, 1);
+// compute max of the second field
+dataSet.groupBy(value -> value.f0).aggregate(MAX, 1);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream = // [...]
+// compute sum of the second field
+dataStream.keyBy(value -> value.f0).sum(1);
+// compute min of the second field
+dataStream.keyBy(value -> value.f0).min(1);
+// compute max of the second field
+dataStream.keyBy(value -> value.f0).max(1);
+        </code>
+    </td>
+  </tr>
+</table>
+
+#### Category 2
+
+For category 2, these DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior.
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+### Distinct
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltInteger&gt dataSet = // 
[...]
+dataSet.distinct();
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltInteger&gt 
dataStream = // [...]
+dataStream
+        .keyBy(value -> value)
+        .reduce((value1, value2) -> value1);
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Hash-Partition
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet = // [...]
+dataSet.partitionByHash(value -> value.f0);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream = // [...]
+// partition by the hashcode of key
+dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % 
numSubpartition, value -> value.f0);
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Reduce on Full DataSet
+
+If developers want to compute data of full datastream, `GlobalWindow` could be 
used to collect all records of datastream.
+However, a special trigger is also required to trigger the computation of 
`GlobalWindow` at the end of its inputs. Here is an example 
+code snippet of the trigger.
+
+```java
+public class EOFTrigger extends Trigger<Object, GlobalWindow> {
+
+    private boolean hasRegistered;
+
+    @Override
+    public TriggerResult onElement(
+            Object element, long timestamp, GlobalWindow window, 
TriggerContext ctx) {
+        if (!hasRegistered) {
+            ctx.registerEventTimeTimer(Long.MAX_VALUE);
+            hasRegistered = true;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+    @Override
+    public TriggerResult onProcessingTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public void clear(GlobalWindow window, TriggerContext ctx) throws 
Exception {}
+}
+```
+Then the reduce operation on full datastream could be performed by 
`EOFTrigger`.
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltString&gt dataSet = // 
[...]
+dataSet.reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltString&gt dataStream 
= // [...]
+dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger())
+        .reduce(new ReduceFunction(){
+        // implement user-defined reduce logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+
+### Aggregate on Full DataSet
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltInteger, 
Integer&gt&gt dataSet = // [...]
+// compute sum of the second field
+dataSet.aggregate(SUM, 1);
+// compute min of the second field
+dataSet.aggregate(MIN, 1);
+// compute max of the second field
+dataSet.aggregate(MAX, 1);
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltInteger, 
Integer&gt&gt dataStream = // [...]
+// compute sum of the second field
+dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).sum(1);
+// compute min of the second field
+dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).min(1);
+// compute max of the second field
+dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).max(1);
+        </code>
+    </td>
+  </tr>
+</table>
+
+### GroupReduce on Full DataSet
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltInteger&gt dataSet = // 
[...]
+dataSet.reduceGroup(new GroupReduceFunction(){
+        // implement user-defined group reduce logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltInteger&gt 
dataStream = // [...]
+dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger())
+        .apply(new WindowFunction(){
+        // implement user-defined group reduce logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### GroupReduce on Grouped DataSet
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltInteger, 
String&gt&gt dataSet = // [...]
+dataSet.groupBy(value -> value.f0)
+        .reduceGroup(new GroupReduceFunction(){
+        // implement user-defined group reduce logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream = // [...]
+dataStream.keyBy(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply(new WindowFunction(){
+        // implement user-defined group reduce logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### First-n
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">dataSet.first(n)
+        </code>
+    </td>
+    <td>
+        <code style="white-space: 
pre-line;">dataStream.windowAll(GlobalWindows.create())
+.trigger(new EOFTrigger())
+.apply(new AllWindowFunction(){
+// implement first-n logic
+});
+        </code>
+    </td>
+  </tr>
+</table>
+
+
+### Join
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet1 = // [...]
+DataSet&ltTuple2&ltString, Integer&gt&gt dataSet2 = // [...]
+dataSet1.join(dataSet2)
+        .where(data -> data.f0)
+        .equalTo(data -> data.f0)
+        .with(new JoinFunction(){
+        // implement user-defined join logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream1 = // [...]
+DataStream&ltTuple2&ltString, Integer&gt&gt dataStream2 = // [...]
+dataStream1.join(dataStream2)
+        .where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply(new JoinFunction(){
+        // implement user-defined join logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### CoGroup
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet1 = // [...]
+DataSet&ltTuple2&ltString, Integer&gt&gt dataSet2 = // [...]
+dataSet1.coGroup(dataSet2).where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .with(new CoGroupFunction(){
+        // implement user-defined co group logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream1 = // [...]
+DataStream&ltTuple2&ltString, Integer&gt&gt dataStream2 = // [...]
+dataStream1.coGroup(dataStream2)
+        .where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply(new CoGroupFunction(){
+        // implement user-defined co group logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### OuterJoin
+
+To implement the OuterJoin operation between two DataStreams, datastream1 and 
datastream2, follow these steps:
+
+1. Co-group the two idatastreams in a `GlobalWindow` with `EOFTrigger` to 
collect elements with the specific key of each datastream.
+
+2. Implement left/right outer join in the user-defined CoGroupFunction.
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltTuple2&ltString, 
Integer&gt&gt dataSet1 = // [...]
+DataSet&ltTuple2&ltString, Integer&gt&gt dataSet2 = // [...]
+// left outer join
+dataSet1.leftOuterJoin(dataSet2).where(dataSet1.f0)
+        .equalTo(dataSet2.f0)
+        .with(new JoinFunction(){
+        // implement user-defined left outer join logic
+        });
+// right outer join
+dataSet1.rightOuterJoin(dataSet2).where(dataSet1.f0)
+        .equalTo(dataSet2.f0)
+        .with(new JoinFunction(){
+        // implement user-defined right outer join logic
+        });
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltTuple2&ltString, 
Integer&gt&gt dataStream1 = // [...]
+DataStream&ltTuple2&ltString, Integer&gt&gt dataStream2 = // [...]
+// left outer join
+dataStream1.coGroup(dataStream2)
+        .where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply((leftIterable, rightInterable, collector) -> {
+            if(!rightInterable.iterator().hasNext()){
+            // implement user-defined left outer join logic
+            }
+        });
+// right outer join
+dataStream1.coGroup(dataStream2)
+        .where(value -> value.f0)
+        .equalTo(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply((leftIterable, rightInterable, collector) -> {
+            if(!leftIterable.iterator().hasNext()){
+            // implement user-defined right outer join logic
+            }
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+#### Category 3
+
+For category 3, these DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. Additional 
+calculation steps will be added.
+
+To collect records from each subtask, every record needs to be assigned a 
unique subtask ID and grouped accordingly within the window. 
+This additional step of assigning the subtask ID and performing a groupby 
operation introduces shuffle costs. Here is an example code 
+snippet showing how to assign a subtask ID to each record. The function 
assignSubtaskID is used to explain the detailed behavior and will 
+be utilized in the subsequent sections:
+```java
+// assign subtask ID to all records
+DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> 
dataStream) {
+        return dataStream.map(new RichMapFunction<Integer, Tuple2<String, 
Integer>>() {
+                @Override
+                public Tuple2<String, Integer> map(Integer value) {
+                        return 
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+                }
+        });
+}
+```
+
+### MapPartition/SortPartition
+
+<table>
+  <tr>
+    <th>DataSet</th>
+    <th>DataStream</th>
+  </tr>
+  <tr>
+    <td>
+        <code style="white-space: pre-line;">DataSet&ltInteger&gt dataSet = // 
[...]
+// MapPartition
+dataSet.mapPartition(new MapPartitionFunction(){
+        // implement user-defined map partition logic
+        });
+// SortPartition
+dataSet.sortPartition(0, Order.ASCENDING);
+dataSet.sortPartition(0, Order.DESCENDING)
+        </code>
+    </td>
+    <td>
+        <code style="white-space: pre-line;">DataStream&ltInteger&gt 
dataStream = // [...]
+// assign subtask ID to all records
+DataStream&ltTuple2&ltString, Integer&gt&gt dataStream1 = 
assignSubtaskID(dataStream);
+dataStream1.keyBy(value -> value.f0)
+        .window(GlobalWindows.create())
+        .trigger(new EOFTrigger())
+        .apply(new WindowFunction(){
+        // implement user-defined map partition or sort partition logic
+        });
+        </code>
+    </td>
+  </tr>
+</table>
+
+### Cross
+
+To implement the Cross operation between two DataStreams, datastream1 and 
datastream2, the parallism of two datastreams should be the
+same and then follow these steps:

Review Comment:
   I wrote a program like follows according to the document. The expected 
result size is 100 while the actual result is not. Could you please help verify 
the correctness of the following code and the document?
   
   ```java
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(4);
   DataStream<Long> dataStream1 = env.fromSequence(0, 9);
   DataStream<Long> dataStream2 = env.fromSequence(0, 9);
   
   DataStream<Tuple2<String, Long>> dataStream3 = 
dataStream1.broadcast().map(new AddSubtaskIDMapFunction<>());
   DataStream<Tuple2<String, Long>> dataStream4 = dataStream2.map(new 
AddSubtaskIDMapFunction<>());
   // join the two streams according to the subtask ID
   DataStream<Long> resultStream = dataStream3.join(dataStream4)
           .where(value -> value.f0)
           .equalTo(value -> value.f0)
           .window(GlobalWindows.create())
           .trigger(new EOFTrigger())
           .apply((JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, 
Long>) (first, second) ->
                   first.f1 * 10 + second.f1);
   
   List<Long> result = IteratorUtils.toList(resultStream.executeAndCollect());
   System.out.println(result.size());
   Collections.sort(result);
   System.out.println(result);
   ```



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,746 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    ExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    ExecutionEnvironment.createLocalEnvironment();
+                    // Create the collection environment
+                    new CollectionEnvironment();
+                    // Create the remote environment
+                    ExecutionEnvironment.createRemoteEnvironment(String host, 
int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    StreamExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    StreamExecutionEnvironment.createLocalEnvironment();
+                    // The collection environment is not supported.
+                    // Create the remote environment
+                    StreamExecutionEnvironment.createRemoteEnvironment(String 
host, int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external 
system, while the DataSet API uses the `DataSource`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataSource<> source = 
ExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String filePath);
+                    // Read data from collection
+                    DataSource<> source = 
ExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataSource<> source = 
ExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String 
filePath);
+                    // Read data from collection
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Sinks
+
+The DataStream API uses the implementations of `DataStreamSink` to write 
records to external system, while the
+DataSet API uses the `DataSink`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to outputformat
+                    DataSink<> sink = dataSet.output(OutputFormat<> 
outputFormat);
+                    // Write to csv file
+                    DataSink<> sink = dataSet.writeAsCsv(String filePath);
+                    // Write to text file
+                    DataSink<> sink = dataSet.writeAsText(String filePath);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to sink function or sink
+                    DataStreamSink<> sink = dataStream.addSink(SinkFunction<> 
sinkFunction)
+                    DataStreamSink<> sink = 
dataStream.sinkTo(org.apache.flink.api.connector.sink.Sink<> sink)
+                    // Write to csv file
+                    DataStreamSink<> sink = dataStream.writeAsCsv(String path);
+                    // Write to text file
+                    DataStreamSink<> sink = dataStream.writeAsText(String 
path);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+If you are looking for pre-defined source and sink connectors of DataStream, 
please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" 
>}})
+
+
+## Implement the DataSet API by DataStream
+
+### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that 
in DataSet. This means that implementing these 
+DataSet APIs by the DataStream API is relatively straightforward and does not 
require significant modifications or complexity
+in the job code.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Map</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>FlatMap</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Filter</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Union</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet1.union(dataSet2);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream1.union(dataStream2);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Rebalance</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.rebalance();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.rebalance();
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Project</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple3<Integer, Double, String>> dataSet = // [...]
+                    dataSet.project(2,0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple3<Integer, Double, String>> dataStream = 
// [...]
+                    dataStream.project(2,0);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Reduce on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.groupBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    dataStream.keyBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Aggregate on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    // compute sum of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(SUM, 1);
+                    // compute min of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MIN, 1);
+                    // compute max of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MAX, 1);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // compute sum of the second field
+                    dataStream.keyBy(value -> value.f0).sum(1);
+                    // compute min of the second field
+                    dataStream.keyBy(value -> value.f0).min(1);
+                    // compute max of the second field
+                    dataStream.keyBy(value -> value.f0).max(1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+### Category 2
+
+For category 2, these DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior.
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Distinct</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    dataSet.distinct();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    dataStream.keyBy(value -> value)
+                            .reduce((value1, value2) -> value1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Hash-Partition</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.partitionByHash(value -> value.f0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // partition by the hashcode of key
+                    dataStream.partitionCustom((key, numSubpartition) -> 
key.hashCode() % numSubpartition, value -> value.f0);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+Operations on a full DataSet correspond to the global window aggregation in 
DataStream with a custom trigger that invokes the computation of GlobalWindow 
at the end of the inputs. Below is an example code snippet that will be reduced 
in the rest of this document.
+
+```java
+public class EOFTrigger extends Trigger<Object, GlobalWindow> {
+
+    private boolean hasRegistered;
+
+    @Override
+    public TriggerResult onElement(
+            Object element, long timestamp, GlobalWindow window, 
TriggerContext ctx) {
+        if (!hasRegistered) {
+            ctx.registerEventTimeTimer(Long.MAX_VALUE);
+            hasRegistered = true;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+    @Override
+    public TriggerResult onProcessingTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public void clear(GlobalWindow window, TriggerContext ctx) throws 
Exception {}
+}
+```
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Reduce on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<String> dataSet = // [...]
+                    dataSet.reduce(new ReduceFunction<>(){
+                            // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<String> dataStream = // [...]
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger())
+                            .reduce(new ReduceFunction<>(){
+                            // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Aggregate on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<Integer, Integer>> dataSet = // [...]
+                    // compute sum of the second field
+                    dataSet.aggregate(SUM, 1);
+                    // compute min of the second field
+                    dataSet.aggregate(MIN, 1);
+                    // compute max of the second field
+                    dataSet.aggregate(MAX, 1);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<Integer, Integer>> dataStream = // [...]
+                    // compute sum of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).sum(1);
+                    // compute min of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).min(1);
+                    // compute max of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).max(1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>GroupReduce on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    dataSet.reduceGroup(new GroupReduceFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger())
+                            .apply(new WindowFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>GroupReduce on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<Integer, String>> dataSet = // [...]
+                    dataSet.groupBy(value -> value.f0)
+                            .reduceGroup(new GroupReduceFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    dataStream.keyBy(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new WindowFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>First-n</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.first(n)
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.windowAll(GlobalWindows.create())
+                        .trigger(new EOFTrigger())
+                        .apply(new AllWindowFunction<>(){
+                        // implement first-n logic
+                        });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Join</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    dataSet1.join(dataSet2)
+                            .where(data -> data.f0)
+                            .equalTo(data -> data.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined join logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    dataStream1.join(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new JoinFunction<>(){
+                            // implement user-defined join logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>CoGroup</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    dataSet1.coGroup(dataSet2).where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .with(new CoGroupFunction<>(){
+                            // implement user-defined co group logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new CoGroupFunction<>(){
+                            // implement user-defined co group logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>OuterJoin</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    // left outer join
+                    dataSet1.leftOuterJoin(dataSet2).where(dataSet1.f0)
+                            .equalTo(dataSet2.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined left outer join logic
+                            });
+                    // right outer join
+                    dataSet1.rightOuterJoin(dataSet2).where(dataSet1.f0)
+                            .equalTo(dataSet2.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined right outer join logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    // left outer join
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply((leftIterable, rightInterable, collector) 
-> {
+                                if(!rightInterable.iterator().hasNext()){
+                                // implement user-defined left outer join logic
+                                }
+                            });
+                    // right outer join
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply((leftIterable, rightInterable, collector) 
-> {
+                                if(!leftIterable.iterator().hasNext()){
+                                // implement user-defined right outer join 
logic
+                                }
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Category 3
+
+For category 3, these DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. Additional 
+calculation steps will be added.
+
+To collect records from each subtask, it is necessary to assign a unique 
subtask ID to each record and group them accordingly within the window. The 
+following code snippet illustrates how to assign a subtask ID to each record, 
and will be used in the DataStream examples in the subsequent sections. 
+Here is an example code.
+
+```java
+// assign subtask ID to all records
+public class AddSubtaskIDMapFunction extends RichMapFunction<Integer, 
Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> map(Integer value) {
+        return 
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+    }
+}
+```
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>MapPartition/SortPartition</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    // MapPartition
+                    dataSet.mapPartition(new MapPartitionFunction<>(){
+                            // implement user-defined map partition logic
+                            });
+                    // SortPartition
+                    dataSet.sortPartition(0, Order.ASCENDING);
+                    dataSet.sortPartition(0, Order.DESCENDING);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    // assign subtask ID to all records
+                    DataStream<Tuple2<String, Integer>> dataStream1 = 
dataStream.map(new AddSubtaskIDMapFunction());
+                    dataStream1.keyBy(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new WindowFunction<>(){
+                            // implement user-defined map partition or sort 
partition logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Cross</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet1 = // [...]
+                    DataSet<Integer> dataSet2 = // [...]
+                    // Cross
+                    dataSet1.cross(dataSet2)
+                            .with(new CrossFunction<>(){
+                            // implement user-defined cross logic
+                            })
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // the parallelism of dataStream1 and dataStream2 should 
be same
+                    DataStream<Integer> dataStream1 = // [...]
+                    DataStream<Integer> dataStream2 = // [...]
+                    DataStream<Tuple2<String, Integer>> datastream3 = 
dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
+                    DataStream<Tuple2<String, Integer>> datastream4 = 
dataStream2.map(new AddSubtaskIDMapFunction());
+                    // join the two streams according to the subtask ID
+                    dataStream3.join(dataStream4)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new JoinFunction<>(){
+                            // implement user-defined cross logic
+                            })
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Category 4
+
+The following DataSet APIs are not directly supported by DataStream, given 
that they rely on modifications to Flink's infrastructures. DataStream 
+has provided low-level APIs like ProcessFunction that exposes more basic 
building blocks of Flink, and users may use these APIs to 
+implement custom operators that achieve the same function as the following 
DataSet APIs.
+
+* RangePartition
+
+* GroupCombine

Review Comment:
   nit: the blank line above can be removed. As you can see in the following 
section of this comment, removing the blank line could reduce the spacing 
between rows.
   
   * RangePartition
   
   * GroupCombine
   ---
   * RangePartition
   * GroupCombine



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,746 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    ExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    ExecutionEnvironment.createLocalEnvironment();
+                    // Create the collection environment
+                    new CollectionEnvironment();
+                    // Create the remote environment
+                    ExecutionEnvironment.createRemoteEnvironment(String host, 
int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    StreamExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    StreamExecutionEnvironment.createLocalEnvironment();
+                    // The collection environment is not supported.
+                    // Create the remote environment
+                    StreamExecutionEnvironment.createRemoteEnvironment(String 
host, int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external 
system, while the DataSet API uses the `DataSource`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataSource<> source = 
ExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String filePath);
+                    // Read data from collection
+                    DataSource<> source = 
ExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataSource<> source = 
ExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String 
filePath);
+                    // Read data from collection
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Sinks
+
+The DataStream API uses the implementations of `DataStreamSink` to write 
records to external system, while the
+DataSet API uses the `DataSink`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to outputformat
+                    DataSink<> sink = dataSet.output(OutputFormat<> 
outputFormat);
+                    // Write to csv file
+                    DataSink<> sink = dataSet.writeAsCsv(String filePath);
+                    // Write to text file
+                    DataSink<> sink = dataSet.writeAsText(String filePath);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to sink function or sink
+                    DataStreamSink<> sink = dataStream.addSink(SinkFunction<> 
sinkFunction)
+                    DataStreamSink<> sink = 
dataStream.sinkTo(org.apache.flink.api.connector.sink.Sink<> sink)
+                    // Write to csv file
+                    DataStreamSink<> sink = dataStream.writeAsCsv(String path);
+                    // Write to text file
+                    DataStreamSink<> sink = dataStream.writeAsText(String 
path);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+If you are looking for pre-defined source and sink connectors of DataStream, 
please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" 
>}})
+
+
+## Implement the DataSet API by DataStream
+
+### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that 
in DataSet. This means that implementing these 
+DataSet APIs by the DataStream API is relatively straightforward and does not 
require significant modifications or complexity
+in the job code.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Map</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>FlatMap</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Filter</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Union</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet1.union(dataSet2);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream1.union(dataStream2);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Rebalance</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.rebalance();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.rebalance();
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Project</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple3<Integer, Double, String>> dataSet = // [...]
+                    dataSet.project(2,0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple3<Integer, Double, String>> dataStream = 
// [...]
+                    dataStream.project(2,0);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Reduce on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.groupBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    dataStream.keyBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Aggregate on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    // compute sum of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(SUM, 1);
+                    // compute min of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MIN, 1);
+                    // compute max of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MAX, 1);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // compute sum of the second field
+                    dataStream.keyBy(value -> value.f0).sum(1);
+                    // compute min of the second field
+                    dataStream.keyBy(value -> value.f0).min(1);
+                    // compute max of the second field
+                    dataStream.keyBy(value -> value.f0).max(1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+### Category 2
+
+For category 2, these DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior.
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Distinct</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    dataSet.distinct();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    dataStream.keyBy(value -> value)
+                            .reduce((value1, value2) -> value1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Hash-Partition</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.partitionByHash(value -> value.f0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // partition by the hashcode of key
+                    dataStream.partitionCustom((key, numSubpartition) -> 
key.hashCode() % numSubpartition, value -> value.f0);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+Operations on a full DataSet correspond to the global window aggregation in 
DataStream with a custom trigger that invokes the computation of GlobalWindow 
at the end of the inputs. Below is an example code snippet that will be reduced 
in the rest of this document.
+
+```java
+public class EOFTrigger extends Trigger<Object, GlobalWindow> {
+
+    private boolean hasRegistered;
+
+    @Override
+    public TriggerResult onElement(
+            Object element, long timestamp, GlobalWindow window, 
TriggerContext ctx) {
+        if (!hasRegistered) {
+            ctx.registerEventTimeTimer(Long.MAX_VALUE);
+            hasRegistered = true;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+    @Override
+    public TriggerResult onProcessingTime(long time, GlobalWindow window, 
TriggerContext ctx) {
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public void clear(GlobalWindow window, TriggerContext ctx) throws 
Exception {}
+}
+```
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Reduce on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<String> dataSet = // [...]
+                    dataSet.reduce(new ReduceFunction<>(){
+                            // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<String> dataStream = // [...]
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger())
+                            .reduce(new ReduceFunction<>(){
+                            // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Aggregate on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<Integer, Integer>> dataSet = // [...]
+                    // compute sum of the second field
+                    dataSet.aggregate(SUM, 1);
+                    // compute min of the second field
+                    dataSet.aggregate(MIN, 1);
+                    // compute max of the second field
+                    dataSet.aggregate(MAX, 1);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<Integer, Integer>> dataStream = // [...]
+                    // compute sum of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).sum(1);
+                    // compute min of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).min(1);
+                    // compute max of the second field
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger()).max(1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>GroupReduce on Full DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    dataSet.reduceGroup(new GroupReduceFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    dataStream.windowAll(GlobalWindows.create()).trigger(new 
EOFTrigger())
+                            .apply(new WindowFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>GroupReduce on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<Integer, String>> dataSet = // [...]
+                    dataSet.groupBy(value -> value.f0)
+                            .reduceGroup(new GroupReduceFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    dataStream.keyBy(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new WindowFunction<>(){
+                            // implement user-defined group reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>First-n</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.first(n)
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.windowAll(GlobalWindows.create())
+                        .trigger(new EOFTrigger())
+                        .apply(new AllWindowFunction<>(){
+                        // implement first-n logic
+                        });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Join</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    dataSet1.join(dataSet2)
+                            .where(data -> data.f0)
+                            .equalTo(data -> data.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined join logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    dataStream1.join(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new JoinFunction<>(){
+                            // implement user-defined join logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>CoGroup</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    dataSet1.coGroup(dataSet2).where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .with(new CoGroupFunction<>(){
+                            // implement user-defined co group logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply(new CoGroupFunction<>(){
+                            // implement user-defined co group logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>OuterJoin</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+                    DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+                    // left outer join
+                    dataSet1.leftOuterJoin(dataSet2).where(dataSet1.f0)
+                            .equalTo(dataSet2.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined left outer join logic
+                            });
+                    // right outer join
+                    dataSet1.rightOuterJoin(dataSet2).where(dataSet1.f0)
+                            .equalTo(dataSet2.f0)
+                            .with(new JoinFunction<>(){
+                            // implement user-defined right outer join logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+                    DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+                    // left outer join
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply((leftIterable, rightInterable, collector) 
-> {
+                                if(!rightInterable.iterator().hasNext()){
+                                // implement user-defined left outer join logic
+                                }
+                            });
+                    // right outer join
+                    dataStream1.coGroup(dataStream2)
+                            .where(value -> value.f0)
+                            .equalTo(value -> value.f0)
+                            .window(GlobalWindows.create())
+                            .trigger(new EOFTrigger())
+                            .apply((leftIterable, rightInterable, collector) 
-> {
+                                if(!leftIterable.iterator().hasNext()){
+                                // implement user-defined right outer join 
logic
+                                }
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Category 3
+
+For category 3, these DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. Additional 
+calculation steps will be added.
+
+To collect records from each subtask, it is necessary to assign a unique 
subtask ID to each record and group them accordingly within the window. The 
+following code snippet illustrates how to assign a subtask ID to each record, 
and will be used in the DataStream examples in the subsequent sections. 
+Here is an example code.
+
+```java
+// assign subtask ID to all records
+public class AddSubtaskIDMapFunction extends RichMapFunction<Integer, 
Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> map(Integer value) {
+        return 
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+    }
+}

Review Comment:
   It might be better to write as follows.
   
   ```java
       public static class AddSubtaskIDMapFunction<T> extends 
RichMapFunction<T, Tuple2<String, T>> {
   
           @Override
           public Tuple2<String, T> map(T value) {
               return 
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
           }
       }
   ```



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,746 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active 
maintenance and support. It will be removed in the
+Flink 2.0 version. Flink users are recommended to migrate from the DataSet API 
to the DataStream API, Table API and SQL for their 
+data processing requirements. 
+
+For the most of DataSet APIs, the users can utilize the DataStream API to get 
the same calculation result in the batch jobs. However,
+different DataSet API can be implemented by DataStream API with various 
difference on semantic and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with same 
semantic and same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior. This will 
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with 
different semantic and different calculation behavior. This 
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections will first introduce how to set the execution 
environment and provide detailed explanations on how to implement 
+each category of DataSet APIs using the DataStream APIs, highlighting the 
specific considerations and challenges associated with each 
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline by DataStream API, we should first start by 
replacing ExecutionEnvironment with StreamExecutionEnvironment.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    ExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    ExecutionEnvironment.createLocalEnvironment();
+                    // Create the collection environment
+                    new CollectionEnvironment();
+                    // Create the remote environment
+                    ExecutionEnvironment.createRemoteEnvironment(String host, 
int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Create the execution environment
+                    StreamExecutionEnvironment.getExecutionEnvironment();
+                    // Create the local execution environment
+                    StreamExecutionEnvironment.createLocalEnvironment();
+                    // The collection environment is not supported.
+                    // Create the remote environment
+                    StreamExecutionEnvironment.createRemoteEnvironment(String 
host, int port, String... jarFiles);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+As the source of DataSet is always bounded, the execution mode must be set to 
RuntimeMode.BATCH to make Flink execute in batch mode.
+
+```java
+StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external 
system, while the DataSet API uses the `DataSource`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataSource<> source = 
ExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String filePath);
+                    // Read data from collection
+                    DataSource<> source = 
ExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataSource<> source = 
ExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Read data from file
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.readFile(FileInputFormat<> inputFormat, String 
filePath);
+                    // Read data from collection
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.fromCollection(Collection<> data);
+                    // Read data from inputformat
+                    DataStreamSource<> source = 
StreamExecutionEnvironment.createInput(InputFormat<> inputFormat)
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+### Sinks
+
+The DataStream API uses the implementations of `DataStreamSink` to write 
records to external system, while the
+DataSet API uses the `DataSink`.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to outputformat
+                    DataSink<> sink = dataSet.output(OutputFormat<> 
outputFormat);
+                    // Write to csv file
+                    DataSink<> sink = dataSet.writeAsCsv(String filePath);
+                    // Write to text file
+                    DataSink<> sink = dataSet.writeAsText(String filePath);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    // Write to sink function or sink
+                    DataStreamSink<> sink = dataStream.addSink(SinkFunction<> 
sinkFunction)
+                    DataStreamSink<> sink = 
dataStream.sinkTo(org.apache.flink.api.connector.sink.Sink<> sink)
+                    // Write to csv file
+                    DataStreamSink<> sink = dataStream.writeAsCsv(String path);
+                    // Write to text file
+                    DataStreamSink<> sink = dataStream.writeAsText(String 
path);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+If you are looking for pre-defined source and sink connectors of DataStream, 
please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" 
>}})
+
+
+## Implement the DataSet API by DataStream
+
+### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that 
in DataSet. This means that implementing these 
+DataSet APIs by the DataStream API is relatively straightforward and does not 
require significant modifications or complexity
+in the job code.
+
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Map</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.map(new MapFunction<>(){
+                    // implement user-defined map logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>FlatMap</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.flatMap(new FlatMapFunction<>(){
+                        // implement user-defined flatmap logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Filter</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.filter(new FilterFunction<>(){
+                        // implement user-defined filter logic
+                    });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Union</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet1.union(dataSet2);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream1.union(dataStream2);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Rebalance</td>
+            <td>
+                {{< highlight "java" >}}
+                    dataSet.rebalance();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    dataStream.rebalance();
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Project</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple3<Integer, Double, String>> dataSet = // [...]
+                    dataSet.project(2,0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple3<Integer, Double, String>> dataStream = 
// [...]
+                    dataStream.project(2,0);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Reduce on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.groupBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    dataStream.keyBy(value -> value.f0)
+                            .reduce(new ReduceFunction<>(){
+                                // implement user-defined reduce logic
+                            });
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Aggregate on Grouped DataSet</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    // compute sum of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(SUM, 1);
+                    // compute min of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MIN, 1);
+                    // compute max of the second field
+                    dataSet.groupBy(value -> value.f0).aggregate(MAX, 1);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // compute sum of the second field
+                    dataStream.keyBy(value -> value.f0).sum(1);
+                    // compute min of the second field
+                    dataStream.keyBy(value -> value.f0).min(1);
+                    // compute max of the second field
+                    dataStream.keyBy(value -> value.f0).max(1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+    </tbody>
+</table>
+
+
+### Category 2
+
+For category 2, these DataSet APIs can be implemented by DataStream APIs with 
different semantic but same calculation behavior.
+The developers need to adapt their code to accommodate these variations, which 
introduces additional complexity.
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left">Operations</th>
+            <th class="text-left">DataSet</th>
+            <th class="text-left">DataStream</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>Distinct</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Integer> dataSet = // [...]
+                    dataSet.distinct();
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Integer> dataStream = // [...]
+                    dataStream.keyBy(value -> value)
+                            .reduce((value1, value2) -> value1);
+                {{< /highlight >}}
+            </td>
+        </tr>
+        <tr>
+            <td>Hash-Partition</td>
+            <td>
+                {{< highlight "java" >}}
+                    DataSet<Tuple2<String, Integer>> dataSet = // [...]
+                    dataSet.partitionByHash(value -> value.f0);
+                {{< /highlight >}}
+            </td>
+            <td>
+                {{< highlight "java" >}}
+                    DataStream<Tuple2<String, Integer>> dataStream = // [...]
+                    // partition by the hashcode of key
+                    dataStream.partitionCustom((key, numSubpartition) -> 
key.hashCode() % numSubpartition, value -> value.f0);

Review Comment:
   Let's reduce the max length of the example code lines. We can do the 
following:
   - Reduce indentation. For example, change
   ```java
       dataStream1.union(dataStream2);
   ```
   into
   ```java
   dataStream1.union(dataStream2);
   ```
   - Wrap lines. For example, change
   ```java
   dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % 
numSubpartition, value -> value.f0);
   ```
   into
   ```java
   dataStream.partitionCustom(
               (key, numSubpartition) -> key.hashCode() % numSubpartition,
               value -> value.f0);
   ```
   
   There is no strict criteria on how short we should trim each code line. A 
shorter line could decrease the width of the tables in this document and reduce 
chances where users need to slip left and right to view the whole line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to