[jira] [Commented] (FLINK-12031) the registerFactory method of TypeExtractor Should not be private
[ https://issues.apache.org/jira/browse/FLINK-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283883#comment-17283883 ] Ben La Monica commented on FLINK-12031: --- I'm trying to figure out why LocalDate fields aren't being serialized, and noticed this method. As far as I can tell, `registeredTypeInfoFactories` can NEVER have anything in it, so should we just remove it? How exactly does one register TypeInfo factories?

> the registerFactory method of TypeExtractor Should not be private
> --
>
> Key: FLINK-12031
> URL: https://issues.apache.org/jira/browse/FLINK-12031
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Reporter: frank wang
> Priority: Minor
>
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java]
> {code:java}
> /**
>  * Registers a type information factory globally for a certain type. Every following type extraction
>  * operation will use the provided factory for this type. The factory will have highest precedence
>  * for this type. In a hierarchy of types the registered factory has higher precedence than annotations
>  * at the same level but lower precedence than factories defined down the hierarchy.
>  *
>  * @param t type for which a new factory is registered
>  * @param factory type information factory that will produce {@link TypeInformation}
>  */
> private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
> 	Preconditions.checkNotNull(t, "Type parameter must not be null.");
> 	Preconditions.checkNotNull(factory, "Factory parameter must not be null.");
> 	if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
> 		throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
> 	}
> 	if (registeredTypeInfoFactories.containsKey(t)) {
> 		throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
> 	}
> 	registeredTypeInfoFactories.put(t, factory);
> }
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
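For context: since registerFactory is private, the supported way to attach a custom TypeInformation is the @TypeInfo annotation pointing at a TypeInfoFactory subclass, which the TypeExtractor consults during extraction. A minimal sketch of that mechanism; the POJO, its fields, and the factory name here are hypothetical placeholders, not code from the ticket:

```java
import java.lang.reflect.Type;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical event type: the annotation tells the TypeExtractor to use the
// factory below instead of its default extraction logic for this class.
@TypeInfo(MyEventTypeInfoFactory.class)
public class MyEvent {
    public LocalDate tradeDate;
    public double nav;
}

class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
    @Override
    public TypeInformation<MyEvent> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Declare each field explicitly; Types.LOCAL_DATE keeps the LocalDate
        // field out of the generic serialization fallback path.
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("tradeDate", Types.LOCAL_DATE);
        fields.put("nav", Types.DOUBLE);
        return Types.POJO(MyEvent.class, fields);
    }
}
```

This is per-class registration rather than the global registration the private method would offer, which is what the ticket is complaining about.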
[jira] [Comment Edited] (FLINK-19968) Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
[ https://issues.apache.org/jira/browse/FLINK-19968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250656#comment-17250656 ] Ben La Monica edited comment on FLINK-19968 at 12/16/20, 11:38 PM: --- I had this problem as well with a shaded jar. It happens because multiple jars each contain a META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory file. When the shade plugin merges them, the file gets overwritten, and if you are unlucky (like I was) you end up with a blank file or with a pipeline executor factory that doesn't support your deployment type. I added this to my maven-shade plugin configuration to solve the problem: {code:java} META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory {code}

was (Author: mstar_benl): I had this problem as well with a shaded jar. It happens because multiple jars each contain a META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory file. When the shade plugin merges them, the file gets overwritten, and if you are unlucky (like I was) you end up with a blank file or with a pipeline executor factory that doesn't support your deployment type. I added this to my maven-shade plugin configuration to solve the problem: {code:java} META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory {code}

> Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
> --
>
> Key: FLINK-19968
> URL: https://issues.apache.org/jira/browse/FLINK-19968
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.2
> Reporter: PeteZhang
> Priority: Major
> Attachments: iShot2020-11-04 18.06.39.png
>
> In IDEA, I added the flink-clients dependency to the pom, and the job runs normally. After I packaged it with maven and ran it with java -jar, this error was still reported. All versions before 1.11 run normally, and it has been confirmed that the dependency is in the pom.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
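The `{code}` block in the comment above lost its XML markup in the mail archive, so the commenter's exact configuration is unknown. A common way to fix clobbered META-INF/services files in maven-shade is the ServicesResourceTransformer, which concatenates service entries from all jars instead of letting one overwrite another. A hedged sketch of that approach (not necessarily the configuration the commenter used):

```xml
<!-- maven-shade-plugin: merge META-INF/services files instead of overwriting.
     Sketch of one common fix; the original comment's XML was stripped by the
     mail archive, so this may differ from what the commenter actually used. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Concatenates entries from every META-INF/services/* file,
               including org.apache.flink.core.execution.PipelineExecutorFactory -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```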
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905243#comment-16905243 ] Ben La Monica commented on FLINK-12122: --- [~till.rohrmann], I realize that you're busy; is there anything I can do to help on this issue?

> Spread out tasks evenly across all available registered TaskManagers
> --
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Attachments: image-2019-05-21-12-28-29-538.png, image-2019-05-21-13-02-50-251.png
>
> With Flip-6, we changed the default behaviour of how slots are assigned to {{TaskManagers}}. Instead of evenly spreading them out over all registered {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a tendency to first fill up a TM before using another one. This is a regression with respect to the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots across all available {{TaskManagers}} by considering how many of their slots are already allocated.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
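For readers hitting the same problem: the work on this ticket eventually produced a cluster-level scheduling switch. Assuming a Flink version that includes the fix (around 1.9.2/1.10 and later; check your version's configuration docs), the spread-out strategy can be enabled in flink-conf.yaml:

```yaml
# flink-conf.yaml: prefer spreading slots across all registered TaskManagers
# instead of filling one TaskManager before using the next.
cluster.evenly-spread-out-slots: true
```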
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853806#comment-16853806 ] Ben La Monica commented on FLINK-12122: --- I am able to get even distribution as long as everything is at the same parallelism, so that is what I'll do for now, and I will eagerly await your feature :).

> Spread out tasks evenly across all available registered TaskManagers
> --
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.6.4, 1.7.2, 1.8.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png, image-2019-05-21-13-02-50-251.png
>
> With Flip-6, we changed the default behaviour of how slots are assigned to {{TaskManagers}}. Instead of evenly spreading them out over all registered {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a tendency to first fill up a TM before using another one. This is a regression with respect to the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots across all available {{TaskManagers}} by considering how many of their slots are already allocated.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845115#comment-16845115 ] Ben La Monica commented on FLINK-12122: --- It actually does a worse job. Let's stuff everything onto a single task manager! It would if it could!

||TaskManager||Slots Used||
|ip-10-255-58-174:38717|27|
|ip-10-255-58-179:41079|15|
|ip-10-255-58-44:34639|2|

> Spread out tasks evenly across all available registered TaskManagers

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845095#comment-16845095 ] Ben La Monica commented on FLINK-12122: --- I think the thing that is throwing everything off is that I have 3 slot sharing groups set up: one for data enrichment (default), one for calculation, and one for sorting. I added the slot sharing groups because without them a CPU-intensive message coming in would cause all other messages to back up behind it on the same partition instead of processing in parallel, but maybe I can figure out how to prevent that without a slot sharing group. I'll try with everything in the same slot sharing group and parallelism, but force a new chain at the calculation step and the sorting step. (You can see the slot sharing groups break everything apart at the calc, sorter and gatherer operators.) !image-2019-05-21-13-02-50-251.png!

> Spread out tasks evenly across all available registered TaskManagers

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
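The experiment described in that comment (one shared slot group, new chains at the heavy operators) would look roughly like the following in the DataStream API. The operator names and functions are hypothetical stand-ins; the ticket does not show the actual job code:

```java
// Sketch of the approach above: keep all operators in the default slot
// sharing group, but break operator chains before the CPU-heavy steps so a
// slow record does not stall the upstream operators in the same chain.
// EnrichFunction/CalcFunction/SortFunction are placeholders.
DataStream<Event> enriched = input
        .map(new EnrichFunction());       // stays in the default group

DataStream<Result> calculated = enriched
        .map(new CalcFunction())
        .startNewChain();                 // new chain at the calculation step

DataStream<Result> sorted = calculated
        .map(new SortFunction())
        .startNewChain();                 // new chain at the sorting step

// The earlier setup instead isolated the heavy steps in their own groups, e.g.
//     .map(new CalcFunction()).slotSharingGroup("calc")
// which is what was fragmenting slot allocation across the TaskManagers.
```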
[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben La Monica updated FLINK-12122: -- Attachment: image-2019-05-21-13-02-50-251.png

> Spread out tasks evenly across all available registered TaskManagers

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845062#comment-16845062 ] Ben La Monica commented on FLINK-12122: --- I'm running into this exact problem. I have a CoProcessFunction that contains a large amount of state, and the tasks are spread primarily across only 2 of my 6 task managers. This causes memory issues on those boxes while 60GB of RAM on the third box goes unused.

||TaskManager||Num Slots Used for Memory Intensive Tasks||
|ip-10-255-58-174:39389|17|
|ip-10-255-58-174:45161|8|
|ip-10-255-58-179:33657|1|
|ip-10-255-58-179:38439|0|
|ip-10-255-58-44:40181|6|
|ip-10-255-58-44:45435|18|

And then I end up with resource usage in my YARN cluster that looks like this: !image-2019-05-21-12-28-29-538.png! Is there an estimate on when this problem will be fixed? I'm pretty much blocked unless I move to much larger servers, and that is a waste of money :).

> Spread out tasks evenly across all available registered TaskManagers

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben La Monica updated FLINK-12122: -- Attachment: image-2019-05-21-12-28-29-538.png

> Spread out tasks evenly across all available registered TaskManagers

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10682) EOFException occurs during deserialization of Avro class
[ https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ben La Monica closed FLINK-10682. - Resolution: Resolved Fix Version/s: 1.5.5

[~till.rohrmann] that did the trick! It no longer has those exceptions. I think I must have also been running into FLINK-10469. I'll close out this ticket.

> EOFException occurs during deserialization of Avro class
> --
>
> Key: FLINK-10682
> URL: https://issues.apache.org/jira/browse/FLINK-10682
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 1.5.4
> Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
> 3 task managers, 1 job manager running in YARN in Hadoop
> Running on Amazon Linux with OpenJDK 1.8
> Reporter: Ben La Monica
> Priority: Critical
> Fix For: 1.5.5
>
> I'm having trouble (which usually occurs after an hour of processing in a StreamExecutionEnvironment) where I get this failure message. I'm at a loss for what is causing it. I'm running this in AWS on EMR 5.17 with 3 task managers and a job manager running in a YARN cluster, and I've upgraded my flink libraries to 1.5.4 to bypass another serialization issue and the kerberos auth issues.
> The avro classes that are being deserialized were generated with avro 1.8.2.
> {code:java}
> 2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task thread="Calculate Estimated NAV -> Split into single messages (3/10)" Calculate Estimated NAV -> Split into single messages (3/10) (de7d8fa7784903a475391d0168d56f2e) switched from RUNNING to FAILED.
> java.io.EOFException: null
> 	at org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
> 	at org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
> 	at org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
> 	at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> 	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> 	at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> 	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> 	at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> Do you have any ideas on how I could further troubleshoot this issue?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10682) EOFException occurs during deserialization of Avro class
[ https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678306#comment-16678306 ] Ben La Monica commented on FLINK-10682: --- Thank you, I'll give it a try.

> EOFException occurs during deserialization of Avro class

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10682) EOFException occurs during deserialization of Avro class
[ https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678193#comment-16678193 ] Ben La Monica commented on FLINK-10682: --- An update on this: I don't think it's a mismatch in avro classes, because this is running in a YARN cluster that copies all of the code to each node. I've added more logging and decreased the number of task managers so that it's easier to track. I'm still seeing this problem, and it's not always due to Avro. Here is another stack trace:

{code:java}
2018-11-07 12:40:55,495 [INFO ] class=o.a.f.r.e.ExecutionGraph thread="flink-akka.actor.default-dispatcher-5202" Combine Price and Fund (7/9) (f0854988c294e4b256718746aff6bd6c) switched from RUNNING to FAILED.
java.io.IOException: Corrupt stream, found tag: 82
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
{code}

I think it may be due to a very large object being serialized. I will continue to try to track it down.

> EOFException occurs during deserialization of Avro class
[jira] [Created] (FLINK-10682) EOFException occurs during deserialization of Avro class
Ben La Monica created FLINK-10682:
-------------------------------------

             Summary: EOFException occurs during deserialization of Avro class
                 Key: FLINK-10682
                 URL: https://issues.apache.org/jira/browse/FLINK-10682
             Project: Flink
          Issue Type: Bug
          Components: Type Serialization System
    Affects Versions: 1.5.4
         Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
3 task managers, 1 job manager running in YARN in Hadoop
Running on Amazon Linux with OpenJDK 1.8
            Reporter: Ben La Monica


I'm having trouble (which usually occurs after an hour of processing in a StreamExecutionEnvironment) where I get this failure message. I'm at a loss for what is causing it. I'm running this in AWS on EMR 5.17 with 3 task managers and a job manager running in a YARN cluster, and I've upgraded my Flink libraries to 1.5.4 to bypass another serialization issue and the Kerberos auth issues.

The Avro classes that are being deserialized were generated with Avro 1.8.2.

{code:java}
2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task thread="Calculate Estimated NAV -> Split into single messages (3/10)" Calculate Estimated NAV -> Split into single messages (3/10) (de7d8fa7784903a475391d0168d56f2e) switched from RUNNING to FAILED.
java.io.EOFException: null
	at org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
	at org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
	at org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
	at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
{code}

Do you have any ideas on how I could further troubleshoot this issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
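The trace above bottoms out in DataInputDeserializer.readLong, which means the deserializer hit the end of its input buffer partway through a fixed-width value. As a minimal, self-contained illustration of that failure mode using plain JDK streams (the class and method names here are hypothetical, not Flink's actual classes):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class TruncatedReadDemo {
    // Try to read one 8-byte long from the buffer, and report whether the
    // stream ended mid-value -- the same condition DataInputDeserializer.readLong
    // hits when a serialized record was cut short.
    static String readLongFrom(byte[] buf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf))) {
            return "read " + in.readLong();
        } catch (EOFException e) {
            return "EOFException"; // buffer ended before all 8 bytes arrived
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        byte[] full = {0, 0, 0, 0, 0, 0, 0, 42}; // a complete big-endian long
        byte[] truncated = {0, 0, 0, 42};        // only 4 of the 8 bytes
        System.out.println(readLongFrom(full));      // read 42
        System.out.println(readLongFrom(truncated)); // EOFException
    }
}
```

The point of the sketch is only that an EOFException at a read of a primitive almost always means the writer and reader disagreed about how many bytes a record occupies, or the record was truncated in transit.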
[jira] [Closed] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
[ https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben La Monica closed FLINK-10278.
---------------------------------
    Resolution: Fixed

It appears that this was fixed as part of FLINK-10013. Closing.

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
[ https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben La Monica updated FLINK-10278:
----------------------------------
    Fix Version/s: 1.5.3

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
[ https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602387#comment-16602387 ]

Ben La Monica commented on FLINK-10278:
---------------------------------------

It appears to already be fixed in 1.5.3.

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
Ben La Monica created FLINK-10278:
-------------------------------------

             Summary: Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
                 Key: FLINK-10278
                 URL: https://issues.apache.org/jira/browse/FLINK-10278
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.5.2
            Reporter: Ben La Monica


While trying to run Flink in a YARN cluster with more than 1 physical computer in the cluster, the first task manager will start fine, but the second task manager fails to start because it is looking for the Kerberos keytab in the location that is on the *FIRST* taskmanager. See the log lines below (unrelated lines removed for clarity):

{code:java}
2018-09-01 23:00:34,322 INFO class=o.a.f.yarn.YarnTaskExecutorRunner thread=main Current working/local Directory: /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005
2018-09-01 23:00:34,339 INFO class=o.a.f.r.c.BootstrapTools thread=main Setting directories for temporary files to: /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005
2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner thread=main keytab path: /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_000319/krb5.keytab
2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner thread=main YARN daemon is running as: hadoop Yarn client user obtainer: hadoop
2018-09-01 23:00:34,343 ERROR class=o.a.f.yarn.YarnTaskExecutorRunner thread=main YARN TaskManager initialization failed.
org.apache.flink.configuration.IllegalConfigurationException: Kerberos login configuration is invalid; keytab '/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_01/krb5.keytab' does not exist
	at org.apache.flink.runtime.security.SecurityConfiguration.validate(SecurityConfiguration.java:139)
	at org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:90)
	at org.apache.flink.runtime.security.SecurityConfiguration.<init>(SecurityConfiguration.java:71)
	at org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:120)
	at org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:73)
{code}

You'll notice that the log statement says that the keytab should be located in container 000319:

/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#14892c}*000319*{color}/krb5.keytab

But after I changed the code so that it would show the file that it's actually checking when doing the SecurityConfiguration init, it is actually checking container 01, which is not on the host:

/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#d04437}*01*{color}/krb5.keytab

This causes the YARN task managers to restart over and over again (which is why we're up to container 319!)

I'll submit a PR for this fix, though basically it's just moving the initialization of the SecurityConfiguration down 2 lines.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
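The two-line fix described above is an ordering bug: the SecurityConfiguration was constructed, and validated the keytab path, before the configuration had been updated with the container-local path. A minimal sketch of the same ordering problem, using a hypothetical KeytabOrderDemo with made-up config keys and paths rather than Flink's real configuration API:

```java
import java.util.HashMap;
import java.util.Map;

public class KeytabOrderDemo {
    // Hypothetical container-local keytab location written during localization.
    static final String LOCAL = "/local/container_000319/krb5.keytab";

    // Returns the keytab path that validation observes. If localization runs
    // after validation (localizeBeforeValidation == false), validation sees a
    // stale path from another host -- the bug described in the report.
    static String pathSeenByValidation(Map<String, String> conf, boolean localizeBeforeValidation) {
        if (localizeBeforeValidation) {
            conf.put("security.keytab", LOCAL); // fixed order: localize first
        }
        String seen = conf.get("security.keytab"); // what validation would check
        if (!localizeBeforeValidation) {
            conf.put("security.keytab", LOCAL); // buggy order: localized too late
        }
        return seen;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("security.keytab", "/stale/container_01/krb5.keytab"); // path from another host
        // Buggy order: validation reads the path before it is overwritten.
        System.out.println(pathSeenByValidation(new HashMap<>(conf), false)); // /stale/container_01/krb5.keytab
        // Fixed order: the overwrite happens first, validation sees the local copy.
        System.out.println(pathSeenByValidation(new HashMap<>(conf), true));  // /local/container_000319/krb5.keytab
    }
}
```

The actual patch just reorders two statements so that the overwrite happens before the SecurityConfiguration constructor runs; the sketch only shows why the order matters.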
[jira] [Created] (FLINK-10204) Job is marked as FAILED after serialization exception
Ben La Monica created FLINK-10204:
-------------------------------------

             Summary: Job is marked as FAILED after serialization exception
                 Key: FLINK-10204
                 URL: https://issues.apache.org/jira/browse/FLINK-10204
             Project: Flink
          Issue Type: Bug
            Reporter: Ben La Monica


We have a long-running Flink job that eventually fails and is shut down due to an internal serialization exception that we keep on getting. Here is the stack trace:

{code:java}
2018-08-23 18:39:48,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
java.io.IOException: Corrupt stream, found tag: 127
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
{code}

I think I have tracked down the issue to a mismatch in the serialization/deserialization/copy code in the StreamElementSerializer with regards to the LATENCY_MARKER. The serialization logic writes 3 LONGs and an INT. The copy logic only writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an EOFException, and fixing the copy code causes the test to pass again.

I've written a unit test that highlights the problem, and have written the code to correct it. I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
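The serialize/copy mismatch described above (serialize writes 3 LONGs and an INT; copy forwards only a LONG and 2 INTs) can be reproduced in miniature with plain JDK streams. SerializerCopyMismatchDemo and its byte layout are illustrative stand-ins, not Flink's actual StreamElementSerializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializerCopyMismatchDemo {
    // Hypothetical marker encoding: three longs plus an int (28 bytes),
    // mirroring the serialization side described in the report.
    static byte[] serialize(long a, long b, long c, int d) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeLong(a);
            out.writeLong(b);
            out.writeLong(c);
            out.writeInt(d);
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Buggy copy: forwards one long and two ints (16 bytes), so a downstream
    // reader expecting the 28-byte layout runs off the end of the stream.
    static byte[] buggyCopy(byte[] serialized) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeLong(in.readLong());
            out.writeInt(in.readInt());
            out.writeInt(in.readInt());
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Attempt to read back with the layout serialize() actually used.
    static String deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readLong();
            in.readLong();
            in.readLong();
            in.readInt();
            return "ok";
        } catch (EOFException e) {
            return "EOFException"; // copy wrote fewer bytes than serialize
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        byte[] original = serialize(1L, 2L, 3L, 4);
        System.out.println(deserialize(original));            // ok
        System.out.println(deserialize(buggyCopy(original))); // EOFException
    }
}
```

This is the general shape of the bug class: any disagreement between a serializer's write path and its copy path silently corrupts the byte stream, and the failure only surfaces later as an EOFException or a "corrupt stream" tag error at an unrelated read site.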