Hi Aljoscha,

OK, now I at least understand why it works with fromElements(...). For
the rest I am not so sure.

> What this means in your case is that the watermark can only advance if
> a new element arrives, because only then is the watermark updated.

But new elements arrive all the time, about 50/s, or do you mean
something else?
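
To make this expectation concrete, here is a minimal plain-Java simulation (not Flink code; `TumblingWindowSim` and its methods are hypothetical) of a 1-second event-time tumbling window fed by a single input whose watermark is advanced per element as `time - maxDelay`. It assumes evenly spaced events at 50/s, a maxDelay of 6000 ms, and that a window fires once the watermark reaches its last millisecond (end - 1). Under those assumptions, each window should fire roughly 7 seconds after it opens, not never:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of an event-time tumbling window with ONE input channel.
// This is not Flink's window operator; it only mimics the firing rule assumed above.
class TumblingWindowSim {
    private final long size;                                // window size in ms
    private final TreeSet<Long> pending = new TreeSet<>();  // start times of open windows
    private final List<Long> fired = new ArrayList<>();     // start times of fired windows

    TumblingWindowSim(long size) {
        this.size = size;
    }

    // Assign the element to the window [start, start + size); assumes timestamp >= 0.
    void onElement(long timestamp) {
        pending.add(timestamp - (timestamp % size));
    }

    // Fire every open window whose last millisecond (start + size - 1) the watermark has reached.
    void onWatermark(long watermark) {
        while (!pending.isEmpty() && pending.first() + size - 1 <= watermark) {
            fired.add(pending.pollFirst());
        }
    }

    List<Long> fired() {
        return fired;
    }
}
```

Feeding it 500 events 20 ms apart starting at 1447666537000, and calling `onWatermark(t - 6000)` after each element, fires the first window at the event with timestamp start + 7000 and three windows in total by the last event. If a real job with steady input never fires, the difference must lie somewhere this single-input model leaves out.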

Returning Long.MIN_VALUE from getCurrentWatermark still seems to be an OK
choice, if I understand the semantics correctly. It only affects
watermarking in the absence of events, right?
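
Aljoscha's description of the extractor semantics further down can be modelled in a few lines of plain Java. The sketch below is hypothetical (`WatermarkTracker` is not a Flink class) and assumes the stored last watermark starts at Long.MIN_VALUE. It shows that a periodic value of Long.MIN_VALUE is never larger than the stored watermark, so the timer path never emits and the watermark advances only through extractWatermark, i.e. when elements arrive:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the watermark bookkeeping described in this thread;
// not the actual Flink operator.
class WatermarkTracker {
    // Assumption: nothing emitted yet, so the last watermark starts at Long.MIN_VALUE.
    private long lastWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Called per element with the result of extractWatermark(element, ...).
    void onElement(long extractedWatermark) {
        emitIfLarger(extractedWatermark);
    }

    // Called every auto-watermark interval with the result of getCurrentWatermark().
    void onTimer(long currentWatermark) {
        emitIfLarger(currentWatermark);
    }

    // A candidate is emitted and stored only if it is strictly larger than the last watermark.
    private void emitIfLarger(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    long lastWatermark() {
        return lastWatermark;
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

Under this model, returning Long.MIN_VALUE from getCurrentWatermark only disables the timer path; it does not suppress the element-driven watermarks.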

Cheers,

Konstantin


On 16.11.2015 12:31, Aljoscha Krettek wrote:
> Hi,
> it could be what Gyula mentioned. Let me first go a bit into how the 
> TimestampExtractor works internally.
> 
> First, the timestamp extractor internally keeps the value of the last emitted 
> watermark. The semantics of the TimestampExtractor are then as follows:
>  - the result of extractTimestamp is taken and replaces the internal 
> timestamp of the element
>  - if the result of extractWatermark is larger than the last watermark, the 
> new value is emitted as a watermark and stored
>  - getCurrentWatermark is called at the specified auto-watermark interval; if 
> the returned value is larger than the last watermark, it is emitted and stored 
> as the last watermark
> 
> What this means in your case is that the watermark can only advance if a new 
> element arrives, because only then is the watermark updated.
> 
> The reason you see results when you use fromElements is that the 
> window operator also emits all the windows it currently has buffered when 
> the program closes. This happens with fromElements because only a 
> finite number of elements is emitted, after which the source closes, thereby 
> finishing the whole program.
> 
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> Could this part of the extractor be the problem, Aljoscha?
>>
>> @Override
>>     public long getCurrentWatermark() {
>>         return Long.MIN_VALUE;
>>     }
>>
>> Gyula
>>
>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote on Mon, 16 Nov 2015 
>> at 10:39:
>> Hi Aljoscha,
>>
>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>
>> The timestamps look good to me. Here is an example:
>>
>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>
>> The order now is
>>
>> stream
>> .map(dummyMapper)
>> .assignTimestamps(...)
>> .timeWindow(...)
>>
>> Is there a way to print out the assigned timestamps after
>> stream.assignTimestamps(...)?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>> Hi,
>>> are you also using the timestamp extractor when you are using 
>>> env.fromCollection()?
>>>
>>> Could you maybe insert a dummy mapper after the Kafka source that just 
>>> prints the element and forwards it? That way we can see whether the 
>>> elements come from Kafka with a good timestamp.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>>> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>
>>>> I am using a very simple TimestampExtractor like [1], which just
>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>
>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>                parameterTool.getRequired("topic"),
>>>>                new AvroPojoDeserializationSchema(),
>>>>                parameterTool.getProperties()));
>>>>
>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and the
>>>> AutoWatermarkInterval is 500 ms.
>>>>
>>>> The problem is, when I do something like:
>>>>
>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>         .sum(..)
>>>>         .print();
>>>>
>>>> env.execute();
>>>>
>>>> the windows never get triggered.
>>>>
>>>> If I use ProcessingTime it works.
>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>> with EventTime, too.
>>>>
>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]:
>>>>
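>>>> import org.apache.flink.streaming.api.functions.TimestampExtractor;
>>>>
>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>
>>>>    // allowed out-of-orderness in milliseconds
>>>>    private final long maxDelay;
>>>>
>>>>    public PojoTimestampExtractor(long maxDelay) {
>>>>        this.maxDelay = maxDelay;
>>>>    }
>>>>
>>>>    // use the event's own millisecond timestamp as its event time
>>>>    @Override
>>>>    public long extractTimestamp(Pojo pojo, long currentTimestamp) {
>>>>        return pojo.getTime();
>>>>    }
>>>>
>>>>    // per-element watermark: event time minus the allowed delay
>>>>    @Override
>>>>    public long extractWatermark(Pojo pojo, long currentTimestamp) {
>>>>        return pojo.getTime() - maxDelay;
>>>>    }
>>>>
>>>>    // periodic watermark: Long.MIN_VALUE is never larger than the last
>>>>    // watermark, so nothing is emitted on the timer
>>>>    @Override
>>>>    public long getCurrentWatermark() {
>>>>        return Long.MIN_VALUE;
>>>>    }
>>>> }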
>>>
>>>
>>
>> --
>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 
