After fixing the clock issue at the application level, the latency is as expected. Thanks again!
Robert

On Tue, May 3, 2016 at 9:54 AM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:

> Hi Igor, thanks for your reply.
>
> As for your first point, I'm not sure I understand correctly. I'm
> ingesting records at a rate of about 50k records per second, and those
> records are fairly small. If I add a timestamp to each of them, I will
> have a lot more data, which is not exactly what I want. Instead I wanted
> to add something like a watermark once every second, put a timestamp
> only on that one record, and calculate the latency from it.
>
> As for your second point: in fact the clocks are up to 8s apart -.-" Not
> sure how I missed this yesterday. As I'm not an admin of the machine, I
> will request that NTP be set up.
>
> Thanks!
> Robert
>
> On Mon, May 2, 2016 at 10:19 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> 1. Why are you doing a join instead of something like
>> System.currentTimeMillis()? At the end you have a tuple of your data
>> with a timestamp anyway... so why not just wrap your data in a Tuple2
>> with the additional info of its creation timestamp?
>>
>> 2. Are you sure that the consumer/producer machines' clocks are in
>> sync? You can use NTP for this.
>>
>> On 2 May 2016 at 20:02, Robert Schmidtke <ro.schmid...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I have implemented a way to measure latency in a DataStream (I hope):
>>> I'm consuming a Kafka topic, and I'm union'ing the resulting stream
>>> with a custom source that emits a (machine-local) timestamp every
>>> 1000ms (using currentTimeMillis). On the consuming end I distinguish
>>> between the Kafka events and the timestamps. When encountering a
>>> timestamp, I take the difference between the processing machine's
>>> local time and the timestamp found in the stream, expecting a positive
>>> difference (i.e. the processing machine's timestamp should be larger
>>> than the timestamp found in the stream). However, the opposite is the
>>> case. Now I am wondering when events are actually processed.
>>>
>>> Unioning the stream from Kafka with my custom source and batching them
>>> in 10s windows (which is what I do), I expect 10 timestamps with
>>> ascending values and a rough gap of 1000ms in the stream:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>>>
>>> On the receiving end I again take currentTimeMillis in my fold
>>> function, expecting the resulting value to be larger (most of the
>>> time) than the timestamps encountered in the stream:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>>>
>>> The system clocks are in sync up to 1ms.
>>>
>>> Maybe I am not clear about when certain timestamps are created (i.e.
>>> when the UDFs are invoked) or how windows are processed. Any advice is
>>> greatly appreciated, as are alternative approaches to calculating
>>> latency.
>>>
>>> I'm on Flink 0.10.2, by the way.
>>>
>>> Thanks in advance for the help!
>>>
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
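The marker-based measurement discussed in this thread can be sketched independently of the Flink runtime. The class and method names below are hypothetical illustrations (not taken from the linked benchmark code): a `Marker` element stands in for the timestamp the custom source emits every second, and `latencyMillis` does what the fold function does when it encounters one — subtract the marker's creation time from the processing machine's local clock. As the thread concludes, the result is only meaningful if the two machines' clocks are NTP-synchronized.

```java
import java.util.OptionalLong;

// Minimal, Flink-free sketch of the marker-based latency measurement.
// All names here are hypothetical, for illustration only.
public class LatencySketch {

    // An element of the unioned stream: either an ordinary Kafka event
    // (emittedAtMillis == null) or a marker carrying the source machine's
    // clock reading at emission time.
    static final class Element {
        final String payload;       // event payload; null for markers
        final Long emittedAtMillis; // marker timestamp; null for events

        Element(String payload, Long emittedAtMillis) {
            this.payload = payload;
            this.emittedAtMillis = emittedAtMillis;
        }
    }

    // On a marker, latency = processing machine's clock minus the marker's
    // creation time. This is positive only when the producing and consuming
    // machines' clocks agree; skewed clocks (e.g. the 8s drift found above)
    // can make it negative. Ordinary events yield no measurement.
    static OptionalLong latencyMillis(Element e, long nowMillis) {
        return e.emittedAtMillis == null
                ? OptionalLong.empty()
                : OptionalLong.of(nowMillis - e.emittedAtMillis);
    }

    public static void main(String[] args) {
        long emitted = System.currentTimeMillis();
        Element[] stream = {
                new Element("event-1", null),
                new Element(null, emitted), // marker from the custom source
                new Element("event-2", null)
        };
        long now = System.currentTimeMillis();
        for (Element e : stream) {
            latencyMillis(e, now).ifPresent(l ->
                    System.out.println("observed latency: " + l + " ms"));
        }
    }
}
```

In the actual job the same check would live inside the windowed fold, with markers and events distinguished by their type after the union; the sketch only isolates the arithmetic and the clock-skew caveat.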