+user to potentially help others

Cody,

Thanks for calling out isEmpty; I didn't realize it was so dangerous.
Taking it out and just reusing the count has eliminated the issue, and the
cluster is now happily eating 400,000-record batches.

For completeness’ sake: I am using the direct stream API.
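
For reference, the change amounts to roughly this (a quick sketch against the
code quoted below, not the exact diff; recordCount is just an illustrative name):

      // Count once (a single job over all partitions) and reuse the result,
      // instead of calling isEmpty, which does a take() and can schedule
      // extra jobs when the partitions it scans first are empty.
      val recordCount = GeosRUSECs.count
      if (recordCount > 0) {
        val totalRUSEC = GeosRUSECs.map(x => x._2._1).reduce(_ + _)
        val avgRUSEC = totalRUSEC / recordCount.toDouble
        // ... rest of the foreachRDD body unchanged ...
      }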

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Saturday, October 31, 2015 2:00 PM
To: YOUNG, MATTHEW, T (Intel Corp) <matthew.t.yo...@intel.com>
Subject: Re: Very slow performance on very small record counts

Have you looked at jstack or the thread dump from the Spark UI during that time 
to see what's happening?

Are you using a receiver-based stream or the direct stream?

The only odd thing I notice about your code is that you're calling isEmpty, 
which will do a take(), which can end up scheduling multiple jobs if it 
initially grabs empty partitions. You're counting the RDD anyway, so why not 
just do count() first?
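
Something like this, roughly (an untested sketch; rdd here stands for whatever
RDD you're checking):

    // isEmpty boils down to take(1) (roughly: partitions.length == 0 || take(1).length == 0),
    // and take(1) starts with one partition and schedules further jobs over
    // more partitions if the first ones turn out to be empty.
    // Counting once and reusing the result avoids that extra scheduling:
    val cnt = rdd.count()   // a single job over all partitions
    if (cnt > 0) {
      // ... use cnt wherever you would otherwise call count() again ...
    }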



On Fri, Oct 30, 2015 at 5:38 PM, Young, Matthew T 
<matthew.t.yo...@intel.com<mailto:matthew.t.yo...@intel.com>> wrote:
In a job I am writing I have encountered impossibly poor performance with Spark 
Streaming 1.5.1. The environment is three 16-core/32 GB RAM VMs.

The job involves parsing 600 bytes or so of JSON per record from Kafka, 
extracting two values from the JSON, doing some aggregation and averages, and 
writing a handful of summary results back to Kafka every two-second batch.

The issue is that Spark seems to be hitting a hard minimum of 4 seconds to 
process each batch, even a batch with as few as 192 records in it!

When I check the Web UI for such a batch I see a proper distribution of offsets 
(each worker gets < 10 records) and four jobs for the batch. Three of the jobs 
complete very quickly (as I would expect), but one job essentially dominates 
the 4 seconds. This Web UI screenshot is attached as “Spark Idle Time 2.png”.

When I drill down into that job and look at the event timeline I see very odd 
behavior. The duration of the longest task is only ~0.3 s, and there is nice 
parallelism. What seems to be happening is that right at the start of the job 
there are a few milliseconds of deserialization, followed by almost 4 s (!) of 
doing absolutely nothing, followed by a few hundred milliseconds where the 
actual processing takes place. This Web UI screenshot is attached as 
“Spark Idle Time.png”.

What can cause this delay where Spark does nothing (or reports doing nothing) 
for so long? I have included below the code for the foreachRDD that is 
triggering this 4-second job.

Thank you for your time,

-- Matthew


    // Transmit Kafka config to all workers so they can write back as necessary
    val broadcastBrokers = ssc.sparkContext.broadcast(brokerList)
    val broadcastZookeeper = ssc.sparkContext.broadcast(zookeeper)

    // Define the task for Spark Streaming to do
    messages.foreachRDD(sourceStream => {

      // Parse the JSON into a more usable structure
      val parsed = sourceStream.map(y => parse(y._2))

      val GeosRUSECs = parsed.map { x =>
        ((x \ "geoip").extract[String](DefaultFormats, manifest[String]),
          ((x \ "rusec").extract[Long](DefaultFormats, manifest[Long]), 1L))
      }.cache

      if (!GeosRUSECs.isEmpty) {
        val totalRUSEC = GeosRUSECs.map(x => x._2._1).reduce(_ + _)
        val avgRUSEC = totalRUSEC / GeosRUSECs.count.toDouble

        if (!avgRUSEC.isNaN && !avgRUSEC.isInfinite) {
          // Acquire local Kafka connection
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout", avgRUSEC.toString))
        }

        // Wait times for each geo with total wait and number of queries
        val GeosWaitsCounts = GeosRUSECs.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

        val avgRespPerGeo = GeosWaitsCounts.map {
          case (geo, (totWait, numQueries)) => (geo, totWait.toDouble / numQueries)
        }

        avgRespPerGeo.foreach { geoInfo =>
          val producer = kafkaWriter.getProducer(broadcastBrokers.value, broadcastZookeeper.value)
          producer.send(new KeyedMessage[String, String]("SparkRUSECout",
            geoInfo._1 + " average RUSEC: " + geoInfo._2))
        }
      }
    })


