Hi Piotrek,

The case here was that the first snapshot is a savepoint. I know that if
the following checkpoint succeeded before the previous one, the previous
one will be subsumed by JobManager. However, if that previous one is a
savepoint, it won't be subsumed. That leads to the case that Chesney said.
The following checkpoint succeeded before the previous savepoint, handling
both of their pending transaction, but savepoint still succeeded and sent
the notification to each TaskManager. That led to this exception. Could you
double check if this is the case? Thank you.

Best,
Tony Wei

Piotr Nowojski <pi...@ververica.com> 於 2019年11月27日 週三 下午8:50 寫道:

> Hi,
>
> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
> has the same logic [1]. If I remember the discussions with Stephan/Till,
> the way how Flink is using Akka probably guarantees that messages will be
> always delivered, except of some failure, so `notifyCheckpointComplete`
> could be missed probably only if a failure happens between snapshot and
> arrival of the notification. Receiving the same notification twice should
> be impossible (based on the knowledge passed to me from Till/Stephan).
>
> However, for one thing, if that’s possible, then the code should adjusted
> accordingly. On the other hand, maybe there is no harm in relaxing the
> contract? Even if we miss this notification (because of some re-ordering?),
> next one will subsume the missed one and commit everything.
>
> Piotrek
>
> [1]
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>
> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org> wrote:
>
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
> The notification for complete checkpoints is not reliable; it may be late,
> not come at all, possibly even in different order than expected.
>
> As such, if you a simple case of snapshot -> snapshot -> notify -> notify
> the sink will always fail with an exception.
>
> What it should do imo is either a) don't check that there is a pending
> transaction or b) track the highest checkpoint id received and optionally
> don't fail if the notification is for an older CP.
>
> @piotr WDYT?
>
> On 27/11/2019 08:59, Tony Wei wrote:
>
> Hi,
>
> As the follow up, it seem that savepoint can't be subsumed, so that its
> notification could still be send to each TMs.
> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>
> Best,
> Tony Wei
>
> Tony Wei <tony19920...@gmail.com> 於 2019年11月27日 週三 下午3:43寫道:
>
>> Hi,
>>
>> I want to raise this question again, since I have had this exception on
>> my production job.
>>
>> The exception is as follows
>>
>>
>>> 2019-11-27 14:47:29
>>
>>
>>
>> java.lang.RuntimeException: Error while confirming checkpoint     at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)     at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
>>> .java:1149)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624)     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but
>>> no transaction pending     at org.apache.flink.util.Preconditions
>>> .checkState(Preconditions.java:195)     at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> .notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)     at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)     at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .notifyCheckpointComplete(StreamTask.java:822)     at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)     ...
>>> 5 more
>>
>>
>> And these are the checkpoint / savepoint before the job failed.
>> <checkoint.png>
>>
>> It seems that checkpoint # 675's notification handled the savepoint #
>> 674's pending transaction holder, but savepoint #674's notification didn't
>> be subsumed or be ignored by JM.
>> Therefore, during the checkpoint #676, some tasks got notification before
>> getting the checkpoint barrier and led to this exception happened, because
>> there was no pending transaction in queue.
>>
>> Does anyone know the details about subsumed notifications mechanism and
>> how checkpoint coordinator handle this situation? Please correct me if I'm
>> wrong. Thanks.
>>
>> Best,
>> Tony Wei
>>
>> Stefan Richter <s.rich...@data-artisans.com> 於 2018年10月8日 週一 下午5:03寫道:
>>
>>> Hi Pedro,
>>>
>>> unfortunately the interesting parts are all removed from the log, we
>>> already know about the exception itself. In particular, what I would like
>>> to see is what checkpoints have been triggered and completed before the
>>> exception happens.
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 08.10.2018 um 10:23 schrieb PedroMrChaves <
>>> pedro.mr.cha...@gmail.com>:
>>> >
>>> > Hello,
>>> >
>>> > Find attached the jobmanager.log. I've omitted the log lines from other
>>> > runs, only left the job manager info and the run with the error.
>>> >
>>> > jobmanager.log
>>> > <
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>>
>>> >
>>> >
>>> >
>>> > Thanks again for your help.
>>> >
>>> > Regards,
>>> > Pedro.
>>> >
>>> >
>>> >
>>> > -----
>>> > Best Regards,
>>> > Pedro Chaves
>>> > --
>>> > Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>
>
>

Reply via email to