Hi Jeffrey,
You are right. I understood what you meant after studying the class
org.apache.flink.util.SerializedThrowable.
I prefer fix no. 2 that you mentioned:
CheckpointException should always capture its wrapped exception as a
SerializedThrowable.
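To make fix no. 2 concrete, here is a minimal, self-contained Java sketch that needs no Flink jars. Everything in it is a hypothetical stand-in: JobOnlyException plays the role of KafkaException, SimpleSerializedThrowable is a heavily simplified imitation of org.apache.flink.util.SerializedThrowable (the real class does more than this), WrappingCheckpointException plays CheckpointException with the 'super(new SerializedThrowable(cause))' change applied, and the JM's missing kafka-clients jar is simulated by an ObjectInputStream that refuses to resolve a chosen class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Set;

public class SerializedCauseDemo {

    // Hypothetical stand-in for KafkaException: it exists only in the "job jar".
    static class JobOnlyException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        JobOnlyException(String message) { super(message); }
    }

    // Hypothetical, simplified stand-in for SerializedThrowable: it keeps the
    // original class name, message and stack trace, all as plain JDK types.
    static class SimpleSerializedThrowable extends Exception {
        private static final long serialVersionUID = 1L;
        private final String originalClassName;

        SimpleSerializedThrowable(Throwable original) {
            super(original.toString()); // "ClassName: message"
            this.originalClassName = original.getClass().getName();
            setStackTrace(original.getStackTrace());
        }

        String getOriginalClassName() { return originalClassName; }
    }

    // Hypothetical stand-in for CheckpointException with fix no. 2 applied:
    // the cause is wrapped before the exception ever leaves the TM.
    static class WrappingCheckpointException extends Exception {
        private static final long serialVersionUID = 1L;
        WrappingCheckpointException(String message, Throwable cause) {
            super(message, cause == null ? null : new SimpleSerializedThrowable(cause));
        }
    }

    // Simulates a JM classpath that lacks the job's classes.
    static class RestrictedObjectInputStream extends ObjectInputStream {
        private final Set<String> missing;

        RestrictedObjectInputStream(InputStream in, Set<String> missing) throws IOException {
            super(in);
            this.missing = missing;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (missing.contains(desc.getName())) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    // Serializes t as the TM would, then deserializes it on the restricted
    // "JM" side. Returns null if the JM hits a ClassNotFoundException.
    static Throwable sendToJm(Throwable t, Set<String> missing) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(t);
            }
            try (ObjectInputStream in = new RestrictedObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), missing)) {
                return (Throwable) in.readObject();
            }
        } catch (ClassNotFoundException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Set<String> missing = Collections.singleton(JobOnlyException.class.getName());
        Throwable root = new JobOnlyException("transaction aborted");

        // Current behaviour: the raw cause travels along and the JM cannot load it.
        Throwable plain = sendToJm(new Exception("checkpoint declined", root), missing);
        System.out.println("unwrapped cause received: " + (plain != null));

        // Fix no. 2: only JDK types cross the wire, so the JM can read the details.
        Throwable wrapped = sendToJm(
                new WrappingCheckpointException("checkpoint declined", root), missing);
        System.out.println("wrapped cause: " + wrapped.getCause().getMessage());
    }
}
```

Running main should report that the unwrapped cause is not received (the simulated JM hits a ClassNotFoundException), while the wrapped cause arrives with the original class name and message preserved as plain text. The trade-off is that the JM only ever sees a string-based copy of the cause, not an instance of the original class.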
Looking forward to seeing your PR soon :)
Best,
Terry Wang
> On Sep 23, 2019, at 11:48 AM, Jeffrey Martin <[email protected]> wrote:
>
> Hi Terry,
>
> KafkaException comes in through the job's dependencies (it's defined in the
> kafka-clients jar packed into the fat job jar) and is on neither the TM nor
> the JM default classpath. The job running in the TM includes the job
> dependencies and so can throw a KafkaException, but the JM can't deserialize
> it because it's not available on the default classpath.
>
> I'm suggesting defensively wrapping all causes of a CheckpointException in
> a SerializedThrowable (in addition to the existing defensive wrapping of
> every exception other than a CheckpointException). I believe
> SerializedThrowable exists specifically for this case, i.e. where a job in
> the TM sends the JM an exception that's defined only in the job itself.
>
> It might be clearer if I just put up a PR :) I'd be happy to and it'll be
> very short.
>
> Best,
>
> Jeff
>
> On Sun, Sep 22, 2019 at 7:45 PM Terry Wang <[email protected]> wrote:
>
>> Hi, Jeffrey~
>>
>> Thanks for your detailed explanation; I now understand why the job failed
>> with Flink 1.9.
>>
>> But the two fixes you mentioned may still not work well. KafkaException
>> can be serialized in the TM, which has the necessary jar on its classpath,
>> but not deserialized in the JM, which lacks it, so it may be impossible to
>> check in advance whether the exception can be deserialized.
>> Do I understand correctly?
>>
>>
>>
>> Best,
>> Terry Wang
>>
>>
>>
>>> On Sep 23, 2019, at 5:17 AM, Jeffrey Martin <[email protected]> wrote:
>>>
>>> Thanks for the suggestion, Terry. I've investigated a bit further.
>>>
>>> DeclineCheckpoint specifically checks for the possibility of an exception
>>> that the JM won't be able to deserialize (i.e. anything other than a
>>> CheckpointException). It just doesn't check for the possibility of a
>>> CheckpointException that can't be deserialized because its root cause
>>> can't be deserialized.
>>>
>>> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
>>> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
>>> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
>>> checkpoints to fail sometimes. That caused the KafkaException to be
>>> propagated to the JM as the root cause of a CheckpointException.
>>>
>>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang <[email protected]> wrote:
>>>
>>>> Hi, Jeffrey~
>>>>
>>>> I think the two fixes you mentioned may not work in your case.
>>>> This problem (https://issues.apache.org/jira/browse/FLINK-14076) is, in
>>>> essence, caused by an inconsistent jar environment between the TM and JM,
>>>> or by inconsistent class-loading behavior.
>>>> Maybe the behavior of the standalone cluster's dynamic class loader changed
>>>> in Flink 1.9, since you mentioned that your program ran normally on Flink
>>>> 1.8.
>>>> Just a thought from me.
>>>> Hope this helps~
>>>>
>>>> Best,
>>>> Terry Wang
>>>>
>>>>
>>>>
>>>>> On Sep 21, 2019, at 2:58 AM, Jeffrey Martin <[email protected]> wrote:
>>>>>
>>>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
>>>>>
>>>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
>>>>>
>>>>> If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException,
>>>>> which gets wrapped in a CheckpointException that is sent to the JM as a
>>>>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the
>>>>> JM throws a fairly cryptic ClassNotFoundException. The details of the
>>>>> KafkaException wind up suppressed, so it's impossible to figure out what
>>>>> actually went wrong.
>>>>>
>>>>> I can think of two fixes that would prevent this from occurring in the
>>>>> Kafka connector or other connectors in the future:
>>>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
>>>>> rather than allowing CheckpointExceptions with non-deserializable root
>>>>> causes to slip through
>>>>> 2. CheckpointException should always capture its wrapped exception as a
>>>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
>>>>> rather than 'super(cause)').
>>>>>
>>>>> Thoughts?
>>>>
>>>>
>>
>>