No, I mean how to implement an exactly-once db commit (our async io
target is mysql), not the state used by flink.
As mentioned in my previous mail, if I commit to the db in
notifyCheckpointComplete, we risk losing data (if the commit is lost,
flink would not trigger notifyCheckpointComplete for the last
checkpoint again after a restart).
On the other hand, if I update and commit per record, the sql/stored
procedure has to handle duplicate updates after a failure restart.

So, when or where should we commit so that we get exactly-once db ingress?
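
To make the question concrete, here is a minimal sketch of one direction
that might work. It is plain Java with no Flink or JDBC dependency, and
every name in it (FakeDb, PendingBatch, Sink, commitOnce, restore) is
hypothetical. The assumptions are: records are buffered per checkpoint,
the sealed batches are kept in operator state, the db records the
checkpoint id in the same transaction as the rows, and on restore any
batch found in the state is committed again (the db skips ids it has
already seen).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

class ExactlyOnceCommitSketch {

    /** Stand-in for MySQL: data rows plus a table of committed checkpoint ids. */
    static class FakeDb {
        final List<String> rows = new ArrayList<>();
        final Set<Long> committedCheckpoints = new HashSet<>();

        /** In MySQL this would be one transaction: insert the id, then the rows. */
        synchronized boolean commitOnce(long checkpointId, List<String> batch) {
            if (!committedCheckpoints.add(checkpointId)) {
                return false; // this checkpoint was committed before; skip it
            }
            rows.addAll(batch);
            return true;
        }
    }

    /** Records buffered up to one checkpoint barrier. */
    static class PendingBatch {
        final long checkpointId;
        final List<String> records;

        PendingBatch(long checkpointId, List<String> records) {
            this.checkpointId = checkpointId;
            this.records = new ArrayList<>(records); // defensive copy
        }
    }

    /** Stand-in for the sink; 'pending' is what would be kept in operator state. */
    static class Sink {
        private final FakeDb db;
        private final List<String> buffer = new ArrayList<>();
        private final List<PendingBatch> pending = new ArrayList<>();

        Sink(FakeDb db) {
            this.db = db;
        }

        void invoke(String record) {
            buffer.add(record);
        }

        /** Seals the current buffer under this checkpoint id; returns the state to persist. */
        List<PendingBatch> snapshotState(long checkpointId) {
            pending.add(new PendingBatch(checkpointId, buffer));
            buffer.clear();
            return new ArrayList<>(pending);
        }

        /** Commits every sealed batch up to and including the confirmed checkpoint. */
        void notifyCheckpointComplete(long checkpointId) {
            Iterator<PendingBatch> it = pending.iterator();
            while (it.hasNext()) {
                PendingBatch batch = it.next();
                if (batch.checkpointId <= checkpointId) {
                    db.commitOnce(batch.checkpointId, batch.records);
                    it.remove();
                }
            }
        }

        /** After a restart: every batch in the restored state belongs to a completed checkpoint. */
        static Sink restore(FakeDb db, List<PendingBatch> state) {
            Sink sink = new Sink(db);
            for (PendingBatch batch : state) {
                db.commitOnce(batch.checkpointId, batch.records); // replay; no-op if already done
            }
            return sink;
        }
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        Sink sink = new Sink(db);
        sink.invoke("a");
        sink.invoke("b");
        List<PendingBatch> checkpoint1 = sink.snapshotState(1L);
        // Crash here: notifyCheckpointComplete(1) never ran, nothing is in the db yet.
        Sink.restore(db, checkpoint1); // the lost commit is replayed
        Sink.restore(db, checkpoint1); // a second restart does not duplicate rows
        System.out.println(db.rows);   // prints [a, b]
    }
}
```

In this sketch the lost commit is covered by the replay in restore, and
the duplicate is covered by the checkpoint-id check in the db. With
parallelism above 1 the id would presumably have to include the subtask
index as well, and buffering the rows in state (instead of holding a
mysql transaction open) is my assumption, since an open transaction
does not survive a crash of the connection. Is this the intended way?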

2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
> Hi,
>
>
> Then how to implement exactly-once async io? That is, neither missing
> data nor duplicating data.
>
>
> From the docs about async IO here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> :
>
> "Fault Tolerance Guarantees:
> The asynchronous I/O operator offers full exactly-once fault tolerance
> guarantees. It stores the records for in-flight asynchronous requests in
> checkpoints and restores/re-triggers the requests when recovering from a
> failure."
>
> So it is already handled by Flink in a way that supports exactly-once.
>
> Is there some way to index data by checkpoint id and records which
> checkpoints already commit to db? But that means we need MapState,
> right?
>
>
> The information required depends a bit on the store that you are using,
> maybe the last confirmed checkpoint id is enough, but maybe you require
> something more. This transaction information is probably not "by-key", but
> "per-operator", so I would suggest using operator state (see next answer).
> Btw the implementation of async operators does something very similar to
> restore pending requests, and you can see the code in "AsyncWaitOperator".
>
>
> However, the async-io operator normally follows other operators, e.g.
> fold, so it normally faces the DataStream but not KeyedStream, and
> DataStream only supports ListState, right?
>
>
> You can use non-keyed state, aka operator state, to store such information.
> See here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
> . It does not require a KeyedStream.
>
> Best,
> Stefan
>
>
>
> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>
>
> Am 01.01.2018 um 15:22 schrieb Jinhua Luo <luajit...@gmail.com>:
>
> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
> You need to be a bit careful if your sink needs exactly-once semantics. In
> this case things should either be idempotent or the db must support rolling
> back changes between checkpoints, e.g. via transactions. Commits should be
> triggered for confirmed checkpoints ("notifyCheckpointComplete").
>
>
> I suspect we have a risk here: when notifyCheckpointComplete is called,
> the checkpoint has completed, and if the process crashes (or the machine
> fails) before it commits to the db, flink would restart the app and
> restore the state from the last checkpoint, but it would not invoke
> notifyCheckpointComplete again, correct? If so, we would miss the
> database ingress for the data between the last two checkpoints, am I
> correct?
>
>
> Yes, that is correct. What I was talking about was more the opposite
> problem, i.e. committing too early. In that case, you could have committed
> for a checkpoint that failed afterwards, and recovery will start from an
> earlier checkpoint but with your commit already applied. You should only
> commit after you have received the notification, otherwise your semantics
> can degrade to "at-least-once".
>
>
