[jira] [Commented] (FLINK-12031) the registerFactory method of TypeExtractor Should not be private

2021-02-12 Thread Ben La Monica (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283883#comment-17283883
 ] 

Ben La Monica commented on FLINK-12031:
---

I'm trying to figure out why LocalDates aren't being serialized, and noticed 
this method. As far as I can tell, `registeredTypeInfoFactories` can NEVER 
have anything in it, so should we just remove it? How exactly does one 
register TypeInfo factories?
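
For reference, factories are picked up via the {{@TypeInfo}} annotation on the type itself rather than through this private method. A rough sketch with made-up class names (only the annotation and factory API come from Flink):

{code:java}
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical POJO; the @TypeInfo annotation is what wires the factory
// into type extraction.
@TypeInfo(MyEvent.MyEventTypeInfoFactory.class)
public class MyEvent {
    public java.time.LocalDate tradeDate;

    public static class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
        @Override
        public TypeInformation<MyEvent> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Return whatever TypeInformation fits the class; POJO info is
            // just one possibility.
            return Types.POJO(MyEvent.class);
        }
    }
}
{code}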

> the registerFactory method of TypeExtractor  Should not be private
> --
>
> Key: FLINK-12031
> URL: https://issues.apache.org/jira/browse/FLINK-12031
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: frank wang
>Priority: Minor
>
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java]
> {code:java}
> /**
>  * Registers a type information factory globally for a certain type. Every following type
>  * extraction operation will use the provided factory for this type. The factory will have
>  * highest precedence for this type. In a hierarchy of types the registered factory has
>  * higher precedence than annotations at the same level but lower precedence than factories
>  * defined down the hierarchy.
>  *
>  * @param t type for which a new factory is registered
>  * @param factory type information factory that will produce {@link TypeInformation}
>  */
> private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
>     Preconditions.checkNotNull(t, "Type parameter must not be null.");
>     Preconditions.checkNotNull(factory, "Factory parameter must not be null.");
>     if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
>         throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
>     }
>     if (registeredTypeInfoFactories.containsKey(t)) {
>         throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
>     }
>     registeredTypeInfoFactories.put(t, factory);
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-19968) Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2020-12-16 Thread Ben La Monica (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250656#comment-17250656
 ] 

Ben La Monica edited comment on FLINK-19968 at 12/16/20, 11:38 PM:
---

I had this problem as well with a shaded jar. It happens because multiple jars 
contain a META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory 
file. When the shaded jar is built, that file gets overwritten, and if you are 
unlucky (like I was) it ends up blank or containing only a pipeline 
executor factory that doesn't support your deployment type.

I added this to my maven-shade plugin to solve the problem:
{code:xml}
<!-- XML reconstructed; the archive stripped the tags and left only the
     resource path. Most likely an AppendingTransformer along these lines: -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory</resource>
</transformer>
{code}
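
The mail archive stripped the XML from the {code} block above. For reference, a minimal maven-shade-plugin configuration that merges all META-INF/services files (instead of letting the last jar win) might look like this; the ServicesResourceTransformer shown here is the general-purpose transformer for service files, whereas an AppendingTransformer pinned to the PipelineExecutorFactory resource would also work:

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Concatenates META-INF/services files from all input jars -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}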


was (Author: mstar_benl):
I had this problem as well with a shaded jar. It happens because multiple jars 
contain a META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory 
file. When the shaded jar is built, that file gets overwritten, and if you are 
unlucky (like I was) it ends up blank or containing only a pipeline 
executor factory that doesn't support your deployment type.

I added this to my maven-shade plugin to solve the problem:
{code:xml}
<!-- XML reconstructed; the archive stripped the tags and left only the
     resource path. Most likely an AppendingTransformer along these lines: -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory</resource>
</transformer>
{code}

> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
> 
>
> Key: FLINK-19968
> URL: https://issues.apache.org/jira/browse/FLINK-19968
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.2
>Reporter: PeteZhang
>Priority: Major
> Attachments: iShot2020-11-04 18.06.39.png
>
>
> In IDEA, I added the flink-clients dependency to the pom, and the job runs 
> normally. After packaging it with Maven and running it with java -jar, this 
> error is still reported. All versions before 1.11 run normally, and I have 
> confirmed that this dependency is present in the pom.





[jira] [Commented] (FLINK-19968) Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2020-12-16 Thread Ben La Monica (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250656#comment-17250656
 ] 

Ben La Monica commented on FLINK-19968:
---

I had this problem as well with a shaded jar. It happens because multiple jars 
contain a META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory 
file. When the shaded jar is built, that file gets overwritten, and if you are 
unlucky (like I was) it ends up blank or containing only a pipeline 
executor factory that doesn't support your deployment type.

I added this to my maven-shade plugin to solve the problem:
{code:xml}
<!-- XML reconstructed; the archive stripped the tags and left only the
     resource path. Most likely an AppendingTransformer along these lines: -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory</resource>
</transformer>
{code}

> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
> 
>
> Key: FLINK-19968
> URL: https://issues.apache.org/jira/browse/FLINK-19968
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.2
>Reporter: PeteZhang
>Priority: Major
> Attachments: iShot2020-11-04 18.06.39.png
>
>
> In IDEA, I added the flink-clients dependency to the pom, and the job runs 
> normally. After packaging it with Maven and running it with java -jar, this 
> error is still reported. All versions before 1.11 run normally, and I have 
> confirmed that this dependency is present in the pom.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-08-12 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905243#comment-16905243
 ] 

Ben La Monica commented on FLINK-12122:
---

[~till.rohrmann], I realize that you're busy, is there anything I can do to 
help on this issue?

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-06-01 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853806#comment-16853806
 ] 

Ben La Monica commented on FLINK-12122:
---

I am able to get even distribution as long as everything is at the same 
parallelism, so that is what I'll do for now and will eagerly await your 
feature :).

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-05-21 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845115#comment-16845115
 ] 

Ben La Monica commented on FLINK-12122:
---

It actually does a worse job: it tries to stuff everything onto a single task 
manager, and it would if it could!
||TaskManager||Slots Used||
|ip-10-255-58-174:38717|27|
|ip-10-255-58-179:41079|15|
|ip-10-255-58-44:34639|2|

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-05-21 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845095#comment-16845095
 ] 

Ben La Monica commented on FLINK-12122:
---

I think the thing that is throwing everything off is that I have 3 slot sharing 
groups set up: one for data enrichment (default), one for calculation, and one 
for sorting. I added the slot sharing groups because, without them, a 
CPU-intensive message coming in would cause all other messages to back up 
behind it on the same partition instead of processing in parallel. Maybe I can 
figure out how to prevent that without a slot sharing group.

I'll try with everything in the same slot sharing group and parallelism, but 
just force a new chain at the calculation step and the sorting step. (You can 
see the slot sharing groups break everything apart at the calc, sorter and 
gatherer operators.)
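
The chaining idea above can be sketched roughly as follows; the operator and record classes here are placeholders, not from the actual job:

{code:java}
// One shared (default) slot sharing group, but force chain breaks so the
// CPU-heavy steps run in their own threads instead of being chained onto
// the enrichment operators.
SingleOutputStreamOperator<CalcResult> calculated = enriched
        .process(new CalculationFunction())
        .name("calc")
        .startNewChain();

SingleOutputStreamOperator<SortedResult> sorted = calculated
        .process(new SortingFunction())
        .name("sorter")
        .startNewChain();
{code}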

!image-2019-05-21-13-02-50-251.png!

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-05-21 Thread Ben La Monica (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben La Monica updated FLINK-12122:
--
Attachment: image-2019-05-21-13-02-50-251.png

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-05-21 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845062#comment-16845062
 ] 

Ben La Monica commented on FLINK-12122:
---

I'm running into this exact problem. I have a CoProcessFunction that contains a 
large amount of state, and those tasks are spread primarily across only 2 of my 
6 task managers. This causes memory issues on those boxes while 60GB of RAM on 
the third box sits unused.
||TaskManager||Num Slots Used for Memory Intensive Tasks||
|ip-10-255-58-174:39389|17|
|ip-10-255-58-174:45161|8|
|ip-10-255-58-179:33657|1|
|ip-10-255-58-179:38439|0|
|ip-10-255-58-44:40181|6|
|ip-10-255-58-44:45435|18|

And then I end up with resource usage in my YARN cluster that looks like this:

!image-2019-05-21-12-28-29-538.png!

Is there an estimate on when this problem will be fixed? I'm pretty much 
blocked unless I move to much larger servers, and that would be a waste of money :).
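
For anyone who lands on this thread later: the scheduling change that eventually came out of this ticket is gated behind a config option; assuming Flink 1.9.2 or later, it can be enabled in flink-conf.yaml:

{code:yaml}
# Prefer spreading slots across all registered TaskManagers instead of
# filling up one TaskManager before using the next.
cluster.evenly-spread-out-slots: true
{code}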

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Updated] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-05-21 Thread Ben La Monica (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben La Monica updated FLINK-12122:
--
Attachment: image-2019-05-21-12-28-29-538.png

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Attachments: image-2019-05-21-12-28-29-538.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> w.r.t. the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Closed] (FLINK-10682) EOFException occurs during deserialization of Avro class

2018-11-07 Thread Ben La Monica (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben La Monica closed FLINK-10682.
-
   Resolution: Resolved
Fix Version/s: 1.5.5

[~till.rohrmann] that did the trick! It no longer has those exceptions. I think 
I must have also been running into FLINK-10469. I'll close out this ticket. 

> EOFException occurs during deserialization of Avro class
> 
>
> Key: FLINK-10682
> URL: https://issues.apache.org/jira/browse/FLINK-10682
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.4
> Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
> 3 task managers, 1 job manager running in YARN in Hadoop
> Running on Amazon Linux with OpenJDK 1.8
>Reporter: Ben La Monica
>Priority: Critical
> Fix For: 1.5.5
>
>
> I'm having trouble (which usually occurs after an hour of processing in a 
> StreamExecutionEnvironment) where I get this failure message. I'm at a loss 
> for what is causing it. I'm running this in AWS on EMR 5.17 with 3 task 
> managers and a job manager running in a YARN cluster and I've upgraded my 
> flink libraries to 1.5.4 to bypass another serialization issue and the 
> kerberos auth issues.
> The avro classes that are being deserialized were generated with avro 1.8.2.
> {code:java}
> 2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task 
> thread="Calculate Estimated NAV -> Split into single messages (3/10)" 
> Calculate Estimated NAV -> Split into single messages (3/10) (de7d8fa77
> 84903a475391d0168d56f2e) switched from RUNNING to FAILED.
> java.io.EOFException: null
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
> at 
> org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
> at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at 
> org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:748){code}
> Do you have any ideas on how I could further troubleshoot this issue?





[jira] [Commented] (FLINK-10682) EOFException occurs during deserialization of Avro class

2018-11-07 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678306#comment-16678306
 ] 

Ben La Monica commented on FLINK-10682:
---

Thank you, I'll give it a try.

> EOFException occurs during deserialization of Avro class
> 
>
> Key: FLINK-10682
> URL: https://issues.apache.org/jira/browse/FLINK-10682
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.4
> Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
> 3 task managers, 1 job manager running in YARN in Hadoop
> Running on Amazon Linux with OpenJDK 1.8
>Reporter: Ben La Monica
>Priority: Critical
>
> I'm having trouble (which usually occurs after an hour of processing in a 
> StreamExecutionEnvironment) where I get this failure message. I'm at a loss 
> for what is causing it. I'm running this in AWS on EMR 5.17 with 3 task 
> managers and a job manager running in a YARN cluster and I've upgraded my 
> flink libraries to 1.5.4 to bypass another serialization issue and the 
> kerberos auth issues.
> The avro classes that are being deserialized were generated with avro 1.8.2.
> {code:java}
> 2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task 
> thread="Calculate Estimated NAV -> Split into single messages (3/10)" 
> Calculate Estimated NAV -> Split into single messages (3/10) (de7d8fa77
> 84903a475391d0168d56f2e) switched from RUNNING to FAILED.
> java.io.EOFException: null
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
> at 
> org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
> at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at 
> org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:748){code}
> Do you have any ideas on how I could further troubleshoot this issue?





[jira] [Commented] (FLINK-10682) EOFException occurs during deserialization of Avro class

2018-11-07 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678193#comment-16678193
 ] 

Ben La Monica commented on FLINK-10682:
---

So, an update on this. I don't think it's a mismatch in Avro classes, because 
this is running in a YARN cluster, which copies all of the code to each node. 
I've added more logging and decreased the number of task managers so that 
it's easier to track. I'm still seeing this problem, and it's not always due to 
Avro. Here is another stack trace...
{code:java}
2018-11-07 12:40:55,495 [INFO ] class=o.a.f.r.e.ExecutionGraph 
thread="flink-akka.actor.default-dispatcher-5202" Combine Price and Fund (7/9) 
(f0854988c294e4b256718746aff6bd6c) switched from RUNNING to FAILED.
java.io.IOException: Corrupt stream, found tag: 82
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748){code}

I think it may be due to a very large object being serialized. I will continue 
to try to track it down.

> EOFException occurs during deserialization of Avro class
> 
>
> Key: FLINK-10682
> URL: https://issues.apache.org/jira/browse/FLINK-10682
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.4
> Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
> 3 task managers, 1 job manager running in YARN in Hadoop
> Running on Amazon Linux with OpenJDK 1.8
>Reporter: Ben La Monica
>Priority: Critical
>

[jira] [Created] (FLINK-10682) EOFException occurs during deserialization of Avro class

2018-10-25 Thread Ben La Monica (JIRA)
Ben La Monica created FLINK-10682:
-

 Summary: EOFException occurs during deserialization of Avro class
 Key: FLINK-10682
 URL: https://issues.apache.org/jira/browse/FLINK-10682
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 1.5.4
 Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
3 task managers, 1 job manager running in YARN in Hadoop
Running on Amazon Linux with OpenJDK 1.8
Reporter: Ben La Monica


I'm running into a failure (it usually occurs after about an hour of processing in a 
StreamExecutionEnvironment) and I'm at a loss as to what is causing it. I'm running 
this in AWS on EMR 5.17 with 3 task managers and a job manager in a YARN cluster, 
and I've upgraded my Flink libraries to 1.5.4 to bypass another serialization issue 
and the Kerberos auth issues.

The Avro classes that are being deserialized were generated with Avro 1.8.2.
{code:java}
2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task 
thread="Calculate Estimated NAV -> Split into single messages (3/10)" Calculate 
Estimated NAV -> Split into single messages (3/10) (de7d8fa77
84903a475391d0168d56f2e) switched from RUNNING to FAILED.
java.io.EOFException: null
at 
org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
at 
org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
at 
org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at 
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at 
org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at 
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at 
org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748){code}
Do you have any ideas on how I could further troubleshoot this issue?
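For what it's worth, an EOFException inside readLong()/readDouble() is exactly what a fixed-width read produces when it is handed a truncated or misaligned buffer. A minimal stdlib sketch of that failure mode (illustrative only; this is not Flink or Avro code, and the class name is invented):

```java
import java.io.*;

public class TruncatedBufferSketch {
    /** Returns true if reading a double from the truncated buffer hits EOF. */
    static boolean hitsEof() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeDouble(42.0);                         // one full 8-byte value
        byte[] bytes = buf.toByteArray();

        // Drop the last 3 bytes, as if the record were cut off mid-stream.
        byte[] truncated = new byte[bytes.length - 3];
        System.arraycopy(bytes, 0, truncated, 0, truncated.length);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated));
        try {
            in.readDouble();   // same fixed-width read as readDouble() in the trace
            return false;
        } catch (EOFException e) {
            return true;       // fewer than 8 bytes left -> EOFException
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("EOFException on truncated buffer: " + hitsEof()); // true
    }
}
```

If the buffer really is being cut short, the question becomes which layer (the network stack, the spilling deserializer, or the Avro decoder) is losing or misaligning the bytes.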



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

2018-09-03 Thread Ben La Monica (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben La Monica closed FLINK-10278.
-
Resolution: Fixed

It appears that this was fixed as part of FLINK-10013. Closing.

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
> --
>
> Key: FLINK-10278
> URL: https://issues.apache.org/jira/browse/FLINK-10278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.2
>Reporter: Ben La Monica
>Priority: Major
> Fix For: 1.5.3
>
>
> While trying to run Flink in a YARN cluster with more than one physical 
> machine, the first task manager starts fine, but the second task manager 
> fails to start because it looks for the Kerberos keytab at the path that 
> exists on the *FIRST* task manager. See the log lines below (unrelated 
> lines removed for clarity):
> {code:java}
> 2018-09-01 23:00:34,322 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
> thread=main Current working/local Directory: 
> /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005
> 2018-09-01 23:00:34,339 INFO class=o.a.f.r.c.BootstrapTools thread=main 
> Setting directories for temporary files to: 
> /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005
> 2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
> thread=main keytab path: 
> /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_000319/krb5.keytab
> 2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
> thread=main YARN daemon is running as: hadoop Yarn client user obtainer: 
> hadoop
> 2018-09-01 23:00:34,343 ERROR class=o.a.f.yarn.YarnTaskExecutorRunner 
> thread=main YARN TaskManager initialization failed.
> org.apache.flink.configuration.IllegalConfigurationException: Kerberos login 
> configuration is invalid; keytab 
> '/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_01/krb5.keytab'
>  does not exist
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.validate(SecurityConfiguration.java:139)
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.(SecurityConfiguration.java:90)
> at 
> org.apache.flink.runtime.security.SecurityConfiguration.(SecurityConfiguration.java:71)
> at 
> org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:120)
> at 
> org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:73){code}
>  
> You'll notice that the log statement says that the keytab should be located 
> in container 000319:
> /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#14892c}*000319*{color}/krb5.keytab
> But after I changed the code to log the file it is actually checking during 
> the SecurityConfiguration init, it turned out to be checking container 01, 
> which is not on this host:
> /mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#d04437}*01*{color}/krb5.keytab
> This causes the YARN task managers to restart over and over again (which is 
> why we're up to container 319!)
> I'll submit a PR for this fix, though basically it's just moving the 
> initialization of the SecurityConfiguration down 2 lines.





[jira] [Updated] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

2018-09-03 Thread Ben La Monica (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben La Monica updated FLINK-10278:
--
Fix Version/s: 1.5.3

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
> --
>
> Key: FLINK-10278
> URL: https://issues.apache.org/jira/browse/FLINK-10278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.2
>Reporter: Ben La Monica
>Priority: Major
> Fix For: 1.5.3
>
>





[jira] [Commented] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

2018-09-03 Thread Ben La Monica (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602387#comment-16602387
 ] 

Ben La Monica commented on FLINK-10278:
---

It appears to already be fixed in 1.5.3.

> Flink in YARN cluster uses wrong path when looking for Kerberos Keytab
> --
>
> Key: FLINK-10278
> URL: https://issues.apache.org/jira/browse/FLINK-10278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.2
>Reporter: Ben La Monica
>Priority: Major
>





[jira] [Created] (FLINK-10278) Flink in YARN cluster uses wrong path when looking for Kerberos Keytab

2018-09-03 Thread Ben La Monica (JIRA)
Ben La Monica created FLINK-10278:
-

 Summary: Flink in YARN cluster uses wrong path when looking for 
Kerberos Keytab
 Key: FLINK-10278
 URL: https://issues.apache.org/jira/browse/FLINK-10278
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.2
Reporter: Ben La Monica


While trying to run Flink in a YARN cluster with more than one physical machine, 
the first task manager starts fine, but the second task manager fails to start 
because it looks for the Kerberos keytab at the path that exists on the *FIRST* 
task manager. See the log lines below (unrelated lines removed for clarity):
{code:java}
2018-09-01 23:00:34,322 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
thread=main Current working/local Directory: 
/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005
2018-09-01 23:00:34,339 INFO class=o.a.f.r.c.BootstrapTools thread=main Setting 
directories for temporary files to: 
/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005

2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
thread=main keytab path: 
/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_000319/krb5.keytab

2018-09-01 23:00:34,339 INFO class=o.a.f.yarn.YarnTaskExecutorRunner 
thread=main YARN daemon is running as: hadoop Yarn client user obtainer: hadoop

2018-09-01 23:00:34,343 ERROR class=o.a.f.yarn.YarnTaskExecutorRunner 
thread=main YARN TaskManager initialization failed.
org.apache.flink.configuration.IllegalConfigurationException: Kerberos login 
configuration is invalid; keytab 
'/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_01/krb5.keytab'
 does not exist
at 
org.apache.flink.runtime.security.SecurityConfiguration.validate(SecurityConfiguration.java:139)
at 
org.apache.flink.runtime.security.SecurityConfiguration.(SecurityConfiguration.java:90)
at 
org.apache.flink.runtime.security.SecurityConfiguration.(SecurityConfiguration.java:71)
at 
org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:120)
at 
org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:73){code}
 

You'll notice that the log statement says that the keytab should be located in 
container 000319:
/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#14892c}*000319*{color}/krb5.keytab

But after I changed the code to log the file it is actually checking during the 
SecurityConfiguration init, it turned out to be checking container 01, which is 
not on this host:

/mnt/yarn/usercache/hadoop/appcache/application_1535833786616_0005/container_1535833786616_0005_01_{color:#d04437}*01*{color}/krb5.keytab

This causes the YARN task managers to restart over and over again (which is why 
we're up to container 319!)

I'll submit a PR for this fix, though basically it's just moving the 
initialization of the SecurityConfiguration down 2 lines.
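The fix described above is an ordering change. A tiny sketch of the idea (class name, config key, and paths invented for illustration; this is not Flink's actual code): the keytab path in the configuration must be rewritten to this container's local directory *before* the security configuration reads and validates it.

```java
import java.util.*;

public class KeytabOrderingSketch {
    // Files that exist on THIS task manager's host (invented paths).
    static final Set<String> LOCAL_FILES =
        new HashSet<>(Arrays.asList("/local/container_000319/krb5.keytab"));

    /** Returns true iff keytab validation would succeed. */
    static boolean check(boolean localizePathFirst) {
        Map<String, String> config = new HashMap<>();
        // The shipped configuration still carries the FIRST container's path.
        config.put("keytab", "/local/container_01/krb5.keytab");
        if (localizePathFirst) {
            // The fix: rewrite the path to this container's directory
            // before the security configuration validates it.
            config.put("keytab", "/local/container_000319/krb5.keytab");
        }
        // Stands in for SecurityConfiguration.validate()'s existence check.
        return LOCAL_FILES.contains(config.get("keytab"));
    }

    public static void main(String[] args) {
        System.out.println("buggy order passes: " + check(false)); // false
        System.out.println("fixed order passes: " + check(true));  // true
    }
}
```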





[jira] [Created] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-23 Thread Ben La Monica (JIRA)
Ben La Monica created FLINK-10204:
-

 Summary: Job is marked as FAILED after serialization exception
 Key: FLINK-10204
 URL: https://issues.apache.org/jira/browse/FLINK-10204
 Project: Flink
  Issue Type: Bug
Reporter: Ben La Monica


We have a long-running Flink job that eventually fails and is shut down due to an 
internal serialization exception that keeps recurring. Here is the stack trace:
{code:java}
2018-08-23 18:39:48,199 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
(4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
java.io.IOException: Corrupt stream, found tag: 127
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748){code}
 

I think I have tracked the issue down to a mismatch between the serialization and 
copy code in the StreamElementSerializer with regard to the LATENCY_MARKER.

The serialization logic writes 3 longs and an int, but the copy logic only writes 
(and reads) a long and 2 ints. Adding a test for the LatencyMarker surfaces an 
EOFException, and fixing the copy code makes the test pass again.
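That kind of mismatch can be illustrated with a small stdlib sketch (invented code, not Flink's actual StreamElementSerializer): when the copy path consumes fewer bytes than serialize() wrote, the next tag byte is read from the middle of the previous record, which is how a "Corrupt stream, found tag: ..." error shows up downstream.

```java
import java.io.*;

public class LatencyMarkerSketch {
    // Stand-in for the serialize path: 3 longs + 1 int = 28 bytes.
    static void serialize(DataOutputStream out) throws IOException {
        out.writeLong(1L); out.writeLong(2L); out.writeLong(3L);
        out.writeInt(4);
    }

    // Buggy copy: consumes only 1 long + 2 ints = 16 of the 28 bytes.
    static void buggyCopy(DataInputStream in) throws IOException {
        in.readLong(); in.readInt(); in.readInt();
    }

    /** Returns the tag byte seen after copying one marker record. */
    static int tagAfterCopy(boolean fixed) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        serialize(out);
        out.writeByte(7); // tag byte of the NEXT record in the stream
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        if (fixed) { in.readLong(); in.readLong(); in.readLong(); in.readInt(); }
        else       { buggyCopy(in); }
        return in.readUnsignedByte();
    }

    public static void main(String[] args) throws IOException {
        // Buggy copy leaves 12 bytes behind, so the "tag" is a mid-record byte.
        System.out.println("buggy copy sees tag: " + tagAfterCopy(false)); // 0
        System.out.println("fixed copy sees tag: " + tagAfterCopy(true));  // 7
    }
}
```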

I've written a unit test that highlights the problem, and have written the code 
to correct it.

I'll submit a PR that goes along with it.


