[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aviem Zur reassigned BEAM-848:
------------------------------

    Assignee: Aviem Zur  (was: Amit Sela)

> A better shuffle after reading from within mapWithState.
> --------------------------------------------------------
>
>                 Key: BEAM-848
>                 URL: https://issues.apache.org/jira/browse/BEAM-848
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Aviem Zur
>
> The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and
> this stateful operation will be followed by a shuffle:
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159
> Since the stateful read maps "splitSource" -> "partition of a list of read
> values", the following shuffle won't benefit in any way (the list of read
> values has not been flatMapped yet). In order to avoid shuffle we need to set
> the input RDD ({{SourceRDD.Unbounded}}) partitioner to be a default
> {{HashPartitioner}} since {{mapWithState}} would use the same partitioner and
> will skip shuffle if the partitioners match.
> It would be wise to shuffle the read values _after_ flatmap.
> I will break this into two tasks:
> # Set default-partitioner to the input RDD.
> # Shuffle (using Coders) the input.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
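
The partitioner check the ticket relies on can be illustrated with a small, self-contained Java sketch. This is a toy model only, not Spark or Beam code: {{ToyHashPartitioner}}, {{needsShuffle}} and {{PartitionerMatchSketch}} are hypothetical names invented for illustration. It assumes, as the ticket describes, that the stateful step skips the shuffle when the input already carries a partitioner equal to its own, and that two hash partitioners are equal when their partition counts match.

```java
import java.util.Objects;
import java.util.Optional;

// Toy model only: hypothetical stand-ins, not Spark's or Beam's API.
final class PartitionerMatchSketch {

    /** Stand-in for a hash partitioner; equality depends only on the partition count. */
    static final class ToyHashPartitioner {
        final int numPartitions;

        ToyHashPartitioner(int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("numPartitions must be positive");
            }
            this.numPartitions = numPartitions;
        }

        /** Maps a key to a partition index in [0, numPartitions). */
        int partitionFor(Object key) {
            return Math.floorMod(Objects.hashCode(key), numPartitions);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ToyHashPartitioner
                    && ((ToyHashPartitioner) other).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(numPartitions);
        }
    }

    /**
     * Returns true when the input would have to be repartitioned, i.e. when it
     * has no partitioner or one that differs from the target partitioner.
     */
    static boolean needsShuffle(Optional<ToyHashPartitioner> inputPartitioner,
                                ToyHashPartitioner target) {
        return !inputPartitioner.equals(Optional.of(target));
    }

    public static void main(String[] args) {
        ToyHashPartitioner target = new ToyHashPartitioner(4);

        // Input without a partitioner: a shuffle is required.
        System.out.println(needsShuffle(Optional.empty(), target));

        // Input already partitioned the same way: the shuffle can be skipped.
        System.out.println(needsShuffle(Optional.of(new ToyHashPartitioner(4)), target));

        // Input partitioned differently: a shuffle is required.
        System.out.println(needsShuffle(Optional.of(new ToyHashPartitioner(8)), target));
    }
}
```

In this model, the first task in the ticket (setting a default partitioner on the input RDD) corresponds to the second call in {{main}}, where the input's partitioner equals the target and no shuffle is needed. The second task, shuffling the read values after the flatMap, is not modeled here.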