No, this is not possible unless you use an external service such as a
database.
The assigners might run on different machines, and Flink does not provide
utilities for reading and writing state that is shared across parallel instances.
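
To illustrate why this matters for the original question, here is a minimal sketch in plain Scala with no Flink dependency. It is a simplified model, not Flink's actual implementation, and the names `WatermarkMinSketch`, `currentWatermark` and `channelWatermarks` are made up for this example. It shows that an operator's event-time clock is the minimum of the latest watermarks received from all of its upstream parallel instances, so a watermark emitted by only one assigner instance does not advance the window operator:

```scala
// Simplified model of watermark propagation; names are hypothetical.
object WatermarkMinSketch {
  // Each entry is the latest watermark received from one upstream parallel
  // instance. Long.MinValue stands for "no watermark received yet".
  def currentWatermark(channelWatermarks: Seq[Long]): Long =
    if (channelWatermarks.isEmpty) Long.MinValue else channelWatermarks.min

  def main(args: Array[String]): Unit = {
    val base = 1000000L                 // hypothetical event timestamp
    val sixMinutes = 6 * 60 * 1000L

    // Four parallel assigners; only one of them saw the element with _3 == 9000.
    val onlyOneEmitted = Seq(Long.MinValue, Long.MinValue, base + sixMinutes, Long.MinValue)
    // The downstream watermark has not moved, so the window does not fire yet.
    println(currentWatermark(onlyOneEmitted) == Long.MinValue)

    // Only when every instance has emitted does the watermark advance.
    val allEmitted = Seq.fill(4)(base + sixMinutes)
    println(currentWatermark(allEmitted) == base + sixMinutes)
  }
}
```

With a single assigner instance there is only one input channel, so its watermark directly becomes the downstream watermark. Running the assigner with a parallelism of 1 may therefore be an option if the punctuation element can only be seen by one instance.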

Best, Fabian

2016-09-15 20:17 GMT+02:00 Saiph Kappa <saiph.ka...@gmail.com>:

> And is it possible to share state across parallel instances with
> AssignerWithPunctuatedWatermarks?
>
> Thanks!
>
> On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> the problem might be that your timestamp/watermark assigner is run in
>> parallel and that only one parallel instance of those operators emits the
>> watermark because only one of those parallel instances sees the element
>> with _3 == 9000. For the watermark to advance at an operator it needs to
>> advance in all upstream operations.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 9 Sep 2016 at 18:29 Saiph Kappa <saiph.ka...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a streaming (event time) application where I am receiving events
>>> with the same assigned timestamp. I receive 10000 events in total in a
>>> window of 5 minutes, but I emit a watermark when 9000 elements have been
>>> received. This watermark is 6 minutes after the assigned timestamps. My
>>> question is: why does the function associated with the window read
>>> 10000 elements and not 9000? All elements that have a timestamp lower than
>>> the watermark (1000 of them) should be ignored, but that is not happening.
>>>
>>> Here is part of the code:
>>> «
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val rawStream = env.socketTextStream("localhost", 4321)
>>>
>>> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
>>>       // Every element gets the same timestamp, fixed when the assigner is created.
>>>       val timestamp = System.currentTimeMillis()
>>>
>>>       override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
>>>         timestamp
>>>
>>>       // Emit a watermark only for the element whose third field is 9000.
>>>       override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark = {
>>>         if (lastElement._3 == 9000) {
>>>           val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>>>           new Watermark(ts)
>>>         } else null
>>>       }
>>>     }
>>>
>>> val stream = rawStream.map(line => {
>>>       val Array(p1, p2, p3) = line.split(" ")
>>>       (p1, p2.toInt, p3.toLong)
>>>     })
>>>       .assignTimestampsAndWatermarks(punctuatedAssigner)
>>>
>>> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
>>> »
>>>
>>> Thanks!
>>>
>>
>
