In general, avoid collect() if you can. collect() brings the data to the
client, where the computation is no longer parallel.

Try to do as much of the work on the DataSet as possible.
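
To illustrate the idea with something that runs without a Flink setup, here
is a minimal sketch in plain Java. It is an analogy, not Flink code: a
parallel stream stands in for the DataSet, and the names CollectSketch,
collectThenCompute and computeInPipeline are made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectSketch {

    // Pattern to avoid: materialize every element on the caller's side first,
    // then do the real work in a plain single-threaded loop.
    static long collectThenCompute(int n) {
        List<Integer> all = IntStream.range(0, n).boxed().collect(Collectors.toList());
        long sum = 0;
        for (int x : all) {
            sum += (long) x * x;
        }
        return sum;
    }

    // Preferred: square and sum inside the parallel pipeline, so that only a
    // single number travels back to the caller.
    static long computeInPipeline(int n) {
        return IntStream.range(0, n)
                .parallel()
                .mapToLong(x -> (long) x * x)
                .sum();
    }

    public static void main(String[] args) {
        int n = 500_000;
        // Both variants compute the same sum of squares.
        System.out.println(collectThenCompute(n) == computeInPipeline(n));
    }
}
```

The two methods return the same value; the difference is where the work
happens. In Flink terms, the second variant corresponds to keeping the map
and the aggregation on the DataSet and calling collect() only on the small
final result, if at all.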

On Mon, Jun 29, 2015 at 2:58 PM, Juan Fumero <
juan.jose.fumero.alfo...@oracle.com> wrote:

> Hi Stephan,
>   so should I use another method instead of collect()? It seems
> multithreading is not working with this.
>
>
> Juan
>
> On Mon, 2015-06-29 at 14:51 +0200, Stephan Ewen wrote:
> > Hi Juan!
> >
> >
> > This is an artifact of a workaround right now. The actual collect()
> > logic happens in the flatMap() and the sink is a dummy that executes
> > nothing. The flatMap writes the data to be collected to the
> > "accumulator" that delivers it back.
> >
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Mon, Jun 29, 2015 at 2:30 PM, Juan Fumero
> > <juan.jose.fumero.alfo...@oracle.com> wrote:
> >         Hi,
> >           I am starting with Flink. I have looked through the
> >         documentation, but I haven't found it clear.
> >
> >         I wonder what the difference is between these two states:
> >
> >         FlatMap RUNNING vs DataSink RUNNING.
> >
> >         Is FlatMap doing any data transformation? Compilation? At
> >         which point is the function provided in the MapFunction
> >         actually executed? How can I measure the exact time of the
> >         kernel computation?
> >
> >         It seems to use only one thread in this step, even though I
> >         specified 16 threads in createLocalEnvironment.
> >
> >         CHAIN DataSource (at applyFunction(ApplyFunction.java:96)
> >         (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >         (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >         (collect())(1/1) switched to RUNNING
> >
> >         Here only one thread is running for almost 35 seconds.
> >
> >         The rest of the execution is very fast (less than one second
> >         for computing the square of an array of 500000 integer
> >         elements)
> >
> >         Thanks
> >         Juan
> >
> >         Here is the full log.
> >
> >         06/29/2015 14:13:25 Job execution switched to status RUNNING.
> >         06/29/2015 14:13:25 CHAIN DataSource (at
> >         applyFunction(ApplyFunction.java:96)
> >         (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >         (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >         (collect())(1/1) switched to SCHEDULED
> >         06/29/2015 14:13:25 CHAIN DataSource (at
> >         applyFunction(ApplyFunction.java:96)
> >         (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >         (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >         (collect())(1/1) switched to DEPLOYING
> >         06/29/2015 14:13:26 CHAIN DataSource (at
> >         applyFunction(ApplyFunction.java:96)
> >         (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >         (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >         (collect())(1/1) switched to RUNNING
> >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >         SCHEDULED
> >         06/29/2015 14:14:01 CHAIN DataSource (at
> >         applyFunction(ApplyFunction.java:96)
> >         (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map
> >         (Map at applyFunction(ApplyFunction.java:108)) -> FlatMap
> >         (collect())(1/1) switched to FINISHED
> >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >         DEPLOYING
> >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >         RUNNING
> >         06/29/2015 14:14:01 DataSink (collect() sink)(1/1) switched to
> >         FINISHED
> >         06/29/2015 14:14:01 Job execution switched to status FINISHED.
> >
> >
> >
> >
> >
> >
>
>
>
