[ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31753:
-----------------------------------
    Labels: pull-request-available  (was: )

> Support DataStream CoGroup in stream Mode with similar performance as DataSet 
> CoGroup
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-31753
>                 URL: https://issues.apache.org/jira/browse/FLINK-31753
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, 
> DataStream CoCroup is still considerably slower than DataSet when co-grouping 
> two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
> records from each stream under different modes. The co-group function is 
> chosen to be very lightweight so that benchmark is dominated by the Flink's 
> co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to performance co-group operation in DataStream stream mode 
> so that users' don't have to take big regression in order to migrate from 
> DataSet to DataStream.
> We will first add util function in Flink ML to unblock the migration of some 
> algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> DataSet<Tuple3<Integer, Integer, Double>> data1 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet<Tuple3<Integer, Integer, Double>> data2 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> data1.coGroup(data2)
>         .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple 
> -> tuple.f0)
>         .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) 
> tuple -> tuple.f0)
>         .with(
>                 new RichCoGroupFunction<
>                         Tuple3<Integer, Integer, Double>,
>                         Tuple3<Integer, Integer, Double>,
>                         Integer>() {
>                     @Override
>                     public void open(Configuration parameters) throws 
> Exception {
>                         super.open(parameters);
>                     }
>                     @Override
>                     public void close() throws Exception {
>                         super.close();
>                     }
>                     @Override
>                     public void coGroup(
>                             Iterable<Tuple3<Integer, Integer, Double>> 
> iterable,
>                             Iterable<Tuple3<Integer, Integer, Double>> 
> iterable1,
>                             Collector<Integer> collector)
>                             throws Exception {
>                         collector.collect(1);
>                     }
>                 })
>         .write(new CountingAndDiscardingSink(), "/tmp");
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to