All,

Any thoughts?  I can run another couple of experiments to try to narrow the
problem.  The total data volume in the repartition is around 60GB / batch.

Regards,

Bryan Jeffrey

On Tue, Dec 13, 2016 at 12:11 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Hello.
>
> I have a current Spark 1.6.1 application that I am working to modify.  The
> flow of the application looks something like the following:
>
> (Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
> (Extract/Schematization Logic w/ RangePartitioner) --> Several Output
> Operations
>
> In the 'extract' segment above I have the following code:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
>     defaultPartitions: Int): DStream[SchematizedData] = {
>   val messages: DStream[String] = kafkaStreamFactory.create(topics)
>   val transformed = messages.map(Schematize(_))
>   val result = transformed
>     .map(event => (event.targetEntity, event))
>     .transform(rdd => rdd.partitionBy(
>       new RangePartitioner[String, SchematizedData](defaultPartitions, rdd)))
>     .map(x => x._2)
>
>   result
> }
>
> This code executes well.  Each follow-on output operation (job) consumes from
> the 'partitionBy' ShuffledRDD produced in the transform.  The result is that my
> 2-minute batches are processed in roughly 48 seconds (the time reported against
> the streaming batch interval), plus roughly 30 seconds for the 'RangePartitioner'
> stage that is not counted in the streaming job, so around 78 seconds total.  I
> have 'concurrent jobs' set to 4.  The 'read from Kafka' and 'extract' for a
> given set of topics run as a single job (two stages), and all of the various
> output operations follow on after it completes.
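>
> For reference, a minimal sketch of how the 'concurrent jobs' setting is applied
> (the app name below is just a placeholder; the exact wiring in our driver is not
> shown here):
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> // spark.streaming.concurrentJobs allows several output operations from the
> // same batch to run in parallel (the default is 1).
> val conf = new SparkConf()
>   .setAppName("placeholder-app-name")
>   .set("spark.streaming.concurrentJobs", "4")
>
> // 2-minute batch interval, matching the batches described above.
> val ssc = new StreamingContext(conf, Seconds(120))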
>
> I am aiming to remove the 'RangePartitioner' for two reasons:
>   1. We do not need to re-partition here. We're already calling
> stream.repartition in the Kafka Stream Factory.
>   2. It appears to be causing issues w/ failure to recompute when nodes go
> down.
>
> When I remove the 'RangePartitioner' code, I have the following:
>
> def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
>     defaultPartitions: Int): DStream[SchematizedData] = {
>   val messages: DStream[String] = kafkaStreamFactory.create(topics)
>   val transformed = messages.map(Schematize(_))
>   val result = transformed.repartition(defaultPartitions)
>
>   result
> }
>
> The resulting code, with no other changes, takes around 9 minutes to complete a
> single batch, and I am having trouble determining why the two versions have such
> a significant performance difference.  Looking at the partitionBy code, it
> creates a separate ShuffledRDD within the transform block.  The
> 'stream.repartition' call is equivalent to
> stream.transform(_.repartition(partitions)), which calls coalesce(shuffle = true),
> which in turn creates a new ShuffledRDD with a HashPartitioner.  These calls
> appear functionally equivalent, so I cannot come up with a justification for the
> significant performance difference between them.
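>
> For reference, here is a minimal sketch of the two shuffle paths as I understand
> them (byRange/byHash and the keyed/data parameters are just illustrative names,
> not the application code):
>
> import org.apache.spark.RangePartitioner
> import org.apache.spark.streaming.dstream.DStream
>
> // Current path: explicit range partitioning inside transform.  The
> // RangePartitioner constructor samples the parent RDD to compute range
> // bounds, which runs as extra work before the shuffle itself.
> def byRange(keyed: DStream[(String, SchematizedData)],
>     partitions: Int): DStream[SchematizedData] =
>   keyed
>     .transform(rdd => rdd.partitionBy(
>       new RangePartitioner[String, SchematizedData](partitions, rdd)))
>     .map(_._2)
>
> // Proposed path: DStream.repartition(n) is transform(_.repartition(n));
> // RDD.repartition(n) calls coalesce(n, shuffle = true), which shuffles
> // through a ShuffledRDD with a HashPartitioner.
> def byHash(data: DStream[SchematizedData],
>     partitions: Int): DStream[SchematizedData] =
>   data.repartition(partitions)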
>
> Help?
>
> Regards,
>
> Bryan Jeffrey
>
>
