Hello.

I have a current Spark 1.6.1 application that I am working to modify.  The
flow of the application looks something like the following:

(Kafka) --> (Direct Stream Receiver) --> (Repartition) -->
(Extract/Schematization Logic w/ RangePartitioner) --> Several Output
Operations

In the 'extract' segment above I have the following code:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
    defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  val result = transformed
    .map(event => (event.targetEntity, event))
    .transform(rdd => rdd.partitionBy(
      new RangePartitioner[String, SchematizedData](defaultPartitions, rdd)))
    .map(_._2)

  result
}

This code executes well.  Each follow-on output operation (job) consumes
from the transform 'partitionBy' ShuffledRDD.  The result is that my
2-minute batches are consumed in roughly 48 seconds (streaming batch
interval), with the 'RangePartitioner' stage, which is not counted in my
streaming job, taking another 30 seconds (so around 78 seconds total).  I
have 'concurrent jobs' set to 4.  The 'read from kafka' and 'extract' for a
given set of topics is a single job (two stages), and all of the various
output operations follow on after completion.
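For reference, the 'concurrent jobs' setting above refers to the
spark.streaming.concurrentJobs property.  A minimal sketch of how it might
be set (the app name and conf-building style here are illustrative, not my
actual code):

```scala
import org.apache.spark.SparkConf

// Sketch only: allows up to 4 streaming jobs to run concurrently
// instead of the default of 1.
val conf = new SparkConf()
  .setAppName("schematization-stream") // hypothetical app name
  .set("spark.streaming.concurrentJobs", "4")
```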

I am aiming to remove the 'RangePartitioner' for two reasons:
  1. We do not need to re-partition here. We're already calling
stream.repartition in the Kafka Stream Factory.
  2. It appears to be causing issues w/ failure to recompute when nodes go
down.

When I remove the 'RangePartitioner' code, I have the following:

def getData(kafkaStreamFactory: KafkaStreamFactory, topics: Array[String],
    defaultPartitions: Int): DStream[SchematizedData] = {
  val messages: DStream[String] = kafkaStreamFactory.create(topics)
  val transformed = messages.map(Schematize(_))
  val result = transformed.repartition(defaultPartitions)

  result
}

The resulting code, with no other changes, takes around 9 minutes to
complete a single batch.  I am having trouble determining why these have
such a significant performance difference.  Looking at the partitionBy
code, it creates a separate ShuffledRDD within the transform block.  The
'stream.repartition' call is equivalent to
stream.transform(_.repartition(partitions)), which calls coalesce
(shuffle = true), which in turn creates a new ShuffledRDD with a
HashPartitioner.  These calls appear functionally equivalent - I am having
trouble coming up with a justification for the significant performance
difference between them.
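For what it's worth, the HashPartitioner that ends up behind
coalesce(shuffle = true) just buckets keys by hashCode modulo the partition
count, while RangePartitioner samples the parent RDD up front to compute
sorted key ranges (which is why it shows up as its own stage).  A
standalone re-implementation of the hash assignment (illustrative only, not
Spark's actual class) looks like:

```scala
// Illustrative re-implementation of HashPartitioner's partition
// assignment; Spark uses the same non-negative-mod trick so that
// negative hashCodes still map into [0, numPartitions).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

def hashPartition(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)
```

So a given key always lands in the same bucket, with no sampling pass,
which is part of why I expected repartition to be the cheaper of the two.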

Help?

Regards,

Bryan Jeffrey
