Hi Piotrek,

There was already an issue [1] and PR for this thread. Should we mark it as
duplicated or related issue?

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-10377

Piotr Nowojski <pi...@ververica.com> 於 2019年11月28日 週四 上午12:17寫道:

> Hi Tony,
>
> Thanks for the explanation. Assuming that’s what’s happening, then I
> agree, this checkStyle should be removed. I created a ticket for this issue
> https://issues.apache.org/jira/browse/FLINK-14979
>
> Piotrek
>
> On 27 Nov 2019, at 16:28, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Piotrek,
>
> The case here was that the first snapshot is a savepoint. I know that if
> the following checkpoint succeeded before the previous one, the previous
> one will be subsumed by JobManager. However, if that previous one is a
> savepoint, it won't be subsumed. That leads to the case that Chesney said.
> The following checkpoint succeeded before the previous savepoint, handling
> both of their pending transaction, but savepoint still succeeded and sent
> the notification to each TaskManager. That led to this exception. Could you
> double check if this is the case? Thank you.
>
> Best,
> Tony Wei
>
> Piotr Nowojski <pi...@ververica.com> 於 2019年11月27日 週三 下午8:50 寫道:
>
>> Hi,
>>
>> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
>> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
>> has the same logic [1]. If I remember the discussions with Stephan/Till,
>> the way how Flink is using Akka probably guarantees that messages will be
>> always delivered, except of some failure, so `notifyCheckpointComplete`
>> could be missed probably only if a failure happens between snapshot and
>> arrival of the notification. Receiving the same notification twice should
>> be impossible (based on the knowledge passed to me from Till/Stephan).
>>
>> However, for one thing, if that’s possible, then the code should adjusted
>> accordingly. On the other hand, maybe there is no harm in relaxing the
>> contract? Even if we miss this notification (because of some re-ordering?),
>> next one will subsume the missed one and commit everything.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>
>> On 27 Nov 2019, at 13:02, Chesnay Schepler <ches...@apache.org> wrote:
>>
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>> The notification for complete checkpoints is not reliable; it may be late,
>> not come at all, possibly even in different order than expected.
>>
>> As such, if you a simple case of snapshot -> snapshot -> notify -> notify
>> the sink will always fail with an exception.
>>
>> What it should do imo is either a) don't check that there is a pending
>> transaction or b) track the highest checkpoint id received and optionally
>> don't fail if the notification is for an older CP.
>>
>> @piotr WDYT?
>>
>> On 27/11/2019 08:59, Tony Wei wrote:
>>
>> Hi,
>>
>> As the follow up, it seem that savepoint can't be subsumed, so that its
>> notification could still be send to each TMs.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei <tony19920...@gmail.com> 於 2019年11月27日 週三 下午3:43寫道:
>>
>>> Hi,
>>>
>>> I want to raise this question again, since I have had this exception on
>>> my production job.
>>>
>>> The exception is as follows
>>>
>>>
>>>> 2019-11-27 14:47:29
>>>
>>>
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint     at
>>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
>>>> .java:1149)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:624)     at java.lang.Thread.run(Thread.java:
>>>> 748) Caused by: java.lang.IllegalStateException: checkpoint completed,
>>>> but no transaction pending     at org.apache.flink.util.Preconditions
>>>> .checkState(Preconditions.java:195)     at
>>>> org.apache.flink.streaming.api.functions.sink.
>>>> TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
>>>> TwoPhaseCommitSinkFunction.java:267)     at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>>>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask
>>>> .notifyCheckpointComplete(StreamTask.java:822)     at
>>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>>>     ... 5 more
>>>
>>>
>>> And these are the checkpoint / savepoint before the job failed.
>>> <checkoint.png>
>>>
>>> It seems that checkpoint # 675's notification handled the savepoint #
>>> 674's pending transaction holder, but savepoint #674's notification didn't
>>> be subsumed or be ignored by JM.
>>> Therefore, during the checkpoint #676, some tasks got notification
>>> before getting the checkpoint barrier and led to this exception happened,
>>> because there was no pending transaction in queue.
>>>
>>> Does anyone know the details about subsumed notifications mechanism and
>>> how checkpoint coordinator handle this situation? Please correct me if I'm
>>> wrong. Thanks.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Stefan Richter <s.rich...@data-artisans.com> 於 2018年10月8日 週一 下午5:03寫道:
>>>
>>>> Hi Pedro,
>>>>
>>>> unfortunately the interesting parts are all removed from the log, we
>>>> already know about the exception itself. In particular, what I would like
>>>> to see is what checkpoints have been triggered and completed before the
>>>> exception happens.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> > Am 08.10.2018 um 10:23 schrieb PedroMrChaves <
>>>> pedro.mr.cha...@gmail.com>:
>>>> >
>>>> > Hello,
>>>> >
>>>> > Find attached the jobmanager.log. I've omitted the log lines from
>>>> other
>>>> > runs, only left the job manager info and the run with the error.
>>>> >
>>>> > jobmanager.log
>>>> > <
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>>>
>>>> >
>>>> >
>>>> >
>>>> > Thanks again for your help.
>>>> >
>>>> > Regards,
>>>> > Pedro.
>>>> >
>>>> >
>>>> >
>>>> > -----
>>>> > Best Regards,
>>>> > Pedro Chaves
>>>> > --
>>>> > Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>>
>>
>>
>

Reply via email to