Hi Stefan,

Your reply really helps me a lot. Thank you.

2018-01-08 19:38 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> 1. If `snapshotState` fails at the first checkpoint, does that mean there
> is no state and no transaction can be aborted by default?
>
>
> This is a general problem and not limited to the first checkpoint.
> Whenever you open a transaction, there is no guaranteed way to store it
> in persistent state so that it can be aborted in case of failure: in
> theory, your job can crash at any point right after opening a
> transaction. So in the end I guess you must rely on something like a
> timeout-based mechanism. You can make a _best effort_ to proactively
> cancel uncommitted transactions, e.g. by tracking them in state, listing
> them in files, or having a fixed pool of transaction ids and iterating
> them all for cancellation on a restart.
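>
> To illustrate the "track them in state" variant, here is a minimal,
> untested sketch. TxnClient and its begin/write/abort calls are
> hypothetical placeholders for whatever client your store offers, and
> this stays best effort: a transaction opened after the last snapshot is
> not covered, so you still want a timeout on the store side.
>
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>
> public class TxnTrackingSink extends RichSinkFunction<String>
>         implements CheckpointedFunction {
>
>     private transient ListState<String> persistedTxnIds;
>     private transient TxnClient client;        // hypothetical DB client
>     private final Set<String> openTxnIds = new HashSet<>();
>
>     @Override
>     public void invoke(String value) throws Exception {
>         String txnId = client.begin();         // open a transaction
>         openTxnIds.add(txnId);
>         client.write(txnId, value);            // the commit happens later
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
>         // Persist the ids of all transactions that are currently open.
>         persistedTxnIds.clear();
>         for (String id : openTxnIds) {
>             persistedTxnIds.add(id);
>         }
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext ctx) throws Exception {
>         client = new TxnClient();              // hypothetical, connects to the store
>         persistedTxnIds = ctx.getOperatorStateStore().getListState(
>                 new ListStateDescriptor<>("open-txns", String.class));
>         if (ctx.isRestored()) {
>             // Best effort: abort every transaction id we managed to persist.
>             for (String id : persistedTxnIds.get()) {
>                 client.abort(id);
>             }
>         }
>     }
> }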
>
> 2. I saw that FlinkKafkaProducer011 has a transaction id pool, which
> holds multiple ids to be reused by the producer, and that it aborts all
> ids in this pool in `initializeState`. Is this pool designed for the
> situation in the first problem, or for something I haven't noticed?
>
>
> This implementation is very specific to the Kafka producer and is not
> necessarily a good blueprint for what you are planning. In particular,
> in this case there is a fixed and limited universe of all transaction
> ids that a task can potentially (re)use, so after a restart without
> state we can simply iterate all possible transaction ids and issue a
> cancel for each of them. In general, you don't always know all possible
> transaction ids in a way that allows you to opportunistically cancel
> every potentially orphaned transaction.
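>
> To make the pool idea concrete, it boils down to roughly the following
> in `initializeState` (an illustrative sketch, not the actual
> FlinkKafkaProducer011 code; client.abort is again a hypothetical call
> and must be safe for ids that were never used):
>
>     // The id universe is fixed and derived from the subtask index, so a
>     // restart without any state can still enumerate and abort all of it.
>     int poolSize = 5;
>     String prefix = "sink-" + getRuntimeContext().getIndexOfThisSubtask() + "-txn-";
>     for (int i = 0; i < poolSize; i++) {
>         client.abort(prefix + i);
>     }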
>
> 2018-01-04 22:15 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Yes, that is how it works.
>>
>> > On 04.01.2018 at 14:47, Jinhua Luo <luajit...@gmail.com> wrote:
>> >
>> > The TwoPhaseCommitSinkFunction seems to record the transaction status
>> > in state, just like what I imagined above, correct? And if the
>> > process fails before the commit, the commit would be triggered again
>> > on a later restart, correct? So the commit would not be forgotten,
>> > correct?
>> >
>> > 2018-01-03 22:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >> I think mixing async UPDATEs with exactly-once semantics might be
>> >> tricky, and the typical use case for async IO is more about reads.
>> >> So let's take a step back: what would you like to achieve with this?
>> >> Do you want read-modify-update (e.g. a map function that queries and
>> >> updates a DB), or just updates (like a sink that writes to a DB)?
>> >> From the previous question, I assume the second case applies, in
>> >> which case I wonder why you even need to be async for a sink. I
>> >> think a much better approach could be based on Flink's
>> >> TwoPhaseCommitSinkFunction, maybe with some batching to lower the
>> >> update costs.
>> >>
>> >> On top of the TwoPhaseCommitSinkFunction, you could implement
>> >> transactions against your DB, similar to e.g. this example with
>> >> Postgres:
>> >> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
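>> >>
>> >> For illustration, a very rough, untested sketch of such a sink using
>> >> Postgres' PREPARE TRANSACTION / COMMIT PREPARED (following the
>> >> article above). All names are illustrative, runIgnoringUnknownGid is
>> >> a small helper (omitted) that swallows "no such prepared
>> >> transaction" errors, and the exact TwoPhaseCommitSinkFunction
>> >> constructor and invoke signatures differ between Flink versions:
>> >>
>> >> import java.sql.Connection;
>> >> import java.sql.DriverManager;
>> >> import java.sql.PreparedStatement;
>> >> import java.util.ArrayList;
>> >> import java.util.List;
>> >> import java.util.UUID;
>> >> import org.apache.flink.api.common.ExecutionConfig;
>> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> >> import org.apache.flink.api.common.typeutils.base.VoidSerializer;
>> >> import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
>> >>
>> >> public class PostgresTwoPhaseSink
>> >>         extends TwoPhaseCommitSinkFunction<String, PostgresTwoPhaseSink.Txn, Void> {
>> >>
>> >>     /** One logical transaction: a global id plus the rows batched into it. */
>> >>     public static class Txn {
>> >>         public String gid = UUID.randomUUID().toString();
>> >>         public List<String> rows = new ArrayList<>();
>> >>     }
>> >>
>> >>     public PostgresTwoPhaseSink() {
>> >>         // Constructor arguments vary across Flink versions; check the javadocs.
>> >>         super(TypeInformation.of(Txn.class).createSerializer(new ExecutionConfig()),
>> >>                 VoidSerializer.INSTANCE);
>> >>     }
>> >>
>> >>     @Override
>> >>     protected Txn beginTransaction() {
>> >>         return new Txn();              // just allocate an id, batch the rows
>> >>     }
>> >>
>> >>     @Override
>> >>     protected void invoke(Txn txn, String value) {
>> >>         txn.rows.add(value);           // batching lowers the update costs
>> >>     }
>> >>
>> >>     @Override
>> >>     protected void preCommit(Txn txn) throws Exception {
>> >>         // Flush the batch and make it durable, but not yet visible.
>> >>         try (Connection c = DriverManager.getConnection("jdbc:postgresql:mydb")) {
>> >>             c.setAutoCommit(false);
>> >>             try (PreparedStatement ps =
>> >>                     c.prepareStatement("INSERT INTO t(v) VALUES (?)")) {
>> >>                 for (String row : txn.rows) {
>> >>                     ps.setString(1, row);
>> >>                     ps.addBatch();
>> >>                 }
>> >>                 ps.executeBatch();
>> >>             }
>> >>             c.createStatement().execute("PREPARE TRANSACTION '" + txn.gid + "'");
>> >>         }
>> >>     }
>> >>
>> >>     @Override
>> >>     protected void commit(Txn txn) {
>> >>         // Called on notifyCheckpointComplete, and again during recovery, so
>> >>         // it must tolerate an already-committed or unknown gid.
>> >>         runIgnoringUnknownGid("COMMIT PREPARED '" + txn.gid + "'");
>> >>     }
>> >>
>> >>     @Override
>> >>     protected void abort(Txn txn) {
>> >>         runIgnoringUnknownGid("ROLLBACK PREPARED '" + txn.gid + "'");
>> >>     }
>> >> }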
>> >>
>> >> Does this help or do you really need async read-modify-update?
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>
>> >> On 03.01.2018 at 15:08, Jinhua Luo <luajit...@gmail.com> wrote:
>> >>
>> >> No, I mean how to implement an exactly-once DB commit (given that
>> >> our async IO target is MySQL), not the state used by Flink.
>> >> As mentioned in the previous mail, if I commit to the DB in
>> >> notifyCheckpointComplete, we risk losing data (a lost commit: a
>> >> Flink restart would not trigger notifyCheckpointComplete for the
>> >> last checkpoint again).
>> >> On the other hand, if I update and commit per record, the SQL or
>> >> stored procedure has to handle duplicate updates on failure restart.
>> >>
>> >> So, when or where should we commit so that we get exactly-once DB
>> >> ingress?
>> >>
>> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >>
>> >> Hi,
>> >>
>> >>
>> >> Then how do we implement exactly-once async IO? That is, neither
>> >> missing data nor duplicating data.
>> >>
>> >>
>> >> From the docs about async IO here:
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> >>
>> >> "Fault Tolerance Guarantees:
>> >> The asynchronous I/O operator offers full exactly-once fault tolerance
>> >> guarantees. It stores the records for in-flight asynchronous requests
>> in
>> >> checkpoints and restores/re-triggers the requests when recovering from
>> a
>> >> failure.“
>> >>
>> >> So it is already handled by Flink in a way that supports exactly-once.
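>> >>
>> >> For reference, the wiring looks roughly like this with the 1.3-era
>> >> API (AsyncCollector was renamed to ResultFuture in later releases;
>> >> queryMysqlAsync is a placeholder for a real non-blocking client call
>> >> returning a CompletableFuture):
>> >>
>> >> import java.util.Collections;
>> >> import java.util.concurrent.TimeUnit;
>> >> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
>> >> import org.apache.flink.streaming.api.datastream.DataStream;
>> >> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
>> >> import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
>> >>
>> >> public static DataStream<String> withAsyncLookup(DataStream<String> input) {
>> >>     return AsyncDataStream.unorderedWait(
>> >>             input,
>> >>             new AsyncFunction<String, String>() {
>> >>                 @Override
>> >>                 public void asyncInvoke(String key, AsyncCollector<String> out) {
>> >>                     // On completion, hand the result back to Flink; in-flight
>> >>                     // requests are checkpointed and re-triggered on recovery.
>> >>                     queryMysqlAsync(key).thenAccept(row ->
>> >>                             out.collect(Collections.singletonList(row)));
>> >>                 }
>> >>             },
>> >>             1000, TimeUnit.MILLISECONDS,   // timeout per request
>> >>             100);                          // max in-flight requests
>> >> }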
>> >>
>> >> Is there some way to index data by checkpoint id and record which
>> >> checkpoints have already been committed to the DB? But that means we
>> >> need MapState, right?
>> >>
>> >>
>> >> The information required depends a bit on the store that you are
>> >> using; maybe the last confirmed checkpoint id is enough, but maybe
>> >> you require something more. This transaction information is probably
>> >> not "by-key" but "per-operator", so I would suggest using operator
>> >> state (see the next answer). Btw, the implementation of async
>> >> operators does something very similar to restore pending requests;
>> >> you can see the code in "AsyncWaitOperator".
>> >>
>> >>
>> >> However, the async-IO operator normally follows other operators,
>> >> e.g. fold, so it normally faces a DataStream rather than a
>> >> KeyedStream, and a DataStream only supports ListState, right?
>> >>
>> >>
>> >> You can use non-keyed state, aka operator state, to store such
>> >> information. See here:
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> >> It does not require a KeyedStream.
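>> >>
>> >> Concretely, such per-operator bookkeeping could look like the sketch
>> >> below, storing e.g. the last checkpoint id that was confirmed
>> >> against the DB as discussed above (untested, names illustrative):
>> >>
>> >> import org.apache.flink.api.common.state.ListState;
>> >> import org.apache.flink.api.common.state.ListStateDescriptor;
>> >> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> >> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> >> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> >>
>> >> // Works on a plain DataStream: operator state needs no KeyedStream.
>> >> public class ConfirmedCheckpointTracker implements CheckpointedFunction {
>> >>
>> >>     private transient ListState<Long> confirmedState;
>> >>     private long lastConfirmedCheckpointId = -1L;
>> >>
>> >>     @Override
>> >>     public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
>> >>         confirmedState.clear();
>> >>         confirmedState.add(lastConfirmedCheckpointId);
>> >>     }
>> >>
>> >>     @Override
>> >>     public void initializeState(FunctionInitializationContext ctx) throws Exception {
>> >>         confirmedState = ctx.getOperatorStateStore().getListState(
>> >>                 new ListStateDescriptor<>("confirmed-checkpoint", Long.class));
>> >>         for (Long id : confirmedState.get()) { // empty on a fresh start
>> >>             lastConfirmedCheckpointId = id;
>> >>         }
>> >>     }
>> >> }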
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>
>> >>
>> >> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >>
>> >>
>> >> On 01.01.2018 at 15:22, Jinhua Luo <luajit...@gmail.com> wrote:
>> >>
>> >> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >> You need to be a bit careful if your sink needs exactly-once
>> >> semantics. In this case things should either be idempotent, or the
>> >> DB must support rolling back changes between checkpoints, e.g. via
>> >> transactions. Commits should only be triggered for confirmed
>> >> checkpoints ("notifyCheckpointComplete").
>> >>
>> >>
>> >> I suspect we have a risk here: by notifyCheckpointComplete, the
>> >> checkpoint has completed, and if the process crashes (or the machine
>> >> fails) before it commits to the DB, Flink would restart the app,
>> >> restoring the state from the last checkpoint, but it would not
>> >> invoke notifyCheckpointComplete again, correct? If so, we would miss
>> >> the database ingress for the data between the last two checkpoints,
>> >> am I correct?
>> >>
>> >>
>> >> Yes, that is correct. What I was talking about was more the
>> >> opposite problem, i.e. committing too early. In that case, you could
>> >> have committed for a checkpoint that failed afterwards, and recovery
>> >> would start from an earlier checkpoint but with your commit already
>> >> applied. You should only commit after you have received the
>> >> notification, or else your semantics can degrade to
>> >> "at-least-once".
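>> >>
>> >> In code, that notification arrives via the CheckpointListener
>> >> interface. A sketch of the "commit only on confirmed checkpoints"
>> >> pattern follows; beginDbTxn/writeInDbTxn/commitDbTxn are
>> >> hypothetical helpers, commitDbTxn must be idempotent for the reasons
>> >> above, and this is essentially the gap that
>> >> TwoPhaseCommitSinkFunction closes for you:
>> >>
>> >> import java.util.NavigableMap;
>> >> import java.util.TreeMap;
>> >> import org.apache.flink.runtime.state.CheckpointListener;
>> >> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> >> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> >> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> >> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>> >>
>> >> public class CommitOnNotifySink extends RichSinkFunction<String>
>> >>         implements CheckpointedFunction, CheckpointListener {
>> >>
>> >>     private transient String currentTxnId;
>> >>     private final NavigableMap<Long, String> pendingByCheckpoint = new TreeMap<>();
>> >>
>> >>     @Override
>> >>     public void invoke(String value) throws Exception {
>> >>         writeInDbTxn(currentTxnId, value); // hypothetical helper
>> >>     }
>> >>
>> >>     @Override
>> >>     public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
>> >>         // Close the running transaction and park it under this checkpoint id.
>> >>         pendingByCheckpoint.put(ctx.getCheckpointId(), currentTxnId);
>> >>         currentTxnId = beginDbTxn();
>> >>         // (persisting pendingByCheckpoint in operator state is omitted here)
>> >>     }
>> >>
>> >>     @Override
>> >>     public void notifyCheckpointComplete(long checkpointId) throws Exception {
>> >>         // Commit everything up to and including the confirmed checkpoint.
>> >>         NavigableMap<Long, String> confirmed =
>> >>                 pendingByCheckpoint.headMap(checkpointId, true);
>> >>         for (String txnId : confirmed.values()) {
>> >>             commitDbTxn(txnId);            // must be idempotent
>> >>         }
>> >>         confirmed.clear();
>> >>     }
>> >>
>> >>     @Override
>> >>     public void initializeState(FunctionInitializationContext ctx) throws Exception {
>> >>         currentTxnId = beginDbTxn();
>> >>         // On restore, committing or aborting previously pending transactions
>> >>         // would have to happen here (best effort, see the first answer).
>> >>     }
>> >> }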
>> >>
>> >>
>> >>
>>
>>
>
>
