Are you 100% sure that the jar files in the classpath (/lib folder) are exactly the same on all machines? (In a distributed standalone setup, it can easily happen that some files differ.)
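One way to check this is to hash every jar in the lib folder on each machine and diff the results (for example with `sha256sum lib/*.jar`). Below is a minimal sketch of the same idea that uses only the Java standard library, so it should also run on the Java 8 JVM shown in the traces. The class name `JarChecksums` and the default `lib` path are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Prints "<sha256>  <file name>" for every *.jar in a directory, sorted by name,
// so that the output from different machines can be compared with diff.
final class JarChecksums {

    /** Maps each *.jar file name directly inside dir to its hex SHA-256. */
    static Map<String, String> checksums(Path dir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> result = new TreeMap<>(); // sorted, so diffs are stable
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                result.put(jar.getFileName().toString(), sha256(jar));
            }
        }
        return result;
    }

    /** Streams the file through SHA-256 and returns the digest as lowercase hex. */
    static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                digest.update(buffer, 0, n);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical default; pass the real Flink lib directory as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : "lib");
        for (Map.Entry<String, String> e : checksums(dir).entrySet()) {
            System.out.println(e.getValue() + "  " + e.getKey());
        }
    }
}
```

Compile it with `javac JarChecksums.java`, run `java JarChecksums /path/to/flink/lib` on every machine with the output redirected to one file per host, and diff the files; any line that differs points at a jar that is not identical everywhere.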
On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <hinobl...@gmail.com> wrote:
> Flink 1.12.0; aligned checkpoints only; standalone cluster.
>
> On Fri, Feb 5, 2021 at 6:52 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Are you using unaligned checkpoints? (There's a known bug in 1.12.0 which
>> can lead to corrupted data when using UC.)
>> Can you tell us a little bit about your environment? (How are you
>> deploying Flink, which state backend are you using, what kind of job is
>> it (I guess DataStream API)?)
>>
>> Somehow the process receiving the data is unable to deserialize it, most
>> likely because they are configured differently (different classpath,
>> dependency versions, etc.)
>>
>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>
>>> I no longer think this is a code-related problem. Could it be a bug?
>>>
>>> On Fri, Feb 5, 2021 at 4:30 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>
>>>> Hi all, I found that the failure always occurs in the second task, the
>>>> one right after the source task. So in the first chained task I now
>>>> convert the Map-based class object into an ordinary class object, and
>>>> the problem disappeared.
>>>>
>>>> With this change in place, I also tried stopping the job with a
>>>> savepoint and restoring it from that savepoint (all successful).
>>>>
>>>> However, I ran into another problem. It also occurs when I stop the
>>>> job, and it also occurs in the second task after the source task.
>>>> The log is below:
>>>>
>>>> 2021-02-05 16:21:26
>>>> java.io.EOFException
>>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> This is also a serialization/deserialization failure, but this time it
>>>> does not involve Kryo.
>>>>
>>>>
>>>> On Wed, Feb 3, 2021 at 9:22 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> From these snippets it is hard to tell what's going wrong. Could you
>>>>> maybe give us a minimal example with which to reproduce the problem?
>>>>> Alternatively, have you read through Flink's serializer documentation [1]?
>>>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>>>
>>>>> The stack trace looks as if the job fails deserializing some key of
>>>>> your MapRecord map.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>
>>>>>> Some facts may be related to this, since another job does not run
>>>>>> into these exceptions.
>>>>>> The failing job uses a class that contains a field of type
>>>>>> MapRecord, and MapRecord extends HashMap so that it can hold
>>>>>> variable JSON data.
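In the Kryo traces in this thread, the MapRecord field is deserialized by `KryoSerializer` nested inside `PojoSerializer`, which suggests Flink treats it as a generic type and hands it to Kryo. Till's suggestion of a simple POJO, combined with the workaround described above (converting the Map-based object into an ordinary class in the first chained operator), could look roughly like the sketch below. The class `UserAccessEvent`, its fields and the keys `user_id` and `page` are hypothetical; the shape follows Flink's documented POJO rules (public class, public no-argument constructor, public fields or getters/setters), so for `long` and `String` fields the type extractor should not need a Kryo fallback.

```java
import java.util.Map;

// Hypothetical flat replacement for the HashMap-based MapRecord field.
final class FlattenMapRecord {

    /**
     * Public static nested class, public no-arg constructor and public fields:
     * the shape Flink's POJO rules ask for. Field names are made up.
     */
    public static class UserAccessEvent {
        public long timestamp;
        public String userId;
        public String page;

        public UserAccessEvent() {}
    }

    /** Copies the known keys out of the loosely typed map; all other keys are dropped. */
    static UserAccessEvent fromMap(Map<Object, Object> raw) {
        UserAccessEvent event = new UserAccessEvent();
        // Roughly the same fallback as MapRecord.getTimestamp():
        // "timestamp", then "server_time", then 0 if missing or not a number.
        Object ts = raw.get("timestamp");
        if (ts == null) {
            ts = raw.get("server_time");
        }
        event.timestamp = ts instanceof Number ? ((Number) ts).longValue() : 0L;
        event.userId = asString(raw.get("user_id"));
        event.page = asString(raw.get("page"));
        return event;
    }

    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }
}
```

In the job, `fromMap` would be called from a `map` function in the first chained operator, so that only the flat type crosses the network to the second task.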
>>>>>>
>>>>>> Class MapRecord:
>>>>>>
>>>>>> @NoArgsConstructor
>>>>>> @Slf4j
>>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>>     @Override
>>>>>>     public void setTimestamp(Long timestamp) {
>>>>>>         put("timestamp", timestamp);
>>>>>>         put("server_time", timestamp);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Long getTimestamp() {
>>>>>>         try {
>>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>>         } catch (Exception e) {
>>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>>             return 0L;
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Class UserAccessLog:
>>>>>>
>>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>>     private MapRecord d; // I think this is related to the problem...
>>>>>>
>>>>>>     ... ...
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>
>>>>>>> Actually, the exception is different every time I stop the job,
>>>>>>> for example:
>>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>> The stack trace is the one I gave above.
>>>>>>>
>>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>> 2021-02-03 18:37:24
>>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> ...
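That the exception changes from run to run fits Robert's point that the receiver may be reading the bytes with a different layout than the sender wrote them. The plain `java.io` sketch below is only an analogy: it does not use Flink's or Kryo's serializers, and the class and method names are made up. The same bytes read with a matching layout, with a swapped field order, and with one extra field give a correct value, silently wrong values, and a `java.io.EOFException` respectively. The last case resembles the `EOFException` in the 16:21:26 trace, where `StringValue.readString` ran out of bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Stand-alone analogy (plain java.io, not Flink's serializers): a record written
 * with one field layout and read back with different layouts.
 */
final class LayoutMismatchDemo {

    /** Writer side: (int count, String name). */
    static byte[] write(int count, String name) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(count);
            out.writeUTF(name); // 2-byte length prefix followed by the characters
        }
        return bytes.toByteArray();
    }

    /** Reader that agrees with the writer: returns the original values. */
    static String readMatching(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        String name = in.readUTF();
        return count + "/" + name;
    }

    /** Reader that expects (String, int): no exception, just wrong values. */
    static String readSwapped(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String name = in.readUTF(); // uses the high 2 bytes of the int as a string length
        int count = in.readInt();   // consumes the rest of the int plus the string length
        return count + "/" + name;
    }

    /** Reader that expects one more String field: runs off the end of the record. */
    static String readExtraField(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        String name = in.readUTF();
        String extra = in.readUTF(); // nothing left to read -> java.io.EOFException
        return count + "/" + name + "/" + extra;
    }
}
```

With `write(7, "abc")`, the three readers return `7/abc`, return `458755/`, and throw `java.io.EOFException`. Which symptom appears depends on the actual byte values, which may be why each stop surfaced a different exception.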
>>>>>>>
>>>>>>> On Wed, Feb 3, 2021 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> could you show us the job you are trying to resume? Is it a SQL job
>>>>>>>> or a DataStream job, for example?
>>>>>>>>
>>>>>>>> From the stack trace, it looks as if the class g^XT is not on the
>>>>>>>> class path.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I have a job whose checkpoints and savepoints both work fine.
>>>>>>>>> However, if I stop the job using 'stop -p', the job fails after the
>>>>>>>>> savepoint has been generated. Here is the log:
>>>>>>>>>
>>>>>>>>> 2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task [] - ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>>
>>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>     ... 22 more
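The name `g^XT` does not look like a real class name, so it seems at least as likely that the reader was out of step with the writer as that a class is missing from the classpath. The trace shows Kryo's `DefaultClassResolver.readName` reading a name from the stream and resolving it through `Class.forName`. The sketch below mimics that pattern with plain `java.io`; it is not Kryo's real wire format, and the class and method names are made up. A record stores a user ID followed by a class name; a reader that is one field early treats the user ID as the class name and fails with `ClassNotFoundException`, just like a corrupted or misaligned stream would.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Analogy only: a class name stored in the byte stream and resolved on read.
// This is NOT Kryo's actual wire format.
final class ClassTagMismatchDemo {

    /** Writer layout: (String userId, String class name of the payload type). */
    static byte[] write(String userId, Class<?> payloadType) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(userId);
            out.writeUTF(payloadType.getName());
        }
        return bytes.toByteArray();
    }

    /** Reader in step with the writer: skips the user ID, then resolves the class name. */
    static Class<?> readInStep(byte[] data) throws IOException, ClassNotFoundException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.readUTF(); // userId, not needed here
        return resolve(in.readUTF());
    }

    /** Reader one field out of step: treats the user ID as if it were the class name. */
    static Class<?> readOutOfStep(byte[] data) throws IOException, ClassNotFoundException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        return resolve(in.readUTF());
    }

    /** Looks the name up without initializing the class (assumed to resemble Kryo's lookup). */
    private static Class<?> resolve(String name) throws ClassNotFoundException {
        return Class.forName(name, false, ClassTagMismatchDemo.class.getClassLoader());
    }
}
```

For a record written as `write("u42", java.util.HashMap.class)`, `readInStep` returns `java.util.HashMap`, while `readOutOfStep` tries to load a class called `u42` and throws `ClassNotFoundException`.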