Re: AWS exception serialization problem
Thank you for analyzing the problem, Gordon! We cannot upgrade Kryo any time soon, because state in old savepoints is still serialized with the current Kryo version. I would propose adding our own JavaThrowableSerializer to Flink and documenting how users can register that serializer if they run into the error. Shannon and Bruno can just use the serializer in their current Flink version.

On Sat, Mar 11, 2017 at 12:00 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> FYI: Here’s the JIRA ticket to track this issue -
> https://issues.apache.org/jira/browse/FLINK-6025.
>
> On March 11, 2017 at 6:27:36 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:
>
> Hi Shannon,
>
> Thanks a lot for providing the example, it was very helpful in reproducing the problem.
>
> I think this is actually a Kryo bug that was just recently fixed:
> https://github.com/EsotericSoftware/kryo/pull/483
> <https://github.com/EsotericSoftware/kryo/commit/19a6b5e>
> It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be released yet.
>
> The problem is that when Kryo defaults to Java serialization for the exception instance, the `ObjectInputStream` used to read the object does not correctly use Kryo’s configured class loader (i.e., the user code class loader). That’s why it's complaining that the class cannot be found.
>
> We can work around this by registering our own `JavaSerializer` as the serializer for Throwables in Kryo, but I’m not sure if we should actually do this, or just wait for the Kryo fix to be released.
>
> - Gordon
>
> On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:
>
> Here ya go (see attached).
>
> From: Robert Metzger <rmetz...@apache.org>
> Date: Friday, March 10, 2017 at 1:18 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: AWS exception serialization problem
>
> Can one of you guys provide us with a minimal example to reproduce the issue? (Ideally locally, not using EMR?)
> I think once we can reproduce the issue, it's easy to fix.
>
> On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
>
>> Hi Stephan, we are running Flink 1.2.0 on YARN (AWS EMR cluster)
>>
>> On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
>>
>>> @Bruno: How are you running Flink? On YARN, standalone, Mesos, Docker?
>>>
>>> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We have seen something similar in Flink 1.2. We have an operation that parses some JSON, and when it fails to parse it, we can see the ClassNotFoundException for the relevant exception (in our case JsResultException from the play-json library). The library is indeed in the shaded JAR; otherwise we would not be able to parse the JSON.
>>>
>>> Cheers,
>>>
>>> Bruno
>>>
>>> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>> Hi Shannon,
>>>
>>> Just to clarify:
>>>
>>> From the error trace, it seems that the messages fetched from Kafka are serialized `AmazonS3Exception`s, and you’re emitting a stream of `AmazonS3Exception` as records from FlinkKafkaConsumer?
>>> Is this correct? If so, I think we should just make sure that the `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user fat jar.
>>>
>>> Also, what Flink version are you using?
>>>
>>> Cheers,
>>> Gordon
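A minimal sketch of the idea behind such a serializer, assuming only the JDK: an `ObjectInputStream` whose `resolveClass` looks classes up through an explicitly supplied class loader (for example, the user-code class loader) instead of the JVM's default lookup. The class and method names are hypothetical, not Flink or Kryo API, and wrapping this in an actual Kryo `Serializer` registered for `Throwable` is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/**
 * Hypothetical helper (not Flink or Kryo API): Java serialization whose
 * deserialization side resolves classes through an explicitly supplied loader.
 */
final class ClassLoaderAwareJavaSerialization {

    private ClassLoaderAwareJavaSerialization() {}

    /** ObjectInputStream that asks the given loader first, instead of the JVM's default lookup. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // initialize = false: load the class without running its static initializers
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup; this also covers primitive names such as "int"
                return super.resolveClass(desc);
            }
        }
    }

    /** Writes an object (e.g. an exception) with plain Java serialization. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Reads the object back, resolving every class through {@code loader}. */
    static Object deserialize(byte[] data, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }
}
```

In a Kryo-based serializer, the loader passed to `deserialize` would presumably be the one Kryo is configured with; that wiring is an assumption left out of this sketch.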
Re: AWS exception serialization problem
FYI: Here’s the JIRA ticket to track this issue - https://issues.apache.org/jira/browse/FLINK-6025.
Re: AWS exception serialization problem
Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the problem.

I think this is actually a Kryo bug that was just recently fixed: https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be released yet.

The problem is that when Kryo defaults to Java serialization for the exception instance, the `ObjectInputStream` used to read the object does not correctly use Kryo’s configured class loader (i.e., the user code class loader). That’s why it's complaining that the class cannot be found.

We can work around this by registering our own `JavaSerializer` as the serializer for Throwables in Kryo, but I’m not sure if we should actually do this, or just wait for the Kryo fix to be released.

- Gordon
Re: AWS exception serialization problem
Can one of you guys provide us with a minimal example to reproduce the issue? (Ideally locally, not using EMR?) I think once we can reproduce the issue, it's easy to fix.
Re: AWS exception serialization problem
Hi Stephan, we are running Flink 1.2.0 on YARN (AWS EMR cluster).
Re: AWS exception serialization problem
@Bruno: How are you running Flink? On YARN, standalone, Mesos, Docker?
Re: AWS exception serialization problem
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses some JSON, and when it fails to parse it, we can see the ClassNotFoundException for the relevant exception (in our case JsResultException from the play-json library). The library is indeed in the shaded JAR; otherwise we would not be able to parse the JSON.

Cheers,
Bruno
Re: AWS exception serialization problem
Hi Shannon,

Just to clarify: from the error trace, it seems that the messages fetched from Kafka are serialized `AmazonS3Exception`s, and you’re emitting a stream of `AmazonS3Exception` as records from FlinkKafkaConsumer? Is this correct? If so, I think we should just make sure that the `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user fat jar.

Also, what Flink version are you using?

Cheers,
Gordon
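One quick way to act on this suggestion is to check the fat jar itself for the class file. The sketch below is a hypothetical JDK-only helper (`FatJarCheck` is not part of Flink): it maps a binary class name to its `.class` entry path and looks that entry up with `java.util.jar.JarFile`. If the AWS SDK had been relocated by shading, the entry would live under the relocated package name instead.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

/** Hypothetical helper (not part of Flink): checks whether a jar contains a given class file. */
final class FatJarCheck {

    private FatJarCheck() {}

    /**
     * @param jar             the job's fat jar
     * @param binaryClassName e.g. "com.amazonaws.services.s3.model.AmazonS3Exception"
     */
    static boolean containsClass(File jar, String binaryClassName) {
        // Binary name -> jar entry path: "a.b.Outer$Inner" -> "a/b/Outer$Inner.class"
        String entryName = binaryClassName.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryName) != null;
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read " + jar, e);
        }
    }
}
```

As Bruno's report suggests, the class being present in the jar is necessary but not sufficient: it also has to be resolved through a class loader that can see the jar.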
Re: AWS exception serialization problem
> is there some shading logic involved in the dependencies, concerning the AWS
> libraries?

Not that I am aware of. The AWS code is included in the job's fat jar as-is.
Re: AWS exception serialization problem
@Shannon @Gordon - is there some shading logic involved in the dependencies, concerning the AWS libraries?

On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi,
>
> I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t seem to be the issue (at least for 1.1.4).
>
> In Flink 1.1.4, a separate thread from the task thread is created to run the fetcher, but since the task thread sets the user code classloader as its context classloader, shouldn’t any threads created from it (i.e., the fetcher thread) use it also?
>
> I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
>
> On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:
>
> Ah, I see...
>
> The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as the context class loader. Kryo relies on that for class resolution.
>
> What Flink version are you on? I think that, as of 1.2, actual processing and forwarding no longer happens in the Kafka fetchers, so only Flink 1.1 should be affected...
>
> On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:
>
>> I think my previous guess was wrong. From what I can tell, when Kryo tries to copy the exception object, it does that by serializing and deserializing it. For subclasses of RuntimeException, it doesn't know how to do it, so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream to override resolveClass() and provide classes from the user code classloader… such as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject(). Instead, it uses ObjectInputStream$latestUserDefinedLoader(), which is the Launcher$AppClassLoader, which definitely doesn't have the user code in it.
>>
>> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being configured?
>>
>> Thanks,
>> Shannon
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 7:09 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: AWS exception serialization problem
>>
>> This happened when running Flink with bin/run-local.sh. I notice that there only appears to be one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 6:39 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: AWS exception serialization problem
>>
>> Has anyone encountered this, or does anyone know what might be causing it?
>>
>> java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>>     at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySeriali
Re: AWS exception serialization problem
Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the fetcher, but since the task thread sets the user code classloader as its context classloader, shouldn’t any threads created from it (i.e., the fetcher thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
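Gordon's reasoning relies on standard JDK behavior: a newly constructed `Thread` takes its initial context class loader from the thread that creates it. A small JDK-only sketch (hypothetical names, not Flink code) that checks this:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: a new thread starts with its creator's context class loader. */
final class ContextClassLoaderInheritance {

    private ContextClassLoaderInheritance() {}

    /**
     * Sets {@code loader} as the caller's context class loader, creates a child thread,
     * and returns the context class loader the child observes. The caller's original
     * context class loader is restored afterwards.
     */
    static ClassLoader contextLoaderSeenByChild(ClassLoader loader) throws InterruptedException {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        try {
            current.setContextClassLoader(loader);
            // The context class loader is copied from the creating thread at construction time
            Thread child = new Thread(() -> seen.set(Thread.currentThread().getContextClassLoader()));
            child.start();
            child.join(); // join() also makes the child's write to 'seen' visible here
        } finally {
            current.setContextClassLoader(previous);
        }
        return seen.get();
    }
}
```

Because the loader is copied when the `Thread` object is constructed, threads created before the context class loader was set keep whatever loader they had at that time.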
Re: AWS exception serialization problem
Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as the context class loader. Kryo relies on that for class resolution.

What Flink version are you on? I think that, as of 1.2, actual processing and forwarding no longer happens in the Kafka fetchers, so only Flink 1.1 should be affected...
Re: AWS exception serialization problem
This happened when running Flink with bin/run-local.sh. I notice that there only appears to be one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?
AWS exception serialization problem
Has anyone encountered this, or does anyone know what might be causing it?

java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
    ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at java.lang.Throwable.readObject(Throwable.java:914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at java.lang.Throwable.readObject(Throwable.java:914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at java.lang.Throwable.readObject(Throwable.java:914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at