Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
Hello.

I have a current Spark 1.6.1 application that I am working to modify.  The
flow of the application looks something like the following:

(Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
(Extract/Schematization Logic w/ RangePartitioner) --> Several Output
Operations

In the 'extract' segment above I have the following code:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
    defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  // Key by target entity, range-partition, then drop the key
  val result = transformed
    .map(event => (event.targetEntity, event))
    .transform(rdd => rdd.partitionBy(
      new RangePartitioner[String, SchematizedData](defaultPartitions, rdd)))
    .map(x => x._2)

  result
}

This code executes well.  Each follow-on output operation (job) consumes from
the ShuffledRDD produced by the 'partitionBy' inside the transform.  The result
is that my 2-minute batches complete in roughly 48 seconds of reported
streaming batch time, plus about 30 seconds for the 'RangePartitioner' stage,
which is not counted against the streaming job (so around 78 seconds total).  I
have 'concurrent jobs' set to 4.  The 'read from Kafka' and 'extract' work for
a given set of topics is a single job (two stages), and all of the various
output operations follow on after it completes.
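
For context, roughly how the driver side is wired up (a sketch only; the app
name, sinks, and paths below are placeholders, not the real ones, and
kafkaStreamFactory / topics / defaultPartitions are the same values passed to
getData above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("SchematizationJob")              // placeholder name
  .set("spark.streaming.concurrentJobs", "4")   // allow 4 concurrent output jobs
val ssc = new StreamingContext(conf, Minutes(2)) // 2-minute batch interval

val data = getData(kafkaStreamFactory, topics, defaultPartitions)

// Each output operation below becomes its own job per batch; they all consume
// the same shuffled output of the extract step.
data.saveAsTextFiles("/placeholder/output/raw")
data.map(_.targetEntity).countByValue().print()

ssc.start()
ssc.awaitTermination()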

I am aiming to remove the 'RangePartitioner' for two reasons:
  1. We do not need to re-partition here. We're already calling
stream.repartition in the Kafka Stream Factory (a rough sketch of the factory
follows this list).
  2. It appears to be causing failures to recompute partitions when nodes go
down.
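
For reference, a minimal sketch of what the factory's create call does (the
real implementation differs; the broker list and partition count here are
placeholders) - the point is that the stream is already repartitioned before
the extract/schematization step:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

class KafkaStreamFactory(ssc: StreamingContext, brokers: String, partitions: Int) {
  def create(topics: Array[String]): DStream[String] = {
    val kafkaParams = Map("metadata.broker.list" -> brokers)
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics.toSet)
      .map(_._2)                    // keep the message payload only
      .repartition(partitions)      // spread the batch across the cluster
  }
}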

When I remove the 'RangePartitioner' code, I have the following:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
    defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  // Hash-based shuffle to the same partition count, no explicit keying
  val result = transformed.repartition(defaultPartitions)

  result
}

The resulting code, with no other changes, takes around 9 minutes to complete a
single batch.  I am having trouble determining why these have such a
significant performance difference.  Looking at the partitionBy code, it
creates a separate ShuffledRDD within the transform block.  The
'stream.repartition' call is equivalent to
stream.transform(_.repartition(partitions)), which calls coalesce(shuffle =
true), which in turn creates a new ShuffledRDD with a HashPartitioner.  These
calls appear functionally equivalent - I am having trouble coming up with a
justification for the significant performance difference between them.
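
To make the comparison concrete, this is roughly what that expansion looks like
when written out by hand (a sketch only, not the exact Spark 1.6 source):

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// rdd.repartition(n) == rdd.coalesce(n, shuffle = true): tag each element with
// a synthetic key, shuffle through a HashPartitioner (a ShuffledRDD), then drop
// the key again.
def repartitionExpanded[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] = {
  rdd
    .mapPartitions { iter =>
      var position = Random.nextInt(n)
      iter.map { t => position += 1; (position, t) }  // synthetic keys
    }
    .partitionBy(new HashPartitioner(n))              // ShuffledRDD
    .values                                           // back to RDD[T]
}

The existing code path does the analogous thing, but shuffles through a
RangePartitioner keyed on event.targetEntity instead of a synthetic key.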

Help?

Regards,

Bryan Jeffrey


Re: Streaming Batch Oddities

2016-12-13 Thread Bryan Jeffrey
All,

Any thoughts?  I can run another couple of experiments to try to narrow the
problem.  The total data volume in the repartition is around 60GB / batch.

Regards,

Bryan Jeffrey
