Gyula Fora created FLINK-2824:
---------------------------------

             Summary: Iteration feedback partitioning does not work as expected
                 Key: FLINK-2824
                 URL: https://issues.apache.org/jira/browse/FLINK-2824
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Gyula Fora
            Priority: Blocker


Iteration feedback partitioning is not handled transparently and can cause 
serious issues if the user does not know the specific implementation details of 
streaming iterations (which is not a realistic expectation).

Example:

IterativeStream it = ... (parallelism 1)
DataStream mapped = it.map(...) (parallelism 2)
// This does not work because the feedback has parallelism 2 != 1:
// it.closeWith(mapped.partitionByHash(someField))
// so we need to rebalance the data first:
it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))

This program will execute, but the feedback will not be partitioned by hash
across the mapper instances.
The hash partitioning is applied between the NoOpMap and the iteration sink,
whose parallelism differs from the mapper's (1 vs. 2), and the iteration source
then forwards every element to the mapper (always to instance 0).

So the problem is basically that the iteration source/sink pair gets the
parallelism of the input stream (p = 1), not that of the head operator (p = 2),
which leads to incorrect partitioning.
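
The effect can be reproduced outside of Flink with a small stand-alone model.
This is only a sketch, not Flink code: hashChannel and headInstancesReached are
hypothetical helpers, and the modulo-based channel choice is an assumption
standing in for the real hash partitioner. It hashes each feedback element to
an iteration sink instance and then forwards it one-to-one to the head.

```java
import java.util.ArrayList;
import java.util.List;

public class FeedbackRoutingSketch {

    // Hypothetical stand-in for hash partitioning: pick a channel from the key hash.
    static int hashChannel(int key, int numChannels) {
        return Math.floorMod(Integer.hashCode(key), numChannels);
    }

    // Models the feedback path: the hash partitioner chooses among the iteration
    // sink instances, and the paired iteration source instance then forwards
    // one-to-one to the head instance with the same index.
    static List<Integer> headInstancesReached(int[] keys, int sourceSinkParallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int key : keys) {
            int sinkInstance = hashChannel(key, sourceSinkParallelism);
            targets.add(sinkInstance); // forward: source i feeds head i
        }
        return targets;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3};
        // Source/sink inherit the input parallelism (p = 1): the head has
        // 2 instances, but only instance 0 ever receives feedback.
        System.out.println(headInstancesReached(keys, 1)); // prints [0, 0, 0, 0]
        // Source/sink use the head parallelism (p = 2): the hash decides.
        System.out.println(headInstancesReached(keys, 2)); // prints [0, 1, 0, 1]
    }
}
```

With p = 1 the hash has only one channel to choose from, so the partitioning
requested by the user has no effect on which mapper instance sees an element.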

Workaround:
Set the input parallelism to match that of the head operator.

Suggested solution:
The iteration construction should be reworked to set the parallelism of the
source/sink to the parallelism of the head operator (and validate that all
heads have the same parallelism).
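
A rough sketch of the validation step, assuming the head parallelisms are
available as a plain list when the iteration is constructed. The class and
method names here are hypothetical and do not correspond to existing Flink
code:

```java
import java.util.List;

public class HeadParallelismCheck {

    // Returns the parallelism the iteration source/sink pair should use,
    // or fails if the head operators disagree on their parallelism.
    static int resolveSourceSinkParallelism(List<Integer> headParallelisms) {
        if (headParallelisms.isEmpty()) {
            throw new IllegalArgumentException("Iteration has no head operators");
        }
        int expected = headParallelisms.get(0);
        for (int p : headParallelisms) {
            if (p != expected) {
                throw new IllegalStateException(
                        "All iteration heads must have the same parallelism, found "
                                + expected + " and " + p);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // Two heads, both with parallelism 2: the source/sink would get p = 2.
        System.out.println(resolveSourceSinkParallelism(List.of(2, 2))); // prints 2
    }
}
```

Failing early at construction time would turn the silent mis-partitioning into
an explicit error for the user.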



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
