Hi Daniele,

A pragmatic approach to do that would be to execute the computations in the
scope of a foreachRDD, surrounded by wall-clock timers.
For example:
dstream.foreachRDD { rdd =>
   val t0 = System.currentTimeMillis()
   val aggregates = rdd.<your-operations>
   // make sure you get a plain value back here, not another RDD.
   // Otherwise you need to call an action like rdd.count to materialize it
   val elapsedTime = System.currentTimeMillis() - t0
   println(s"operation took $elapsedTime ms")
}
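
For instance, if your stream carries plain numeric values, stats() computes count, mean, min and max in one pass and materializes the result at the same time. Just a sketch (the valuesDStream name is made up, assume a DStream[Double]):

valuesDStream.foreachRDD { rdd =>
   val t0 = System.currentTimeMillis()
   // stats() is an action on RDD[Double]: single pass, returns count/mean/min/max/stdev
   val stats = rdd.stats()
   val elapsedTime = System.currentTimeMillis() - t0
   println(s"avg=${stats.mean} min=${stats.min} max=${stats.max} took $elapsedTime ms")
}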

In the end, this measures the same execution path as the batch Spark engine, so
you might want to check the performance there first.
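A quick way to get that baseline (again just a sketch, assuming a SparkContext
sc in scope, e.g. in the spark-shell) is to time the same aggregation on a
static RDD:

val batchRdd = sc.parallelize(1 to 1000000).map(_.toDouble)
val t0 = System.currentTimeMillis()
val stats = batchRdd.stats()   // same operation, same engine as inside foreachRDD
println(s"batch run took ${System.currentTimeMillis() - t0} ms: $stats")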
If you want to add the stream ingestion time to this, it gets a bit more
tricky.
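One rough approximation, sketched below with the same hypothetical
valuesDStream: foreachRDD has a variant that also passes in the batch Time, so
you can compare the wall clock against the batch boundary to see how long the
data waited before processing started:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

valuesDStream.foreachRDD { (rdd: RDD[Double], batchTime: Time) =>
   val t0 = System.currentTimeMillis()
   // how long after the batch boundary processing actually started
   val schedulingDelay = t0 - batchTime.milliseconds
   val count = rdd.count()   // any action, just to force the computation
   val processingTime = System.currentTimeMillis() - t0
   println(s"batch $batchTime: delay=$schedulingDelay ms, processing=$processingTime ms, records=$count")
}

The Streaming tab in the web UI and a StreamingListener report the same
scheduling and processing delays per batch, if you prefer not to instrument
the code yourself.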

kind regards, Gerard.



On Tue, Jun 26, 2018 at 11:49 AM Daniele Foroni <daniele.for...@gmail.com>
wrote:

> Hi all,
>
> I am using spark streaming and I need to evaluate the latency of the
> standard aggregations (avg, min, max, …) provided by the spark APIs.
> Any way to do it in the code?
>
> Thanks in advance,
> ---
> Daniele
>
>
