Hi Stefan,

Since TwoPhaseCommitSinkFunction is new to me, I would like to know more details.
There are two more questions:

1. If `snapshotState` fails at the first checkpoint, does that mean there is no state yet, so no transaction can be aborted by default?
2. I saw that FlinkKafkaProducer011 has a pool of transaction ids that are reused by the producer, and that it aborts all the ids in this pool in `initializeState`. Is this pool designed for the situation in the first question, or for something I haven't noticed? (A rough illustration of that abort-on-start pattern is sketched at the end of this mail.)

Thank you.

Best Regards,
Tony Wei

2018-01-04 22:15 GMT+08:00 Stefan Richter <[email protected]>:

> Yes, that is how it works.
>
> > Am 04.01.2018 um 14:47 schrieb Jinhua Luo <[email protected]>:
> >
> > The TwoPhaseCommitSinkFunction seems to record the transaction status in the state, just like what I imagined above, correct? And if the process fails before the commit, the commit would be triggered again after the restart, correct? So the commit would not be forgotten, correct?
> >
> > 2018-01-03 22:54 GMT+08:00 Stefan Richter <[email protected]>:
> >> I think a mix of async UPDATEs and exactly-once might be tricky, and the typical use case for async IO is more about reads. So let's take a step back: what would you like to achieve with this? Do you want read-modify-update (e.g. a map function that queries and updates a DB) or just updates (like a sink that goes against a DB)? From the previous question, I assume the second case applies, in which case I wonder why you even need to be async for a sink. I think a much better approach could be based on Flink's TwoPhaseCommitSinkFunction, and maybe use some batching to lower update costs.
> >>
> >> On top of the TwoPhaseCommitSinkFunction, you could implement transactions against your DB, similar to e.g. this example with Postgres:
> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
> >>
> >> Does this help or do you really need async read-modify-update?
> >>
> >> Best,
> >> Stefan
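For concreteness, here is a minimal sketch of the kind of sink Stefan suggests above: a TwoPhaseCommitSinkFunction whose transaction handle is the name of a PostgreSQL prepared transaction (PREPARE TRANSACTION / COMMIT PREPARED), in the spirit of the linked article. Everything here is illustrative: the class, table, and connection handling are made up, and the constructor plus the three-argument invoke(transaction, value, context) hook follow newer Flink releases (the protected hooks have slightly different signatures across versions); this is not code from the thread.

```java
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Illustrative sketch only: an exactly-once JDBC sink built on
 * TwoPhaseCommitSinkFunction, using PostgreSQL prepared transactions.
 * The transaction handle kept in Flink state is just the prepared-xid string.
 */
public class PostgresTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private final String jdbcUrl; // hypothetical, e.g. "jdbc:postgresql://db/app"

    // Connections of transactions that are still open on this subtask.
    private transient Map<String, Connection> openTxns;

    public PostgresTwoPhaseCommitSink(String jdbcUrl) {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    protected String beginTransaction() throws Exception {
        if (openTxns == null) {
            openTxns = new HashMap<>();
        }
        // One DB transaction per checkpoint interval.
        String xid = "flink-" + UUID.randomUUID();
        Connection conn = DriverManager.getConnection(jdbcUrl);
        conn.setAutoCommit(false);
        openTxns.put(xid, conn);
        return xid;
    }

    @Override
    protected void invoke(String xid, String value, Context context) throws Exception {
        // Writes go into the still-open transaction; nothing is visible yet.
        try (PreparedStatement stmt = openTxns.get(xid)
                .prepareStatement("INSERT INTO events(payload) VALUES (?)")) {
            stmt.setString(1, value);
            stmt.executeUpdate();
        }
    }

    @Override
    protected void preCommit(String xid) throws Exception {
        // Phase 1, called from snapshotState(): make the transaction durable
        // on the database side without committing it.
        Connection conn = openTxns.remove(xid);
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("PREPARE TRANSACTION '" + xid + "'");
        } finally {
            conn.close();
        }
    }

    @Override
    protected void commit(String xid) {
        // Phase 2, called from notifyCheckpointComplete(), or via
        // recoverAndCommit() for pending transactions restored after a failure.
        runCommand("COMMIT PREPARED '" + xid + "'");
    }

    @Override
    protected void abort(String xid) {
        // Called for aborted checkpoints and via recoverAndAbort() on restore.
        Connection open = (openTxns == null) ? null : openTxns.remove(xid);
        if (open != null) {
            try { open.rollback(); open.close(); } catch (Exception ignored) { }
        } else {
            // The xid may never have reached preCommit (e.g. a crash before the
            // very first snapshot); a real implementation has to tolerate
            // "prepared transaction does not exist" here.
            try { runCommand("ROLLBACK PREPARED '" + xid + "'"); } catch (Exception ignored) { }
        }
    }

    private void runCommand(String sql) {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        } catch (Exception e) {
            throw new RuntimeException("Failed to execute: " + sql, e);
        }
    }
}
```

The point is that commit() only runs from notifyCheckpointComplete(), or from recoverAndCommit() when a pending transaction is restored from state after a failure, which is exactly the gap discussed further down in the thread.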
> >> Am 03.01.2018 um 15:08 schrieb Jinhua Luo <[email protected]>:
> >>
> >> No, I mean how to implement an exactly-once DB commit (given that our async IO target is MySQL), not the state used by Flink.
> >> As mentioned in the previous mail, if I commit to the DB in notifyCheckpointComplete, we risk losing data (a lost commit, since a Flink restart would not trigger notifyCheckpointComplete for the last checkpoint again).
> >> On the other hand, if I update and commit per record, the SQL / stored procedure has to handle duplicate updates on a failure restart.
> >>
> >> So, when or where should we commit so that we get exactly-once DB ingress?
> >>
> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter <[email protected]>:
> >>
> >> Hi,
> >>
> >> Then how to implement exactly-once async IO? That is, neither missing data nor duplicating data.
> >>
> >> From the docs about async IO here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html :
> >>
> >> "Fault Tolerance Guarantees:
> >> The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure."
> >>
> >> So it is already handled by Flink in a way that supports exactly-once.
> >>
> >> Is there some way to index data by checkpoint id and record which checkpoints have already been committed to the DB? But that means we need MapState, right?
> >>
> >> The information required depends a bit on the store that you are using; maybe the last confirmed checkpoint id is enough, but maybe you require something more. This transaction information is probably not "by-key" but "per-operator", so I would suggest using operator state (see the next answer). Btw, the implementation of async operators does something very similar to restore pending requests; you can see the code in "AsyncWaitOperator".
> >>
> >> However, the async IO operator normally follows other operators, e.g. fold, so it normally faces a DataStream rather than a KeyedStream, and DataStream only supports ListState, right?
> >>
> >> You can use non-keyed state, aka operator state, to store such information. See here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
> >> It does not require a KeyedStream.
> >>
> >> Best,
> >> Stefan
> >>
> >> 2018-01-03 18:43 GMT+08:00 Stefan Richter <[email protected]>:
> >>
> >> Am 01.01.2018 um 15:22 schrieb Jinhua Luo <[email protected]>:
> >>
> >> 2017-12-08 18:25 GMT+08:00 Stefan Richter <[email protected]>:
> >>
> >> You need to be a bit careful if your sink needs exactly-once semantics. In this case things should either be idempotent or the DB must support rolling back changes between checkpoints, e.g. via transactions. Commits should be triggered for confirmed checkpoints ("notifyCheckpointComplete").
> >>
> >> I suspect we have a risk here: in notifyCheckpointComplete, the checkpoint is already completed, and if the process crashes (or the machine fails) before it commits to the DB, Flink would restart the app, restoring the state from the last checkpoint, but it would not invoke notifyCheckpointComplete again, correct? If so, we would miss the database ingress for the data between the last two checkpoints, am I correct?
> >>
> >> Yes, that is correct. What I was talking about was more the opposite problem, i.e. committing too early. In that case, you could have committed for a checkpoint that failed afterwards, and recovery will start from an earlier checkpoint but with your commit already applied. You should only commit after you have received the notification, or else your semantics can be down to "at-least-once".
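Putting the two answers above together (per-operator ListState for the pending data, and committing only in notifyCheckpointComplete), a rough sketch of such a sink could look like the following. The class name and writeBatchIdempotently() are hypothetical; the Flink APIs used (CheckpointedFunction, operator ListState, CheckpointListener) are the ones referenced in the thread.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only: buffer records in non-keyed (operator) ListState
 * and write them to the database only once the checkpoint that contains them
 * has been confirmed via notifyCheckpointComplete().
 */
public class BufferingDbSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Records received since the last snapshot.
    private transient List<String> currentBuffer;

    // Records that were snapshotted but whose commit is not yet confirmed.
    private transient List<String> pending;
    private transient ListState<String> pendingState;

    @Override
    public void open(Configuration parameters) {
        currentBuffer = new ArrayList<>();
    }

    @Override
    public void invoke(String value) {
        currentBuffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Move the buffered records into the pending set and persist them.
        pending.addAll(currentBuffer);
        currentBuffer.clear();
        pendingState.clear();
        for (String record : pending) {
            pendingState.add(record);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-writes", String.class));
        pending = new ArrayList<>();
        if (context.isRestored()) {
            // We might have crashed after the checkpoint completed but before
            // the commit ran (the case discussed above), so re-commit the
            // restored batch; the writes must be idempotent (e.g. UPSERTs).
            for (String record : pendingState.get()) {
                pending.add(record);
            }
            writeBatchIdempotently(pending);
            pending.clear();
            pendingState.clear();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Only now is it safe to make the data visible in the database.
        writeBatchIdempotently(pending);
        pending.clear();
        pendingState.clear();
    }

    private void writeBatchIdempotently(List<String> records) {
        // Hypothetical: run idempotent UPSERTs for the batch in one DB transaction.
    }
}
```

A more precise version would key each pending batch by the checkpoint id from FunctionSnapshotContext.getCheckpointId() and, in notifyCheckpointComplete, commit only the batches whose id is not larger than the completed one; the idempotent writes are what keep the replay after a restore harmless, which is the duplicate-update point raised earlier in the thread.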

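Finally, regarding the transaction id pool from the first two questions: below is a rough illustration of the abort-on-initializeState pattern described there, i.e. a fixed pool of deterministically named transaction ids that the sink tries to abort whenever it starts, whether or not any state was restored. The TransactionalClient interface and the naming scheme are hypothetical and are not FlinkKafkaProducer011's actual internals.

```java
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Illustrative sketch only: a fixed pool of deterministically named transaction
 * ids that is aborted on every (re)start, so that transactions left open by a
 * previous run are cleaned up even when no state could be restored.
 */
public class PooledTransactionSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private static final int POOL_SIZE = 5;

    private final TransactionalClient client; // hypothetical external client

    public PooledTransactionSink(TransactionalClient client) {
        this.client = client;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // The ids are derived from the subtask index, so this instance can name
        // them without any restored state and abort whatever might still be
        // open under them from a previous run.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        for (int i = 0; i < POOL_SIZE; i++) {
            client.abortTransactionIfExists("sink-" + subtask + "-" + i);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Pre-commit of the currently open transaction would go here (omitted).
    }

    @Override
    public void invoke(String value) throws Exception {
        // Write into the currently open transaction (omitted).
    }

    /** Hypothetical stand-in for a system with named, abortable transactions. */
    public interface TransactionalClient extends java.io.Serializable {
        void abortTransactionIfExists(String transactionalId);
    }
}
```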