Hi Stefan,

Since TwoPhaseCommitSinkFunction is new to me, I would like to know more
details.

There are two more questions:
1.  If `snapshotState` fails at the first checkpoint, does that mean there
is no state yet, and therefore no transaction can be aborted by default?
2. I saw that FlinkKafkaProducer011 keeps a transaction id pool, with
multiple ids that the producer can reuse, and that it aborts every id in
this pool in `initializeState` (see the sketch below). Is this pool
designed for the situation in the first question, or for something I
haven't noticed?
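
For reference, this is roughly how I picture the pool being used on
recovery. The class and method names below are mine, purely for
illustration; they are not the actual FlinkKafkaProducer011 internals.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: on restore, abort every id that might still have an
// open transaction, then hand the ids back to the pool for reuse.
public class TransactionIdPoolSketch {

    private final Deque<String> availableIds = new ArrayDeque<>();

    // Would be called from something like initializeState() after a restart.
    public void recover(List<String> idsFromRestoredStateOrStaticPool) {
        for (String id : idsFromRestoredStateOrStaticPool) {
            abortTransaction(id);       // safe even if no transaction is open
            availableIds.addLast(id);   // the id can now be reused
        }
    }

    public String borrowId() {
        return availableIds.removeFirst();
    }

    private void abortTransaction(String id) {
        // placeholder: ask the external system to abort/fence this id
        System.out.println("aborting any open transaction for id " + id);
    }
}

If that reading is right, the aborts would not depend on any snapshot having
succeeded, which is why I wonder whether the pool is also meant to cover the
first question.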

Thank you.

Best Regards,
Tony Wei

2018-01-04 22:15 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Yes, that is how it works.
>
> > On 04.01.2018 at 14:47, Jinhua Luo <luajit...@gmail.com> wrote:
> >
> > The TwoPhaseCommitSinkFunction seems to record the transaction status
> > in the state, just like what I imagined above, correct?
> > And if the process fails before the commit, the commit would be
> > triggered again on a later restart, correct? So the commit would not be
> > forgotten, correct?
> >
> > 2018-01-03 22:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> >> I think a mix of async UPDATEs and exactly-once might be tricky, and the
> >> typical use case for async I/O is more about reads. So let’s take a step
> >> back: what would you like to achieve with this? Do you want a
> >> read-modify-update (e.g. a map function that queries and updates a DB), or
> >> just updates (like a sink that writes to a DB)? From the previous
> >> question, I assume the second case applies, in which case I wonder why you
> >> even need to be async for a sink. I think a much better approach could be
> >> based on Flink's TwoPhaseCommitSinkFunction, maybe with some batching to
> >> lower the update costs.
> >>
> >> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
> >> against your DB, similar to e.g. this example with Postgres:
> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
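> >>
> >> Very roughly, the kind of transaction handling that article describes
> >> could look like the sketch below: plain JDBC plus PostgreSQL prepared
> >> transactions, with the same begin / pre-commit / commit / abort shape as
> >> the TwoPhaseCommitSinkFunction. It is only an illustration (the names are
> >> made up, the Flink state/serializer plumbing is left out, and it assumes
> >> max_prepared_transactions > 0 on the Postgres side).
> >>
> >> import java.sql.Connection;
> >> import java.sql.DriverManager;
> >> import java.sql.Statement;
> >>
> >> // Illustrative only: two-phase commit against Postgres via JDBC.
> >> public class PostgresTwoPhaseSketch {
> >>
> >>     private final String url; // e.g. "jdbc:postgresql://localhost/test"
> >>
> >>     public PostgresTwoPhaseSketch(String url) {
> >>         this.url = url;
> >>     }
> >>
> >>     // "beginTransaction": open a connection; the writes between begin
> >>     // and preCommit go through this connection.
> >>     public Connection begin() throws Exception {
> >>         Connection conn = DriverManager.getConnection(url);
> >>         conn.setAutoCommit(false);
> >>         return conn;
> >>     }
> >>
> >>     // "preCommit": make the transaction durable on the DB side, but not
> >>     // visible yet. txnId must be a trusted string (no escaping here).
> >>     public void preCommit(Connection conn, String txnId) throws Exception {
> >>         try (Statement st = conn.createStatement()) {
> >>             st.execute("PREPARE TRANSACTION '" + txnId + "'");
> >>         }
> >>         conn.close();
> >>     }
> >>
> >>     // "commit": called once the checkpoint is confirmed
> >>     // (notifyCheckpointComplete); can run from any new connection.
> >>     public void commit(String txnId) throws Exception {
> >>         try (Connection conn = DriverManager.getConnection(url);
> >>              Statement st = conn.createStatement()) {
> >>             st.execute("COMMIT PREPARED '" + txnId + "'");
> >>         }
> >>     }
> >>
> >>     // "abort": roll back a prepared transaction that was never confirmed.
> >>     public void abort(String txnId) throws Exception {
> >>         try (Connection conn = DriverManager.getConnection(url);
> >>              Statement st = conn.createStatement()) {
> >>             st.execute("ROLLBACK PREPARED '" + txnId + "'");
> >>         }
> >>     }
> >> }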
> >>
> >> Does this help or do you really need async read-modify-update?
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >> On 03.01.2018 at 15:08, Jinhua Luo <luajit...@gmail.com> wrote:
> >>
> >> No, I mean how to implement an exactly-once db commit (given that our
> >> async I/O target is MySQL), not the state used by Flink.
> >> As mentioned in the previous mail, if I commit to the db in
> >> notifyCheckpointComplete, we risk losing data (a lost commit, since a
> >> Flink restart would not trigger notifyCheckpointComplete for the last
> >> checkpoint again).
> >> On the other hand, if I update and commit per record, the SQL/stored
> >> procedure has to handle duplicate updates on a failure restart.
> >>
> >> So, when or where should we commit so that we get exactly-once db ingress?
> >>
> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> >>
> >>
> >> Hi,
> >>
> >>
> >> Then how do we implement exactly-once async I/O? That is, neither missing
> >> nor duplicating data.
> >>
> >>
> >> From the docs about async IO here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> >>
> >> "Fault Tolerance Guarantees:
> >> The asynchronous I/O operator offers full exactly-once fault tolerance
> >> guarantees. It stores the records for in-flight asynchronous requests in
> >> checkpoints and restores/re-triggers the requests when recovering from a
> >> failure."
> >>
> >> So it is already handled by Flink in a way that supports exactly-once.
> >>
> >> Is there some way to index data by checkpoint id and record which
> >> checkpoints have already been committed to the db? But that means we
> >> need MapState, right?
> >>
> >>
> >> The information required depends a bit on the store that you are using;
> >> maybe the last confirmed checkpoint id is enough, but maybe you require
> >> something more. This transaction information is probably not „per-key“,
> >> but „per-operator“, so I would suggest using operator state (see the next
> >> answer). Btw, the implementation of the async operators does something
> >> very similar to restore pending requests; you can see the code in
> >> „AsyncWaitOperator“.
> >>
> >>
> >> However, the async-io operator normally follows other operators, e.g.
> >> fold, so it normally operates on a DataStream rather than a KeyedStream,
> >> and a DataStream only supports ListState, right?
> >>
> >>
> >> You can use non-keyed state, aka operator state, to store such
> >> information. See here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
> >> It does not require a KeyedStream.
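> >>
> >> A minimal sketch of what that could look like (only the operator-state
> >> bookkeeping is shown; what you store and how you act on it during
> >> recovery is up to your transaction logic, so those parts are just
> >> placeholders):
> >>
> >> import java.util.ArrayList;
> >> import java.util.List;
> >>
> >> import org.apache.flink.api.common.state.ListState;
> >> import org.apache.flink.api.common.state.ListStateDescriptor;
> >> import org.apache.flink.runtime.state.FunctionInitializationContext;
> >> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> >> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> >> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> >>
> >> // Keeps pending transaction ids in non-keyed (operator) state so they
> >> // survive a restart; works on a plain DataStream, no keyBy required.
> >> public class PendingTxnSink extends RichSinkFunction<String>
> >>         implements CheckpointedFunction {
> >>
> >>     private transient ListState<String> pendingTxnState;
> >>     private final List<String> pendingTxns = new ArrayList<>();
> >>
> >>     @Override
> >>     public void initializeState(FunctionInitializationContext context)
> >>             throws Exception {
> >>         pendingTxnState = context.getOperatorStateStore().getListState(
> >>                 new ListStateDescriptor<>("pending-txns", String.class));
> >>         if (context.isRestored()) {
> >>             for (String txn : pendingTxnState.get()) {
> >>                 pendingTxns.add(txn); // decide here: re-commit or abort
> >>             }
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public void snapshotState(FunctionSnapshotContext context)
> >>             throws Exception {
> >>         pendingTxnState.clear();
> >>         for (String txn : pendingTxns) {
> >>             pendingTxnState.add(txn);
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public void invoke(String value) throws Exception {
> >>         // placeholder: write the value inside the currently open transaction
> >>     }
> >> }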
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>
> >> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> >>
> >>
> >>
> >> On 01.01.2018 at 15:22, Jinhua Luo <luajit...@gmail.com> wrote:
> >>
> >> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> >>
> >> You need to be a bit careful if your sink needs exactly-once semantics. In
> >> that case, things should either be idempotent, or the db must support
> >> rolling back changes between checkpoints, e.g. via transactions. Commits
> >> should be triggered for confirmed checkpoints („notifyCheckpointComplete“).
> >>
> >>
> >> I suspect we have a risk here: when notifyCheckpointComplete is called,
> >> the checkpoint has already completed, and if the process crashes (or the
> >> machine fails) before it commits to the db, Flink would restart the app
> >> and restore the state from the last checkpoint, but it would not invoke
> >> notifyCheckpointComplete again, correct? If so, we would miss the database
> >> ingress for the data between the last two checkpoints, am I correct?
> >>
> >>
> >> Yes, that is correct. What I was talking about was more the opposite
> >> problem, i.e. committing too early. In that case, you could have committed
> >> for a checkpoint that failed afterwards, and recovery will start from an
> >> earlier checkpoint but with your commit already applied. You should only
> >> commit after you received the notification, or else your semantics can
> >> degrade to „at-least-once“.
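> >>
> >> To make the timing concrete, a bare-bones sketch of that rule (only the
> >> checkpoint bookkeeping is shown; the actual database commit and the
> >> operator-state handling that would let the pending buffers survive a
> >> restart are left as placeholders):
> >>
> >> import java.util.ArrayList;
> >> import java.util.HashMap;
> >> import java.util.List;
> >> import java.util.Map;
> >>
> >> import org.apache.flink.runtime.state.CheckpointListener;
> >> import org.apache.flink.runtime.state.FunctionInitializationContext;
> >> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> >> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> >> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> >>
> >> // Buffer writes per checkpoint and commit only after the checkpoint has
> >> // been confirmed via notifyCheckpointComplete.
> >> public class CommitOnNotifySink extends RichSinkFunction<String>
> >>         implements CheckpointedFunction, CheckpointListener {
> >>
> >>     private final List<String> currentBuffer = new ArrayList<>();
> >>     private final Map<Long, List<String>> pendingPerCheckpoint = new HashMap<>();
> >>
> >>     @Override
> >>     public void invoke(String value) {
> >>         currentBuffer.add(value);
> >>     }
> >>
> >>     @Override
> >>     public void snapshotState(FunctionSnapshotContext context) {
> >>         // freeze the buffer under this checkpoint id; real code would also
> >>         // put it into operator state so it survives a restart
> >>         pendingPerCheckpoint.put(
> >>                 context.getCheckpointId(), new ArrayList<>(currentBuffer));
> >>         currentBuffer.clear();
> >>     }
> >>
> >>     @Override
> >>     public void initializeState(FunctionInitializationContext context) {
> >>         // placeholder: restore pending buffers from operator state and
> >>         // re-commit the ones whose checkpoint already completed
> >>     }
> >>
> >>     @Override
> >>     public void notifyCheckpointComplete(long checkpointId) {
> >>         List<String> toCommit = pendingPerCheckpoint.remove(checkpointId);
> >>         if (toCommit != null) {
> >>             // placeholder: write toCommit to the db in a single transaction
> >>         }
> >>     }
> >> }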
> >>
> >>
> >>
>
>
