Hi Stephan

thanks that looks super. But source needs then to emit checkpoint. At the
source, while reading source events I can find out that - this is the
source event I want to take actions after. So if at ssource I can then emit
checkpoint and catch it at the end of the DAG that would solve my problem
(well, I also need to somehow distinguish my checkpoint from Flink's
auto-generated ones).

Sorry for being too chatty, this is the topic where I need expert opinion,
can't find out the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
> There you will get a call at every checkpoint (and can look at what records
> are before that checkpoint). You also get a call once the checkpoint is
> complete, which corresponds to the point when everything has flown through
> the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com
> > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some action.
>> So I need a signal at the sink which can tell "all events in source before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG doesn't
>> propagate source events one-to-one. Some transformations might create 3
>> child events out of 1 source. If I want to make sure I fully processed
>> source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com
>> > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier. I
>>> think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>>  which
>>> seems to solve my task. Essentially they want to have a mechanism which
>>> will mark record produced by job as “last” and then wait until it’s fully
>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>> produces trades can also thought as being finished once it produced all
>>> trades, then I just need to wait till latest trade produced by this job is
>>> processed.
>>>
>>> So although windows can probably also be applied, I think propagating
>>> barrier through DAG and checkpointing at final job is what I need.
>>>
>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>>> triggering a custom checkoint or finishing streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that
>>> continuously produces updated partial aggregates in a streaming fashion.
>>> When a  special event (no more trades) is received, you would like to store
>>> the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function.
>>> These will produce a new update for each incoming event.
>>> For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an
>>> update for each incoming record.
>>>
>>> You could add a flag to the record and implement a ReduceFunction that
>>> marks a record as final when the no-more-trades event is received.
>>> With a filter and a data sink you could emit such final records to a
>>> persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom
>>> trigger is called for each element that is added to a window and when
>>> certain timers expire. For example with a custom trigger, you can evaluate
>>> a window for every second element that is added. You can also define
>>> whether the elements in the window should be retained or removed after the
>>> evaluation.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
>>>
>>>> Hi Max
>>>>
>>>> thanks for reply. From what I understand window works in a way that it
>>>> buffers records while window is open, then apply transformation once window
>>>> close is triggered and pass transformed result.
>>>> In my case then window will be open for few hours, then the whole
>>>> amount of trades will be processed once window close is triggered. Actually
>>>> I want to process events as they are produced without buffering them. It is
>>>> more like a stream with some special mark versus windowing seems more like
>>>> a batch (if I understand it correctly).
>>>>
>>>> In other words - buffering and waiting for window to close, then
>>>> processing will be equal to simply doing one-off processing when all events
>>>> are produced. I am looking for a solution when I am processing events as
>>>> they are produced and when source signals "done" my processing is also
>>>> nearly done.
>>>>
>>>>
>>>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Anton,
>>>>>
>>>>> You should be able to model your problem using the Flink Streaming
>>>>> API. The actions you want to perform on the streamed records
>>>>> correspond to transformations on Windows. You can indeed use
>>>>> Watermarks to signal the window that a threshold for an action has
>>>>> been reached. Otherwise an eviction policy should also do it.
>>>>>
>>>>> Without more details about what you want to do I can only refer you to
>>>>> the streaming API documentation:
>>>>> Please see
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>>>>>
>>>>> Thanks,
>>>>> Max
>>>>>
>>>>> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
>>>>> <polyakov.an...@gmail.com> wrote:
>>>>> > Hi
>>>>> >
>>>>> > I am very new to Flink and in fact never used it. My task (which I
>>>>> currently solve using home grown Redis-based solution) is quite simple - I
>>>>> have a system which produces some events (trades, it is a financial 
>>>>> system)
>>>>> and computational chain which computes some measure accumulatively over
>>>>> these events. Those events form a long but finite stream, they are 
>>>>> produced
>>>>> as a result of end of day flow. Computational logic forms a processing DAG
>>>>> which computes some measure over these events (VaR). Each trade is
>>>>> processed through DAG and at different stages might produce different set
>>>>> of subsequent events (like return vectors), eventually they all arrive 
>>>>> into
>>>>> some aggregator which computes accumulated measure (reducer).
>>>>> >
>>>>> > Ideally I would like to process trades as they appear (i.e. stream
>>>>> them) and once producer reaches end of portfolio (there will be no more
>>>>> trades), I need to write final resulting measure and mark it as “end of 
>>>>> day
>>>>> record”. Of course I also could use a classical batch - i.e. wait until 
>>>>> all
>>>>> trades are produced and then batch process them, but this will be too
>>>>> inefficient.
>>>>> >
>>>>> > If I use Flink, I will need a sort of watermark saying - “done, no
>>>>> more trades” and once this watermark reaches end of DAG, final measure can
>>>>> be saved. More generally would be cool to have an indication at the end of
>>>>> DAG telling to which input stream position current measure corresponds.
>>>>> >
>>>>> > I feel my problem is very typical yet I can’t find any solution. All
>>>>> examples operate either on infinite streams where nobody cares about
>>>>> completion or classical batch examples which rely on fact all input data 
>>>>> is
>>>>> ready.
>>>>> >
>>>>> > Can you please hint me.
>>>>> >
>>>>> > Thank you vm
>>>>> > Anton
>>>>>
>>>>
>>>>
>>>
>>>
>>
>

Reply via email to