Hi Juan!

What you are seeing is an artifact of the current workaround for collect(). The actual collect() logic runs inside the flatMap(), and the sink is a dummy that executes nothing. The flatMap writes the data to be collected into an "accumulator", which delivers it back.
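To make the mechanism concrete, here is a toy sketch in plain Java. It is not Flink's actual code and uses no Flink classes: the names CollectSketch, ListAccumulator, collectFlatMap and runJob are made up for illustration, and I am assuming the flatMap forwards nothing downstream. It only shows the pattern of a flatMap stage that stores every record in an accumulator while the sink stays a no-op.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CollectSketch {

    /** Hypothetical stand-in for an accumulator: gathers records to hand back after the job. */
    static final class ListAccumulator<T> {
        private final List<T> values = new ArrayList<>();

        void add(T value) {
            values.add(value);
        }

        List<T> getLocalValue() {
            return values;
        }
    }

    /** Hypothetical stand-in for the collect() flatMap: stores the record, emits nothing. */
    static <T> void collectFlatMap(T record, ListAccumulator<T> acc, Consumer<T> out) {
        acc.add(record);
        // Assumption: nothing is passed to 'out', so the sink never receives a record.
    }

    /** Runs source -> map -> collect-flatMap -> dummy sink in a single thread. */
    static List<Integer> runJob(List<Integer> input, Function<Integer, Integer> mapFn) {
        ListAccumulator<Integer> acc = new ListAccumulator<>();
        Consumer<Integer> dummySink = record -> { };  // executes nothing

        for (Integer element : input) {
            Integer mapped = mapFn.apply(element);     // the user's map function runs here
            collectFlatMap(mapped, acc, dummySink);    // the collect() work happens here
        }
        // The caller reads the collected data from the accumulator, not from the sink.
        return acc.getLocalValue();
    }

    public static void main(String[] args) {
        List<Integer> squares = runJob(Arrays.asList(1, 2, 3, 4), x -> x * x);
        System.out.println(squares);  // prints [1, 4, 9, 16]
    }
}
```

In this sketch all of the work is attributed to the map/flatMap stage and the sink finishes immediately, which mirrors the timing pattern in your log.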
Greetings,
Stephan

On Mon, Jun 29, 2015 at 2:30 PM, Juan Fumero <juan.jose.fumero.alfo...@oracle.com> wrote:

> Hi,
>
> I am starting with Flink. I have looked through the documentation, but I
> haven't found it clear.
>
> I wonder what the difference is between these two states:
>
> FlatMap RUNNING vs DataSink RUNNING.
>
> Is FlatMap doing any data transformation? Compilation? At which point is
> it actually executing the function provided in the MapFunction? How could
> I know exactly the time for the kernel computation?
>
> It seems it is using one thread in this step, even though I specified 16
> threads in the createLocalEnvironment.
>
> CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
>
> Here it is running only one thread for almost 35 seconds.
>
> The rest of the execution is very fast (less than one second for computing
> the square of an array of 500000 integer elements).
>
> Thanks
> Juan
>
> Here is the full log.
>
> 06/29/2015 14:13:25 Job execution switched to status RUNNING.
> 06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to SCHEDULED
> 06/29/2015 14:13:25 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to DEPLOYING
> 06/29/2015 14:13:26 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to RUNNING
> 06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to SCHEDULED
> 06/29/2015 14:14:01 CHAIN DataSource (at applyFunction(ApplyFunction.java:96) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap (collect())(1/1) switched to FINISHED
> 06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to DEPLOYING
> 06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to RUNNING
> 06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to FINISHED
> 06/29/2015 14:14:01 Job execution switched to status FINISHED.