[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-06-29 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31753:
-
Issue Type: Improvement  (was: Bug)

> Support DataStream CoGroup in stream Mode with similar performance as DataSet 
> CoGroup
> -
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, 
> DataStream CoGroup is still considerably slower than DataSet when co-grouping 
> two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
> records from each stream under different modes. The co-group function is 
> chosen to be very lightweight so that the benchmark is dominated by Flink's 
> co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to perform co-group operations efficiently in DataStream 
> stream mode so that users don't have to take a big regression in order to 
> migrate from DataSet to DataStream.
> We will first add a util function in Flink ML to unblock the migration of some 
> algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> // Two bounded inputs; the element type follows from Types.TUPLE(INT, INT, DOUBLE).
> DataSet<Tuple3<Integer, Integer, Double>> data1 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet<Tuple3<Integer, Integer, Double>> data2 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> // Co-group both inputs on the first tuple field.
> data1.coGroup(data2)
>         .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .with(
>                 new RichCoGroupFunction<
>                         Tuple3<Integer, Integer, Double>,
>                         Tuple3<Integer, Integer, Double>,
>                         Integer>() {
>                     @Override
>                     public void open(Configuration parameters) throws Exception {
>                         super.open(parameters);
>                     }
>                     @Override
>                     public void close() throws Exception {
>                         super.close();
>                     }
>                     @Override
>                     public void coGroup(
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable,
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable1,
>                             Collector<Integer> collector)
>                             throws Exception {
>                         // Emit a single record per key so the function itself is nearly free.
>                         collector.collect(1);
>                     }
>                 })
>         .write(new CountingAndDiscardingSink(), "/tmp");
> {code}
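
For readers unfamiliar with the operation being benchmarked, the sketch below shows co-group semantics in plain Java with no Flink dependency: records from both inputs are grouped by key, and the function is invoked once per distinct key with the matching records from each side (either side may be empty). `CoGroupSketch` and `CoGroupFn` are hypothetical names, and the in-memory `TreeMap` grouping is for illustration only; it is not how Flink executes co-group in any mode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

public class CoGroupSketch {

    /** Hypothetical callback, shaped loosely like a co-group function. */
    public interface CoGroupFn<L, R, O> {
        void coGroup(Iterable<L> left, Iterable<R> right, List<O> out);
    }

    /** Groups both inputs by key and calls fn once per distinct key, in key order. */
    public static <K extends Comparable<K>, L, R, O> List<O> coGroup(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey,
            CoGroupFn<L, R, O> fn) {
        Map<K, List<L>> leftGroups = new TreeMap<>();
        for (L record : left) {
            leftGroups.computeIfAbsent(leftKey.apply(record), k -> new ArrayList<>()).add(record);
        }
        Map<K, List<R>> rightGroups = new TreeMap<>();
        for (R record : right) {
            rightGroups.computeIfAbsent(rightKey.apply(record), k -> new ArrayList<>()).add(record);
        }

        // A key present on only one side still triggers a call, with an empty other side.
        TreeSet<K> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<O> out = new ArrayList<>();
        for (K key : keys) {
            List<L> leftGroup = leftGroups.getOrDefault(key, Collections.emptyList());
            List<R> rightGroup = rightGroups.getOrDefault(key, Collections.emptyList());
            fn.coGroup(leftGroup, rightGroup, out);
        }
        return out;
    }

    public static void main(String[] args) {
        // Like the benchmark function, emit one record per key regardless of group contents.
        List<Integer> out = coGroup(
                Arrays.asList(1, 1, 2), (Integer x) -> x,
                Arrays.asList(2, 3), (Integer x) -> x,
                (l, r, o) -> o.add(1));
        System.out.println(out.size()); // distinct keys are 1, 2 and 3, so this prints 3
    }
}
```

Because the benchmark's function emits a constant per key, the measured time reflects almost entirely the cost of forming the groups, which is the overhead this issue is about.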



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31753:
---
Labels: pull-request-available  (was: )






[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31753:
-
Description: 
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoGroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
records from each stream under different modes. The co-group function is chosen 
to be very lightweight so that the benchmark is dominated by Flink's co-group 
overhead.

DataSet: 5.6 sec
DataStream batch mode: 15.4 sec
DataStream stream mode with rocksdb: 81 sec
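
For scale, these timings correspond to slowdowns relative to DataSet of roughly 2.75x for DataStream batch mode and about 14.5x for DataStream stream mode with rocksdb. A minimal sketch of that arithmetic follows; the class and method names are illustrative and not part of the benchmark code.

```java
public class CoGroupSlowdown {

    // Timings reported above, in seconds.
    static final double DATASET_SEC = 5.6;
    static final double BATCH_SEC = 15.4;
    static final double STREAM_ROCKSDB_SEC = 81.0;

    /** Returns how many times slower a run is than the baseline run. */
    static double slowdown(double seconds, double baselineSeconds) {
        return seconds / baselineSeconds;
    }

    public static void main(String[] args) {
        System.out.printf("DataStream batch mode: %.2fx%n",
                slowdown(BATCH_SEC, DATASET_SEC));
        System.out.printf("DataStream stream mode with rocksdb: %.2fx%n",
                slowdown(STREAM_ROCKSDB_SEC, DATASET_SEC));
    }
}
```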

We should be able to perform co-group operations efficiently in DataStream 
stream mode so that users don't have to take a big regression in order to 
migrate from DataSet to DataStream.

We will first add a util function in Flink ML to unblock the migration of some 
algorithms from Alink to Flink ML.

Here is the code used to benchmark DataSet's CoGroup.

{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().disableGenericTypes();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);

// Two bounded inputs; the element type follows from Types.TUPLE(INT, INT, DOUBLE).
DataSet<Tuple3<Integer, Integer, Double>> data1 =
        env.fromCollection(
                new DataGenerator(numRecords),
                Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
DataSet<Tuple3<Integer, Integer, Double>> data2 =
        env.fromCollection(
                new DataGenerator(numRecords),
                Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));

// Co-group both inputs on the first tuple field.
data1.coGroup(data2)
        .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
        .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
        .with(
                new RichCoGroupFunction<
                        Tuple3<Integer, Integer, Double>,
                        Tuple3<Integer, Integer, Double>,
                        Integer>() {

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                    }

                    @Override
                    public void coGroup(
                            Iterable<Tuple3<Integer, Integer, Double>> iterable,
                            Iterable<Tuple3<Integer, Integer, Double>> iterable1,
                            Collector<Integer> collector)
                            throws Exception {
                        // Emit a single record per key so the function itself is nearly free.
                        collector.collect(1);
                    }
                })
        .write(new CountingAndDiscardingSink(), "/tmp");
{code}



  was:
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoGroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here is the benchmark result of co-grouping two bounded streams with 4*10^6 
records from each stream. The co-group function is chosen to be very 
lightweight so that we only 








[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31753:
-
Description: 
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoGroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here is the benchmark result of co-grouping two bounded streams with 4*10^6 
records from each stream. The co-group function is chosen to be very 
lightweight so that we only 








