Are you 100% sure that the jar files on the classpath (the /lib folder) are
exactly the same on all machines? (In a distributed standalone setup, it can
easily happen that some files differ.)
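One quick way to check this is to compare checksums of every jar in the lib folder across machines. The following is a minimal sketch in plain Java with no Flink dependency. The class name `JarChecksums` and the default `lib` path are assumptions for illustration. Run it on each machine and diff the outputs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class JarChecksums {

    /** Returns file name -> hex SHA-256 for every *.jar directly inside dir, sorted by name. */
    public static Map<String, String> checksums(Path dir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> result = new TreeMap<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                result.put(jar.getFileName().toString(), sha256(jar));
            }
        }
        return result;
    }

    /** Streams the file through a SHA-256 digest and returns it as lowercase hex. */
    static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                digest.update(buffer, 0, n);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Flink lib folder is passed as the first argument (default: ./lib).
        Path lib = Paths.get(args.length > 0 ? args[0] : "lib");
        checksums(lib).forEach((name, hash) -> System.out.println(hash + "  " + name));
    }
}
```

Because the output is sorted by file name, a plain text diff between two machines shows both missing jars and jars whose contents differ.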


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <hinobl...@gmail.com> wrote:

> Flink 1.12.0; aligned checkpoints only; standalone cluster.
>
>
>
> Robert Metzger <rmetz...@apache.org> wrote on Fri, Feb 5, 2021 at 6:52 PM:
>
>> Are you using unaligned checkpoints? (There's a known bug in 1.12.0 that
>> can lead to corrupted data when using UC.)
>> Can you tell us a little bit about your environment? How are you
>> deploying Flink, which state backend are you using, and what kind of job
>> is it (I guess DataStream API)?
>>
>> Somehow the process receiving the data is unable to deserialize it, most
>> likely because the sending and receiving processes are configured
>> differently (different classpath, dependency versions, etc.).
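To illustrate how a configuration difference can turn into deserialization errors: if the writer and the reader of a byte stream disagree on the record layout, the reader either consumes bytes that belong to the next record or runs past the end of the buffer. This is a simplified, hypothetical sketch using plain `DataOutputStream`/`DataInputStream`, not Flink's actual serializers; `writeV1` and `readV2` stand in for two differently configured serializer versions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SerializerMismatchDemo {

    /** Writer side: serializes a record as (name, count). */
    public static byte[] writeV1(String name, int count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);  // 2-byte length prefix + modified UTF-8 bytes
            out.writeInt(count); // 4 bytes
        }
        return bytes.toByteArray();
    }

    /** Reader side with a different idea of the layout: (name, count, timestamp). */
    public static String readV2(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF();
            int count = in.readInt();
            long timestamp = in.readLong(); // these 8 bytes were never written
            return name + "/" + count + "/" + timestamp;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeV1("uid", 42);
        try {
            System.out.println(readV2(data));
        } catch (EOFException e) {
            System.out.println("EOFException: reader expects more bytes than the writer produced");
        }
    }
}
```

With a single record the mismatch surfaces as an `EOFException`. In a stream of many records, a misaligned reader would more likely read bytes from the following records as garbage first, so the resulting error can vary from run to run.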
>>
>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>
>>> I no longer think this is a code-related problem. Maybe it is a
>>> bug?
>>>
>>> 赵一旦 <hinobl...@gmail.com> wrote on Fri, Feb 5, 2021 at 4:30 PM:
>>>
>>>> Hi all, I found that the failure always occurs in the second task,
>>>> right after the source task. So I changed the first chained task to
>>>> convert the Map-based class object into an ordinary class object, and
>>>> the problem disappeared.
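A minimal sketch of such a conversion, assuming the record only needs a `uid` and a timestamp (the class name `UserAccessEvent` and both field names are hypothetical). The class follows Flink's POJO conventions (public class, public no-argument constructor, public fields), so Flink should be able to use its POJO serializer for it instead of falling back to Kryo as it does for a `HashMap` subclass. The timestamp fallback mirrors `MapRecord.getTimestamp()` quoted later in this thread.

```java
import java.util.Map;

/** Hypothetical flat record with a fixed schema, replacing the Map-based one. */
public class UserAccessEvent {
    public String uid;
    public long timestamp;

    /** Public no-argument constructor, required for Flink POJOs. */
    public UserAccessEvent() {}

    /** Converts a loosely typed map (e.g. parsed JSON) into the fixed-schema record. */
    public static UserAccessEvent fromMap(Map<Object, Object> source) {
        UserAccessEvent event = new UserAccessEvent();
        Object uid = source.get("uid");
        event.uid = uid == null ? null : uid.toString();
        // Same fallback order as MapRecord.getTimestamp(): "timestamp", then "server_time", then 0.
        Object ts = source.getOrDefault("timestamp", source.getOrDefault("server_time", 0L));
        // Non-numeric or null values become 0, like the catch branch in MapRecord.
        event.timestamp = ts instanceof Number ? ((Number) ts).longValue() : 0L;
        return event;
    }
}
```

Keys that are not modeled as fields are dropped in this sketch; whether that is acceptable depends on what the downstream operators need.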
>>>>
>>>> With this change, I also tried stopping the job with a savepoint and
>>>> restoring from it (both succeeded).
>>>>
>>>> However, I ran into another problem. It also occurs when I stop the
>>>> job, and also in the second task after the source task. The log is
>>>> below:
>>>> 2021-02-05 16:21:26
>>>> java.io.EOFException
>>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> It is also a serialization/deserialization problem, but this time it is
>>>> not related to Kryo.
>>>>
>>>>
>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 9:22 PM:
>>>>
>>>>> From these snippets it is hard to tell what's going wrong. Could you
>>>>> maybe give us a minimal example with which to reproduce the problem?
>>>>> Alternatively, have you read through Flink's serializer documentation [1]?
>>>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>>>
>>>>> The stack trace looks as if the job fails while deserializing some key
>>>>> of your MapRecord map.
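Flink's serializer documentation [1] lists the rules a class must satisfy to be handled as a POJO. The reflective check below is a simplified approximation of those rules, written for illustration only (`PojoRuleCheck` and its nested `Good` class are hypothetical helpers, not part of Flink); Flink's own type extraction applies further checks, for example on the supported field types and getter return types. Applied to `java.util.HashMap`, it reports the non-transient private fields `threshold` and `loadFactor`, which hints at why a `HashMap` subclass such as `MapRecord` ends up being serialized as a generic type via Kryo, as seen in the stack traces in this thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    /** Returns the rule violations found; an empty list means the class looks POJO-compatible. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        if (!hasConstructor(clazz)) {
            problems.add("no public no-argument constructor");
        }
        // Walk the whole hierarchy: inherited fields count too (e.g. those of HashMap).
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue; // static and transient fields are ignored, public fields are fine
                }
                String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                boolean hasGetter = hasMethod(clazz, "get" + suffix) || hasMethod(clazz, "is" + suffix);
                boolean hasSetter = hasMethod(clazz, "set" + suffix, f.getType());
                if (!hasGetter || !hasSetter) {
                    problems.add(c.getSimpleName() + "." + f.getName()
                            + " is neither public nor has a getter and setter");
                }
            }
        }
        return problems;
    }

    private static boolean hasConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(); // public constructor without arguments
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    private static boolean hasMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // public methods, including inherited ones
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Hypothetical example class that satisfies the rules. */
    public static class Good {
        public String uid;
        private long ts;

        public Good() {}

        public long getTs() { return ts; }

        public void setTs(long ts) { this.ts = ts; }
    }

    public static void main(String[] args) {
        System.out.println("Good:    " + violations(Good.class));
        System.out.println("HashMap: " + violations(java.util.HashMap.class));
    }
}
```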
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>
>>>>>> Some facts may be related to this, since another job does not run into
>>>>>> these problems.
>>>>>> The problematic job uses a class that contains a field of type
>>>>>> MapRecord, and MapRecord extends HashMap so that it can hold JSON data
>>>>>> with a variable structure.
>>>>>>
>>>>>> Class MapRecord:
>>>>>>
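>>>>>> import java.io.Serializable;
>>>>>> import java.util.HashMap;
>>>>>> import java.util.Optional;
>>>>>>
>>>>>> import lombok.NoArgsConstructor;
>>>>>> import lombok.extern.slf4j.Slf4j;
>>>>>>
>>>>>> @NoArgsConstructor
>>>>>> @Slf4j
>>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>>     // The original snippet marked these methods @Override; that only compiles
>>>>>>     // if an interface not shown here declares them (HashMap does not).
>>>>>>     public void setTimestamp(Long timestamp) {
>>>>>>         put("timestamp", timestamp);
>>>>>>         put("server_time", timestamp);
>>>>>>     }
>>>>>>
>>>>>>     public Long getTimestamp() {
>>>>>>         try {
>>>>>>             // Prefer "timestamp", fall back to "server_time", then to 0.
>>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>>         } catch (Exception e) {
>>>>>>             // e.g. a ClassCastException when the stored value is not a Number
>>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>>             return 0L;
>>>>>>         }
>>>>>>     }
>>>>>> }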
>>>>>>
>>>>>> Class UserAccessLog:
>>>>>>
>>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>>     private MapRecord d;  // I think this is related to the problem...
>>>>>>
>>>>>>     ... ...
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> 赵一旦 <hinobl...@gmail.com> wrote on Wed, Feb 3, 2021 at 6:43 PM:
>>>>>>
>>>>>>> Actually, the exception is different every time I stop the job.
>>>>>>> For example:
>>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class:
>>>>>>> g^XT
>>>>>>> The stack trace is the one I gave before.
>>>>>>>
>>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>> 2021-02-03 18:37:24
>>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 6:28 PM:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> could you show us the job you are trying to resume? Is it a SQL job
>>>>>>>> or a DataStream job, for example?
>>>>>>>>
>>>>>>>> From the stack trace, it looks as if the class g^XT is not on the
>>>>>>>> class path.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I have a job whose checkpoints and savepoints work fine.
>>>>>>>>> But if I stop the job using 'stop -p', the job fails after the
>>>>>>>>> savepoint has been generated. Here is the log:
>>>>>>>>>
>>>>>>>>> 2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>>
>>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>>         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>>         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>>         at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>         ... 22 more
>>>>>>>>>
>>>>>>>>>
