After fixing the clock issue at the application level, the latency is as
expected. Thanks again!

Robert

On Tue, May 3, 2016 at 9:54 AM, Robert Schmidtke <ro.schmid...@gmail.com>
wrote:

> Hi Igor, thanks for your reply.
>
> As for your first point, I'm not sure I understand correctly. I'm ingesting
> records at a rate of about 50k records per second, and those records are
> fairly small. If I add a timestamp to each of them, I will have a lot more
> data, which is not exactly what I want. Instead I wanted to add something
> like a watermark once every second, put a timestamp only on that one
> record, and calculate the latency from it.
>
> For your second point: in fact the clocks are up to 8s apart -.-" Not sure
> how I missed this yesterday. As I'm not an admin of the machine, I will
> request that ntp be set up.
>
> Thanks!
> Robert
>
>
>
> On Mon, May 2, 2016 at 10:19 PM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> 1. Why are you doing a join instead of something like
>> System.currentTimeMillis()? At the end you have a tuple of your data with
>> a timestamp anyway... so why not just wrap your data in a Tuple2 with the
>> creation timestamp as additional info?
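Igor's suggestion of wrapping each record with its creation time could look roughly like this. This is a generic sketch, not Flink's actual `Tuple2` API; `TimestampedRecord` and its methods are hypothetical names:

```java
// Rough sketch of wrapping every record with its creation timestamp
// (a hypothetical class standing in for Flink's Tuple2<T, Long>).
public class TimestampedRecord<T> {
    final T value;
    final long createdAtMs;

    TimestampedRecord(T value, long createdAtMs) {
        this.value = value;
        this.createdAtMs = createdAtMs;
    }

    // Stamp the record on the producing machine with its local clock.
    static <T> TimestampedRecord<T> now(T value) {
        return new TimestampedRecord<>(value, System.currentTimeMillis());
    }

    // Evaluated on the consuming machine; only meaningful if the two
    // machines' clocks are synchronized (e.g. via ntp).
    long latencyMs(long consumerNowMs) {
        return consumerNowMs - createdAtMs;
    }
}
```

As Robert notes in his reply, at ~50k records per second this stamps every single record, which is exactly the per-record overhead the marker approach is meant to avoid.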
>>
>> 2. Are you sure that the consumer/producer machines' clocks are in sync?
>> You can use ntp for this.
>>
>> On 2 May 2016 at 20:02, Robert Schmidtke <ro.schmid...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I have implemented a way to measure latency in a DataStream (I hope):
>>> I'm consuming a Kafka topic and I'm union'ing the resulting stream with a
>>> custom source that emits a (machine-local) timestamp every 1000ms (using
>>> currentTimeMillis). On the consuming end I'm distinguishing between the
>>> Kafka events and the timestamps. When encountering a timestamp, I take the
>>> difference of the processing machine's local time and the timestamp found
>>> in the stream, expecting a positive difference (with the processing
>>> machine's timestamp being larger than the timestamp found in the stream).
>>> However, the opposite is the case. Now I am wondering about when events are
>>> actually processed.
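The negative differences described above are consistent with clock skew between the producing and consuming machines (as it turns out later in the thread, the clocks were up to 8s apart). A minimal arithmetic sketch, with made-up numbers:

```java
// Why the measured difference can come out negative: if the source's clock
// runs ahead of the consumer's clock by more than the true latency, then
// consumer_now - stamped_ts < 0. All numbers below are made up.
public class ClockSkewDemo {
    static long measuredLatency(long trueTimeMs, long trueLatencyMs, long producerSkewMs) {
        long stampedTs = trueTimeMs + producerSkewMs;   // producer's (skewed) clock
        long consumerNow = trueTimeMs + trueLatencyMs;  // consumer's clock, no skew
        return consumerNow - stampedTs;
    }

    public static void main(String[] args) {
        // True latency 50 ms, producer clock 8 s ahead (as observed in this thread):
        System.out.println(measuredLatency(100_000, 50, 8_000)); // prints -7950
    }
}
```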
>>>
>>> Unioning the stream from Kafka with my custom source and batching the
>>> result in 10s windows (which is what I do), I expect 10 timestamps per
>>> window with ascending values, roughly 1000ms apart:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>>>
>>> On the receiving end I again take the currentTimeMillis in my fold
>>> function, expecting the resulting value to be larger (most of the time)
>>> than the timestamps encountered in the stream:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>>>
>>> The system clocks are in sync up to 1ms.
>>>
>>> Maybe I am not clear about when certain timestamps are created (i.e.
>>> when the UDFs are invoked) or how windows are processed. Any advice is
>>> greatly appreciated, also alternative approaches to calculating latency.
>>>
>>> I'm on Flink 0.10.2 by the way.
>>>
>>> Thanks in advance for the help!
>>>
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
>



