All,

Any thoughts? I can run another couple of experiments to try to narrow the problem. The total data volume in the repartition is around 60 GB per batch.
Regards,
Bryan Jeffrey

On Tue, Dec 13, 2016 at 12:11 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

> Hello.
>
> I have a current Spark 1.6.1 application that I am working to modify. The
> flow of the application looks something like the following:
>
> (Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
> (Extract/Schematization Logic w/ RangePartitioner) --> Several Output
> Operations
>
> In the 'extract' segment above I have the following code:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
>     defaultPartitions: Int): DStream[SchematizedData] = {
>   val messages: DStream[String] = kafkaStreamFactory.create(topics)
>   val transformed = messages.map(Schematize(_))
>   val result = transformed
>     .map(event => { (event.targetEntity, event) })
>     .transform(rdd => rdd.partitionBy(
>       new RangePartitioner[String, SchematizedData](defaultPartitions, rdd)))
>     .map(x => x._2)
>
>   result
> }
>
> This code executes well. Each follow-on output operation (job) consumes
> from the 'partitionBy' ShuffledRDD created in the transform. The result is
> that my 2-minute batches are consumed in roughly 48 seconds (streaming
> batch interval), with the 'RangePartitioner' stage, which is not counted
> in my streaming job, at around 30 seconds (so around 78 seconds total). I
> have 'concurrent jobs' set to 4. The 'read from kafka' and 'extract' for a
> given set of topics is a single job (two stages), and all of the various
> output operations follow on after completion.
>
> I am aiming to remove the 'RangePartitioner' for two reasons:
> 1. We do not need to re-partition here. We're already calling
> stream.repartition in the Kafka Stream Factory.
> 2. It appears to be causing issues with failure to recompute when nodes
> go down.
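To make the range-partitioning step concrete: a RangePartitioner assigns each key to a contiguous range of the sorted key space, using boundary keys that Spark derives by sampling the RDD. The following is a minimal pure-Scala sketch of that assignment rule, not Spark's implementation; the object name, method name, and example bounds are all mine.

```scala
// Hypothetical stand-in for the assignment rule of Spark's RangePartitioner:
// given sorted boundary keys, each key goes to the partition whose range
// contains it, so sorted keys map to increasing partition ids.
object RangePartitionSketch {
  // Find the first boundary the key sorts at or before; keys beyond the
  // last boundary fall into the final partition.
  def rangePartition(bounds: Seq[String])(key: String): Int = {
    val idx = bounds.indexWhere(b => key.compareTo(b) <= 0)
    if (idx >= 0) idx else bounds.length
  }

  def main(args: Array[String]): Unit = {
    // Three bounds => four partitions, analogous to defaultPartitions = 4.
    val bounds = Seq("f", "m", "s")
    val keys = Seq("alpha", "gamma", "november", "zulu")
    println(keys.map(rangePartition(bounds))) // prints List(0, 1, 2, 3)
  }
}
```

Because the bounds come from sampling the data, the resulting partitions tend to hold similar record counts, at the cost of the sampling pass.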
>
> When I remove the 'RangePartitioner' code, I have the following:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
>     defaultPartitions: Int): DStream[SchematizedData] = {
>   val messages: DStream[String] = kafkaStreamFactory.create(topics)
>   val transformed = messages.map(Schematize(_))
>   val result = transformed.repartition(defaultPartitions)
>
>   result
> }
>
> The resulting code, with no other changes, takes around 9 minutes to
> complete a single batch. I am having trouble determining why these have
> such a significant performance difference. Looking at the partitionBy
> code, it is creating a separate ShuffledRDD within the transform block.
> The 'stream.repartition' call is equivalent to
> stream.transform(_.repartition(partitions)), which calls
> coalesce(shuffle = true), which creates a new ShuffledRDD with a
> HashPartitioner. These calls appear functionally equivalent - I am having
> trouble coming up with a justification for the significant performance
> difference between them.
>
> Help?
>
> Regards,
>
> Bryan Jeffrey
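For comparison with the range sketch above, here is a minimal pure-Scala sketch of HashPartitioner-style assignment, the scheme the quoted message traces repartition/coalesce(shuffle = true) down to. This is not Spark's code; the object and method names are mine. The point is only that the partition id comes from the key's hashCode rather than from sampled range bounds, so no ordering or sampling of the data is involved.

```scala
// Hypothetical stand-in for HashPartitioner-style assignment:
// partition = hashCode mod numPartitions, forced non-negative.
object HashPartitionSketch {
  def hashPartition(numPartitions: Int)(key: String): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // Java's % can be negative
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("alpha", "gamma", "november", "zulu")
    // Unlike range partitioning, the assignment ignores key ordering, so
    // adjacent keys can land on unrelated partitions.
    keys.foreach(k => println(s"$k -> ${hashPartition(4)(k)}"))
  }
}
```

Both schemes shuffle every record once, which is why the two pipelines look functionally equivalent on paper; any behavioral difference would have to come from how the records end up distributed, not from the number of shuffles.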