1. why are you doing join instead of something like
System.currentTimeInMillis()? at the end you have tuple of your data with
timestamp anyways...so why just not to wrap you data in tuple2 with
additional info of creation ts?

2. are you sure that consumer/producer machines' clocks are in sync?
you can use ntp for this.

On 2 May 2016 at 20:02, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

> Hi everyone,
>
> I have implemented a way to measure latency in a DataStream (I hope): I'm
> consuming a Kafka topic and I'm union'ing the resulting stream with a
> custom source that emits a (machine-local) timestamp every 1000ms (using
> currentTimeMillis). On the consuming end I'm distinguishing between the
> Kafka events and the timestamps. When encountering a timestamp, I take the
> difference of the processing machine's local time and the timestamp found
> in the stream, expecting a positive difference (with the processing
> machine's timestamp being larger than the timestamp found in the stream).
> However, the opposite is the case. Now I am wondering about when events are
> actually processed.
>
> Union the Stream from Kafka+my custom source, batching them in 10s windows
> (which is what I do), I expect 10 timestamps with ascending values and a
> rough gap of 1000ms in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>
> On the receiving end I again take the currentTimeMillis in my fold
> function, expecting the resulting value to be larger (most of the time)
> than the timestamps encountered in the stream:
>
> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>
> The system clocks are in sync up to 1ms.
>
> Maybe I am not clear about when certain timestamps are created (i.e. when
> the UDFs are invoked) or how windows are processed. Any advice is greatly
> appreciated, also alternative approaches to calculating latency.
>
> I'm on Flink 0.10.2 by the way.
>
> Thanks in advance for the help!
>
> Robert
>
> --
> My GPG Key ID: 336E2680
>

Reply via email to