Hi Aljoscha,

OK, now I at least understand why it works with fromElements(...). For the rest I am not so sure.

> What this means in your case is that the watermark can only advance if a new
> element arrives, because only then is the watermark updated.

But new elements arrive all the time, about 50/s, or do you mean something else?

getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK choice, if I understand the semantics correctly. It just affects watermarking in the absence of events, right?

Cheers,

Konstantin

On 16.11.2015 12:31, Aljoscha Krettek wrote:
> Hi,
> it could be what Gyula mentioned. Let me first go a bit into how the
> TimestampExtractor works internally.
>
> First, the timestamp extractor internally keeps the value of the last emitted
> watermark. Then, the semantics of the TimestampExtractor are as follows:
>  - the result of extractTimestamp is taken and it replaces the internal
>    timestamp of the element
>  - if the result of extractWatermark is larger than the last watermark, the
>    new value is emitted as a watermark and the value is stored
>  - getCurrentWatermark is called on the specified auto-watermark interval; if
>    the returned value is larger than the last watermark, it is emitted and
>    stored as the last watermark
>
> What this means in your case is that the watermark can only advance if a new
> element arrives, because only then is the watermark updated.
>
> The reason why you see results if you use fromElements is that the
> window-operator also emits all the windows that it currently has buffered if
> the program closes. This happens in the case of fromElements because only a
> finite number of elements is emitted, after which the source closes, thereby
> finishing the whole program.
>
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> Could this part of the extractor be the problem, Aljoscha?
>>
>> @Override
>> public long getCurrentWatermark() {
>>     return Long.MIN_VALUE;
>> }
>>
>> Gyula
>>
>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 16 Nov
>> 2015, 10:39):
>> Hi Aljoscha,
>>
>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>
>> The timestamps look good to me. Here is an example:
>>
>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>
>> The order now is
>>
>> stream
>>     .map(dummyMapper)
>>     .assignTimestamps(...)
>>     .timeWindow(...)
>>
>> Is there a way to print out the assigned timestamps after
>> stream.assignTimestamps(...)?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>> Hi,
>>> are you also using the timestamp extractor when you are using
>>> env.fromCollection()?
>>>
>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>> prints the element and forwards it? To see if the elements come with a good
>>> timestamp from Kafka.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>>> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>
>>>> I am using a very simple TimestampExtractor like [1], which just
>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>
>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>     parameterTool.getRequired("topic"),
>>>>     new AvroPojoDeserializationSchema(),
>>>>     parameterTool.getProperties()))
>>>>
>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and the
>>>> AutoWatermarkInterval is 500.
>>>>
>>>> The problem is, when I do something like:
>>>>
>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>     .sum(..)
>>>>     .print()
>>>>
>>>> env.execute();
>>>>
>>>> the windows never get triggered.
>>>>
>>>> If I use ProcessingTime it works.
>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>> with EventTime, too.
>>>>
>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]:
>>>>
>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>
>>>>     final private long maxDelay;
>>>>
>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>         this.maxDelay = maxDelay;
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>         return pojo.getTime();
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>         return pojo.getTime() - maxDelay;
>>>>     }
>>>>
>>>>     @Override
>>>>     public long getCurrentWatermark() {
>>>>         return Long.MIN_VALUE;
>>>>     }
>>>> }
>>>
>>>
>>
>> --
>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
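
For reference, the extractor semantics from Aljoscha's quoted reply can be mimicked without Flink in a few lines. This is only a sketch of the bookkeeping as described in the thread, not Flink's actual implementation: the names WatermarkSketch, Extractor, Tracker and withMaxDelay are made up for illustration, and the extractor methods take a plain long instead of a POJO. It shows that an extractor whose getCurrentWatermark returns Long.MIN_VALUE never advances the watermark on the periodic call, so the watermark moves only when an element arrives.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    /** Hypothetical stand-in for the three extractor methods discussed in the thread. */
    interface Extractor {
        long extractTimestamp(long eventTime);
        long extractWatermark(long eventTime);
        long getCurrentWatermark();
    }

    /** Remembers the last emitted watermark and emits only strictly larger values. */
    static final class Tracker {
        private final Extractor extractor;
        private long lastWatermark = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        Tracker(Extractor extractor) {
            this.extractor = extractor;
        }

        /** Called per element: assigns the timestamp, then offers a watermark candidate. */
        long onElement(long eventTime) {
            long timestamp = extractor.extractTimestamp(eventTime);
            maybeEmit(extractor.extractWatermark(eventTime));
            return timestamp;
        }

        /** Called on every auto-watermark interval, whether or not elements arrived. */
        void onAutoWatermarkInterval() {
            maybeEmit(extractor.getCurrentWatermark());
        }

        private void maybeEmit(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
                emitted.add(candidate);
            }
        }
    }

    /** Same logic as the extractor in [1], with the POJO replaced by its time field. */
    static Extractor withMaxDelay(final long maxDelay) {
        return new Extractor() {
            @Override
            public long extractTimestamp(long eventTime) {
                return eventTime;
            }

            @Override
            public long extractWatermark(long eventTime) {
                return eventTime - maxDelay;
            }

            @Override
            public long getCurrentWatermark() {
                return Long.MIN_VALUE; // never larger than any stored watermark
            }
        };
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(withMaxDelay(6000));
        tracker.onAutoWatermarkInterval();   // nothing emitted
        tracker.onElement(1447666537260L);   // emits 1447666531260
        tracker.onAutoWatermarkInterval();   // nothing emitted
        tracker.onElement(1447666536000L);   // candidate is smaller, nothing emitted
        tracker.onElement(1447666540000L);   // emits 1447666534000
        System.out.println(tracker.emitted); // [1447666531260, 1447666534000]
    }
}
```

Under these assumptions, a steady stream of elements with increasing times keeps the watermark advancing even though the periodic call contributes nothing, which matches the reading that Long.MIN_VALUE only matters when no events arrive.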