Hello. I have a current Spark 1.6.1 application that I am working to modify. The flow of the application looks something like the following:
(Kafka) --> (Direct Stream Receiver) --> (Repartition) --> (Extract/Schematization Logic w/ RangePartitioner) --> Several Output Operations

In the 'extract' segment above I have the following code:

```scala
def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String], defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  val result = transformed
    .map(event => (event.targetEntity, event))
    .transform(rdd => rdd.partitionBy(new RangePartitioner[String, SchematizedData](defaultPartitions, rdd)))
    .map(x => x._2)
  result
}
```

This code executes well. Each follow-on output operation (job) consumes from the transform's 'partitionBy' ShuffledRDD. The result is that my 2-minute batches are consumed in roughly 48 seconds of streaming batch processing time, plus around 30 seconds for the 'RangePartitioner' stage, which is not counted against my streaming job (so around 78 seconds total). I have 'concurrent jobs' set to 4. The 'read from Kafka' and 'extract' steps for a given set of topics form a single job (two stages), and all of the various output operations follow on after completion.

I am aiming to remove the 'RangePartitioner' for two reasons:

1. We do not need to re-partition here. We're already calling stream.repartition in the KafkaStreamFactory.
2. It appears to be causing issues with failure to recompute when nodes go down.

When I remove the RangePartitioner code, I have the following:

```scala
def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String], defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  val result = transformed.repartition(defaultPartitions)
  result
}
```

The resulting code, with no other changes, takes around 9 minutes to complete a single batch. I am having trouble determining why these have such a significant performance difference.
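For context on that extra uncounted stage: constructing a RangePartitioner has to run a separate pass over the data to sample keys and compute range boundaries before the shuffle itself can happen. A simplified sketch of that idea in plain Scala follows; this is illustrative only and not Spark's actual implementation (Spark uses reservoir sampling and weighted bound selection, and the names here are my own):

```scala
import scala.util.Random

// Simplified sketch of range partitioning: sample the keys, derive sorted
// boundary keys, then assign each key to the range that contains it.
object RangePartitionSketch {
  // Derive up to (numPartitions - 1) boundary keys from a sample of the keys.
  // This is the step that costs an extra pass over the data.
  def rangeBounds(keys: Seq[String], numPartitions: Int, sampleSize: Int = 20): Array[String] = {
    val sample = Random.shuffle(keys.toList).take(sampleSize).sorted
    val step = math.max(1, sample.length / numPartitions)
    (1 until numPartitions)
      .map(i => sample(math.min(i * step, sample.length - 1)))
      .distinct
      .toArray
  }

  // Assign a key to the first range whose upper bound is >= the key;
  // keys above every bound fall into the last partition.
  def getPartition(key: String, bounds: Array[String]): Int = {
    val idx = bounds.indexWhere(b => key <= b)
    if (idx == -1) bounds.length else idx
  }

  def main(args: Array[String]): Unit = {
    val keys = ('a' to 'z').map(_.toString)
    val bounds = rangeBounds(keys, numPartitions = 4)
    // Keys land in contiguous sorted ranges, one range per partition.
    keys.groupBy(k => getPartition(k, bounds)).toSeq.sortBy(_._1).foreach {
      case (p, ks) => println(s"partition $p: ${ks.mkString(",")}")
    }
  }
}
```

The point of the sketch is that the bounds themselves depend on the data, which is why partitionBy(new RangePartitioner(...)) shows up as an additional stage in the UI.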
Looking at the partitionBy code, it is creating a separate ShuffledRDD within the transform block. The 'stream.repartition' call is equivalent to stream.transform(_.repartition(partitions)), which calls coalesce(shuffle = true), which in turn creates a new ShuffledRDD with a HashPartitioner. These calls appear functionally equivalent, so I am having trouble coming up with a justification for the significant performance difference between them. Help?

Regards,
Bryan Jeffrey
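For comparison, hash-based assignment needs no extra pass over the data: the target partition is a pure function of the key's hashCode. A minimal plain-Scala sketch of the HashPartitioner-style logic (mirroring Spark's nonNegativeMod helper; note the real repartition path additionally tags each element with a generated key before hashing, so this is a simplification):

```scala
// Minimal sketch of HashPartitioner-style assignment: a pure function of
// the key's hashCode, so no sampling job is needed before the shuffle.
object HashPartitionSketch {
  // Keep the result in [0, mod), even for negative hash codes,
  // since % in Scala/Java can return negative values.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def getPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val keys = Seq("alpha", "beta", "gamma", "delta")
    keys.foreach(k => println(s"$k -> partition ${getPartition(k, 4)}"))
  }
}
```

So while both paths produce a ShuffledRDD, the range-partitioned path pays for an up-front sampling job that the hash path does not, which makes the 9-minute regression after removing it all the more surprising.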