Yeah, and if the jars were different, why would my job run normally? The problem only occurs when I stop it.
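
To rule out a classpath difference anyway, the /lib folders can be compared by checksum. The following is only a minimal sketch: the class name `LibChecksums` and the default `lib` path are assumptions made for illustration, not anything shipped with Flink. It prints one SHA-256 digest per jar, sorted by file name, so the output from each machine can be diffed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: lists SHA-256 digests of the jars in one folder.
public class LibChecksums {

    /** Returns file name -> SHA-256 hex digest for every *.jar directly inside libDir. */
    public static Map<String, String> checksums(Path libDir)
            throws IOException, NoSuchAlgorithmException {
        Map<String, String> result = new TreeMap<>(); // sorted by name, so outputs diff cleanly
        try (Stream<Path> files = Files.list(libDir)) {
            for (Path p : (Iterable<Path>) files::iterator) {
                String name = p.getFileName().toString();
                if (Files.isRegularFile(p) && name.endsWith(".jar")) {
                    result.put(name, sha256(p));
                }
            }
        }
        return result;
    }

    /** Streams the file through a SHA-256 digest and returns it as lowercase hex. */
    private static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] buf = new byte[8192];
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the folder is passed as the first argument, defaulting to ./lib
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        checksums(libDir).forEach((name, hash) -> System.out.println(hash + "  " + name));
    }
}
```

Running it on every machine against the Flink lib folder and diffing the outputs would show whether any jar differs. It only looks at the top level of the folder, which matches the flat layout of /lib.
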

Robert Metzger <rmetz...@apache.org> wrote on Fri, Feb 5, 2021 at 7:08 PM:

> Are you 100% sure that the jar files in the classpath (/lib folder) are
> exactly the same on all machines? (It can happen quite easily in a
> distributed standalone setup that some files are different)
>
> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦 <hinobl...@gmail.com> wrote:
>
>> Flink 1.12.0; only using aligned checkpoints; standalone cluster.
>>
>> Robert Metzger <rmetz...@apache.org> wrote on Fri, Feb 5, 2021 at 6:52 PM:
>>
>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>> which can lead to corrupted data when using UC)
>>> Can you tell us a little bit about your environment? (How are you
>>> deploying Flink, which state backend are you using, what kind of job (I
>>> guess DataStream API))
>>>
>>> Somehow the process receiving the data is unable to deserialize it, most
>>> likely because they are configured differently (different classpath,
>>> dependency versions etc.)
>>>
>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>
>>>> I do not think this is a code-related problem anymore; maybe it is
>>>> a bug?
>>>>
>>>> 赵一旦 <hinobl...@gmail.com> wrote on Fri, Feb 5, 2021 at 4:30 PM:
>>>>
>>>>> Hi all, I found that the failure always occurred in the second task,
>>>>> after the source task. So I changed the first chained task to
>>>>> transform the 'Map'-based class object into another, normal class
>>>>> object, and the problem disappeared.
>>>>>
>>>>> With the new solution, I also tried stopping the job and restoring it
>>>>> from a savepoint (all successful).
>>>>>
>>>>> But I also hit another problem. It also occurs when I stop the job,
>>>>> and also in the second task after the source task.
>>>>> The log is below:
>>>>> 2021-02-05 16:21:26
>>>>> java.io.EOFException
>>>>>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:783)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> It is also about serialization and deserialization, but not related
>>>>> to Kryo this time.
>>>>>
>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 9:22 PM:
>>>>>
>>>>>> From these snippets it is hard to tell what's going wrong. Could you
>>>>>> maybe give us a minimal example with which to reproduce the problem?
>>>>>> Alternatively, have you read through Flink's serializer documentation
>>>>>> [1]?
>>>>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>>>>
>>>>>> The stack trace looks as if the job fails deserializing some key of
>>>>>> your MapRecord map.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>
>>>>>>> Some facts are possibly related to this, since another job does not
>>>>>>> hit these exceptions.
>>>>>>> The problem job uses a class which contains a field of class
>>>>>>> MapRecord, and MapRecord is defined to extend HashMap so as to
>>>>>>> accept variable JSON data.
>>>>>>>
>>>>>>> Class MapRecord:
>>>>>>>
>>>>>>> @NoArgsConstructor
>>>>>>> @Slf4j
>>>>>>> public class MapRecord extends HashMap<Object, Object> implements Serializable {
>>>>>>>     @Override
>>>>>>>     public void setTimestamp(Long timestamp) {
>>>>>>>         put("timestamp", timestamp);
>>>>>>>         put("server_time", timestamp);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Long getTimestamp() {
>>>>>>>         try {
>>>>>>>             Object ts = getOrDefault("timestamp", getOrDefault("server_time", 0L));
>>>>>>>             return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>>>>>>>         } catch (Exception e) {
>>>>>>>             log.error("Error, MapRecord's timestamp invalid.", e);
>>>>>>>             return 0L;
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Class UserAccessLog:
>>>>>>>
>>>>>>> public class UserAccessLog extends AbstractRecord<UserAccessLog> {
>>>>>>>     private MapRecord d; // I think this is related to the problem...
>>>>>>>
>>>>>>>     ... ...
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> 赵一旦 <hinobl...@gmail.com> wrote on Wed, Feb 3, 2021 at 6:43 PM:
>>>>>>>
>>>>>>>> Actually the exception is different every time I stop the job.
>>>>>>>> Such as:
>>>>>>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>> The stack trace is the one I gave above.
>>>>>>>>
>>>>>>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>> 2021-02-03 18:37:24
>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>>>>>>>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>>>>>>>     at java.util.ArrayList.get(ArrayList.java:433)
>>>>>>>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>
>>>>>>>> (3) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 96
>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Feb 3, 2021 at 6:28 PM:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> could you show us the job you are trying to resume? Is it a SQL
>>>>>>>>> job or a DataStream job, for example?
>>>>>>>>>
>>>>>>>>> From the stack trace, it looks as if the class g^XT is not on the
>>>>>>>>> class path.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Wed, Feb 3, 2021 at 10:30 AM 赵一旦 <hinobl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a job whose checkpoints and savepoints all work.
>>>>>>>>>> But if I stop the job using 'stop -p', then after the savepoint
>>>>>>>>>> is generated, the job fails. Here is the log:
>>>>>>>>>>
>>>>>>>>>> 2021-02-03 16:53:55,179 WARN org.apache.flink.runtime.taskmanager.Task [] -
>>>>>>>>>> ual_ft_uid_subid_SidIncludeFilter -> ual_ft_uid_subid_Default PassThroughFilter[null, null) ->
>>>>>>>>>> ual_ft_uid_subid_UalUidFtExtractor -> ual_ft_uid_subid_EmptyUidFilter
>>>>>>>>>> (17/30)#0 (46abce5d1148b56094726d442df2fd9c) switched from RUNNING to FAILED.
>>>>>>>>>>
>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: g^XT
>>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_251]
>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_251]
>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_251]
>>>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
>>>>>>>>>>     at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
>>>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>>>>>>>>>>     ... 22 more
>>>>>>>>>>
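
For readers of the thread: the workaround mentioned in the Feb 5 message (turning the Map-based object into a normal class inside the first chained task) and Till's suggestion to use a simple POJO could look roughly like the sketch below. It is only an illustration under stated assumptions: `FlatAccessLog`, its `uid` field, and `fromMap` are hypothetical names that do not appear in the thread, and only the `timestamp` / `server_time` keys are taken from the posted `MapRecord`. The class follows Flink's documented POJO rules (public class, public no-argument constructor, fields reachable through getters and setters), so its fields can be handled by Flink's PojoSerializer instead of falling back to Kryo.

```java
import java.util.Map;

// Hypothetical flat record; the field set is an assumption made for illustration.
public class FlatAccessLog {
    private String uid;     // assumed field, not taken from the thread
    private long timestamp; // mirrors MapRecord's "timestamp" / "server_time" keys

    // Flink's POJO rules require a public no-argument constructor.
    public FlatAccessLog() {}

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    /** Copies the known keys out of a Map-based record into plain fields. */
    public static FlatAccessLog fromMap(Map<Object, Object> raw) {
        FlatAccessLog out = new FlatAccessLog();

        Object uid = raw.get("uid");
        out.setUid(uid == null ? null : uid.toString());

        // Same lookup order as MapRecord.getTimestamp(): "timestamp" first,
        // then "server_time", then 0 when the value is missing or not a number.
        Object ts = raw.containsKey("timestamp") ? raw.get("timestamp") : raw.get("server_time");
        out.setTimestamp(ts instanceof Number ? ((Number) ts).longValue() : 0L);
        return out;
    }
}
```

In a job, the conversion would sit in the first chained operator, for example `stream.map(FlatAccessLog::fromMap)`, so that only the flat type crosses the network to the second task. As the thread notes, this made the Kryo errors disappear but an EOFException on stop was still reported afterwards, so it should not be read as an explanation of the underlying failure.
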