Here are a few more details. This is the piece of logic that consumes 4 secs
according to the Spark log. The few anonymous functions (transformations) are trivial.
-jan
org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1444)
org.apache.kudu.spark.kudu.KuduContext.writeRows(KuduContext.scala:180)
org.apache.kudu.spark.kudu.KuduContext.upsertRows(KuduContext.scala:154)
com.kafka.consumer.SCC$.writeKudu(KafkaKuduConsumer.scala:227)
com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:294)
com.kafka.consumer.SCC$$anonfun$getSSContext$2.apply(KafkaKuduConsumer.scala:275)
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
scala.util.Try$.apply(Try.scala:161)
org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
On 28 Mar 2017, at 20.38, Jan Holmberg wrote:
Thanks for the swift response! My measurements are very informal :-) i.e. I'm
looking at Spark's 'Streaming' log page, which reports "processing time"
directly. Within one program run that value is consistently either 4 sec or
0.5 sec for all batches; which of the two I get varies between runs.
The program is pretty simple, so I do not expect the program itself to be
causing the delay. I suspect two causes:
1) Kudu connection. I have not found a recent streaming example, so I may have
misunderstood something in my code. At the moment, I'm creating a new KuduContext
for each batch (val kuduContext = new KuduContext("m1:7051")). A pointer to a good
streaming example would definitely help.
2) Kafka connection. We've seen earlier that Spark executor locations have an
impact on performance.
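
For point 1, a minimal sketch of creating the KuduContext once and reusing it
for every batch, instead of constructing it inside the per-batch function. The
names KuduSink, attach, toDataFrame and tableName are hypothetical; the master
address and the single-argument constructor are the ones quoted above (later
kudu-spark releases also take a SparkContext). Whether this alone removes the
4 sec delay is not established here; it only rules out per-batch construction
as a factor.

```scala
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.dstream.DStream

object KuduSink {
  // Built once per driver JVM on first use; every batch reuses this instance.
  // Master address taken from the thread; adjust for your cluster.
  lazy val kuduContext: KuduContext = new KuduContext("m1:7051")

  // Hypothetical helper: registers a per-batch upsert on the given stream.
  // `toDataFrame` stands in for whatever conversion the application already does.
  def attach[T](stream: DStream[T], tableName: String)(toDataFrame: RDD[T] => DataFrame): Unit = {
    stream.foreachRDD { rdd =>
      // Skip empty batches so no write job is launched for them.
      if (!rdd.isEmpty()) {
        kuduContext.upsertRows(toDataFrame(rdd), tableName)
      }
    }
  }
}
```

The design choice is simply to move construction out of foreachRDD: the
function passed to foreachRDD runs on the driver once per batch, so anything
created inside it is created once per batch.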
-jan
On 28 Mar 2017, at 19.16, Todd Lipcon wrote:
On Tue, Mar 28, 2017 at 8:24 AM, Jan Holmberg wrote:
I'm wondering why a simple Spark program that reads streaming data from Kafka
and writes the result to Kudu has unpredictable write times. In most cases,
when running the program, write times are consistently 4 sec regardless of the
number of messages (anything from 50 to 2000 messages per batch). But
occasionally, when starting the program, it runs substantially faster, with
write times below 0.5 sec with exactly the same code base, settings, etc.
How are you measuring "write times" here? Are you sure the time is being spent
in the Kudu code and not in other parts of the streaming app?
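
One way to answer this more precisely than the Streaming page is to wrap the
write call in a small wall-clock timer on the driver. The sketch below is
dependency-free; Timing, measure and timed are hypothetical names, not part of
Spark or Kudu.

```scala
object Timing {
  /** Runs `block` and returns its result with the elapsed wall-clock time in ms. */
  def measure[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1000000L)
  }

  /** Same as `measure`, but prints the timing and returns only the result. */
  def timed[A](label: String)(block: => A): A = {
    val (result, ms) = measure(block)
    println(s"$label took $ms ms")
    result
  }
}
```

For example, Timing.timed("kudu upsert") { kuduContext.upsertRows(df, table) }
would log the duration of the blocking write. Note that a DataFrame is
evaluated lazily, so unless df has been cached and materialized beforehand,
this figure also includes the Kafka read and the upstream transformations, not
only the Kudu write.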
Writing 2000 rows to Kudu should be on the order of a few milliseconds -- even
0.5 seconds sounds extremely high.
Are you by chance instantiating a new KuduClient each time you write a batch,
rather than reusing an existing one?
Our environment is a plain AWS cluster with 3 slaves, where each slave runs a
Kafka and a Kudu tablet server instance, with CDH 5.10, Kudu 1.2 and Spark 1.6.
Any hints on what to look at?
cheers,
-jan
--
Todd Lipcon
Software Engineer, Cloudera