Re: AWS exception serialization problem

2017-03-11 Thread Robert Metzger
Thank you for analyzing the problem Gordon!

We cannot upgrade Kryo anytime soon because state in old savepoints is
still serialized with the current Kryo version.
I would propose to add our own JavaThrowableSerializer to Flink and
document how users can register that serializer if they run into the error.

Shannon and Bruno can just use the serializer in their current Flink
version.


On Sat, Mar 11, 2017 at 12:00 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> FYI: Here’s the JIRA ticket to track this issue -
> https://issues.apache.org/jira/browse/FLINK-6025.
>
>
> On March 11, 2017 at 6:27:36 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org)
> wrote:
>
> Hi Shannon,
>
> Thanks a lot for providing the example, it was very helpful in reproducing
> the problem.
>
> I think this is actually a Kryo bug that was just recently fixed:
> https://github.com/EsotericSoftware/kryo/pull/483
> <https://github.com/EsotericSoftware/kryo/commit/19a6b5e>
> It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be
> released yet.
>
> The problem is that when Kryo defaults to Java serialization for the
> exception instance, the `ObjectInputStream` used to read the object does
> not correctly use Kryo’s configured class loader (i.e., the user code class
> loader). That’s why it's complaining that the class cannot be found.
>
> We can “work around” this by registering our own `JavaSerializer` as the
> serializer for Throwables in Kryo, but I’m not sure if we should actually
> do this, or just wait for the Kryo fix to be released.
>
> - Gordon
>
>
> On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:
>
> Here ya go (see attached).
>
>
> From: Robert Metzger <rmetz...@apache.org>
> Date: Friday, March 10, 2017 at 1:18 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: AWS exception serialization problem
>
> Can one of you guys provide us with a minimal example to reproduce the
> issue? (Ideally locally, not using EMR?)
> I think once we can reproduce the issue, it's easy to fix.
>
> On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com>
> wrote:
>
>> Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)
>>
>> On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
>>
>>> @Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
>>>
>>> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> We have seen something similar in Flink 1.2. We have an operation that
>>> parses some JSON, and when it fails to parse it, we can see the
>>> ClassNotFoundException for the relevant exception (in our case
>>> JsResultException from the play-json library). The library is indeed in the
>>> shaded JAR, otherwise we would not be able to parse the JSON.
>>>
>>> Cheers,
>>>
>>> Bruno
>>>
>>> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>> Hi Shannon,
>>>
>>> Just to clarify:
>>>
>>> From the error trace, it seems that the messages fetched from Kafka
>>> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
>>> `AmazonS3Exception` as records from FlinkKafkaConsumer?
>>> Is this correct? If so, I think we should just make sure that the
>>> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
>>> user fat jar.
>>>
>>> Also, what is the Flink version you are using?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>


Re: AWS exception serialization problem

2017-03-11 Thread Tzu-Li (Gordon) Tai
FYI: Here’s the JIRA ticket to track this issue - 
https://issues.apache.org/jira/browse/FLINK-6025.


On March 11, 2017 at 6:27:36 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the 
problem.

I think this is actually a Kryo bug that was just recently fixed:
https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be 
released yet.

The problem is that when Kryo defaults to Java serialization for the exception 
instance, the `ObjectInputStream` used to read the object does not correctly 
use Kryo’s configured class loader (i.e., the user code class loader). That’s 
why it's complaining that the class cannot be found.

We can “work around” this by registering our own `JavaSerializer` as the
serializer for Throwables in Kryo, but I’m not sure if we should actually do 
this, or just wait for the Kryo fix to be released.

- Gordon


On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:

Here ya go (see attached).


From: Robert Metzger <rmetz...@apache.org>
Date: Friday, March 10, 2017 at 1:18 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

Can one of you guys provide us with a minimal example to reproduce the issue? 
(Ideally locally, not using EMR?)
I think once we can reproduce the issue, it's easy to fix.

On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)


On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses 
some JSON, and when it fails to parse it, we can see the ClassNotFoundException 
for the relevant exception (in our case JsResultException from the play-json 
library). The library is indeed in the shaded JAR, otherwise we would not be 
able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Shannon,

Just to clarify:

From the error trace, it seems that the messages fetched from Kafka are
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon




Re: AWS exception serialization problem

2017-03-11 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Thanks a lot for providing the example, it was very helpful in reproducing the 
problem.

I think this is actually a Kryo bug that was just recently fixed:
https://github.com/EsotericSoftware/kryo/pull/483
It will be available in Kryo 4.0.1, which unfortunately doesn’t seem to be 
released yet.

The problem is that when Kryo defaults to Java serialization for the exception 
instance, the `ObjectInputStream` used to read the object does not correctly 
use Kryo’s configured class loader (i.e., the user code class loader). That’s 
why it's complaining that the class cannot be found.

We can “work around” this by registering our own `JavaSerializer` as the
serializer for Throwables in Kryo, but I’m not sure if we should actually do 
this, or just wait for the Kryo fix to be released.

- Gordon


On March 11, 2017 at 9:54:03 AM, Shannon Carey (sca...@expedia.com) wrote:

Here ya go (see attached).


From: Robert Metzger <rmetz...@apache.org>
Date: Friday, March 10, 2017 at 1:18 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

Can one of you guys provide us with a minimal example to reproduce the issue? 
(Ideally locally, not using EMR?)
I think once we can reproduce the issue, it's easy to fix.

On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)


On Wed, 8 Mar 2017, 21:41 Stephan Ewen, <se...@apache.org> wrote:
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda <brunoara...@gmail.com> wrote:
Hi,

We have seen something similar in Flink 1.2. We have an operation that parses 
some JSON, and when it fails to parse it, we can see the ClassNotFoundException 
for the relevant exception (in our case JsResultException from the play-json 
library). The library is indeed in the shaded JAR, otherwise we would not be 
able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Shannon,

Just to clarify:

From the error trace, it seems that the messages fetched from Kafka are
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon




Re: AWS exception serialization problem

2017-03-10 Thread Robert Metzger
Can one of you guys provide us with a minimal example to reproduce the
issue? (Ideally locally, not using EMR?)
I think once we can reproduce the issue, it's easy to fix.

On Thu, Mar 9, 2017 at 1:24 AM, Bruno Aranda  wrote:

> Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)
>
> On Wed, 8 Mar 2017, 21:41 Stephan Ewen,  wrote:
>
>> @Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
>>
>> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda 
>> wrote:
>>
>> Hi,
>>
>> We have seen something similar in Flink 1.2. We have an operation that
>> parses some JSON, and when it fails to parse it, we can see the
>> ClassNotFoundException for the relevant exception (in our case
>> JsResultException from the play-json library). The library is indeed in the
>> shaded JAR, otherwise we would not be able to parse the JSON.
>>
>> Cheers,
>>
>> Bruno
>>
>> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi Shannon,
>>
>> Just to clarify:
>>
>> From the error trace, it seems that the messages fetched from Kafka
>> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
>> `AmazonS3Exception` as records from FlinkKafkaConsumer?
>> Is this correct? If so, I think we should just make sure that the
>> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
>> user fat jar.
>>
>> Also, what is the Flink version you are using?
>>
>> Cheers,
>> Gordon
>>
>>
>>


Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi Stephan, we are running Flink 1.2.0 on Yarn (AWS EMR cluster)

On Wed, 8 Mar 2017, 21:41 Stephan Ewen,  wrote:

> @Bruno: How are you running Flink? On yarn, standalone, mesos, docker?
>
> On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda 
> wrote:
>
> Hi,
>
> We have seen something similar in Flink 1.2. We have an operation that
> parses some JSON, and when it fails to parse it, we can see the
> ClassNotFoundException for the relevant exception (in our case
> JsResultException from the play-json library). The library is indeed in the
> shaded JAR, otherwise we would not be able to parse the JSON.
>
> Cheers,
>
> Bruno
>
> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>
>
>


Re: AWS exception serialization problem

2017-03-08 Thread Stephan Ewen
@Bruno: How are you running Flink? On yarn, standalone, mesos, docker?

On Wed, Mar 8, 2017 at 2:13 PM, Bruno Aranda  wrote:

> Hi,
>
> We have seen something similar in Flink 1.2. We have an operation that
> parses some JSON, and when it fails to parse it, we can see the
> ClassNotFoundException for the relevant exception (in our case
> JsResultException from the play-json library). The library is indeed in the
> shaded JAR, otherwise we would not be able to parse the JSON.
>
> Cheers,
>
> Bruno
>
> On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Shannon,
>>
>> Just to clarify:
>>
>> From the error trace, it seems that the messages fetched from Kafka
>> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
>> `AmazonS3Exception` as records from FlinkKafkaConsumer?
>> Is this correct? If so, I think we should just make sure that the
>> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
>> user fat jar.
>>
>> Also, what is the Flink version you are using?
>>
>> Cheers,
>> Gordon
>>
>


Re: AWS exception serialization problem

2017-03-08 Thread Bruno Aranda
Hi,

We have seen something similar in Flink 1.2. We have an operation that
parses some JSON, and when it fails to parse it, we can see the
ClassNotFoundException for the relevant exception (in our case
JsResultException from the play-json library). The library is indeed in the
shaded JAR, otherwise we would not be able to parse the JSON.

Cheers,

Bruno

On Wed, 8 Mar 2017 at 12:57 Tzu-Li (Gordon) Tai  wrote:

> Hi Shannon,
>
> Just to clarify:
>
> From the error trace, it seems that the messages fetched from Kafka
> are serialized `AmazonS3Exception`s, and you’re emitting a stream of
> `AmazonS3Exception` as records from FlinkKafkaConsumer?
> Is this correct? If so, I think we should just make sure that the
> `com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the
> user fat jar.
>
> Also, what is the Flink version you are using?
>
> Cheers,
> Gordon
>


Re: AWS exception serialization problem

2017-03-08 Thread Tzu-Li (Gordon) Tai
Hi Shannon,

Just to clarify:

From the error trace, it seems that the messages fetched from Kafka are
serialized `AmazonS3Exception`s, and you’re emitting a stream of 
`AmazonS3Exception` as records from FlinkKafkaConsumer?
Is this correct? If so, I think we should just make sure that the 
`com.amazonaws.services.s3.model.AmazonS3Exception` class exists in the user 
fat jar.

Also, what is the Flink version you are using?

Cheers,
Gordon

Re: AWS exception serialization problem

2017-03-07 Thread Shannon Carey
> is there some shading logic involved in the dependencies, concerning the AWS 
> libraries?

Not that I am aware of. The AWS code is included in the job's fat jar as-is.



Re: AWS exception serialization problem

2017-03-07 Thread Stephan Ewen
@Shannon @Gordon - is there some shading logic involved in the
dependencies, concerning the AWS libraries?


On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> I just had a quick look at this, but the Kafka fetcher thread’s context
> classloader doesn’t seem to be the issue (at least for 1.1.4).
>
> In Flink 1.1.4, a separate thread from the task thread is created to run
> the fetcher, but since the task thread sets the user code classloader as
> its context classloader, shouldn’t any threads created from it (i.e., the
> fetcher thread) use it also?
>
> I quickly checked the context classloader that the Kafka09Fetcher thread in
> 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
>
>
> On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:
>
> Ah, I see...
>
> The issue is that the Kafka fetcher thread apparently does not have the
> user-code class loader set as the context class loader. Kryo relies on that
> for class resolution.
>
> What Flink version are you on? I think that actual processing and
> forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
> only Flink 1.1 should be affected...
>
>
> On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:
>
>> I think my previous guess was wrong. From what I can tell, when Kryo
>> tries to copy the exception object, it does that by serializing and
>> deserializing it. For subclasses of RuntimeException, it doesn't know how
>> to do it so it delegates serialization to Java. However, it doesn't use a
>> custom ObjectInputStream to override resolveClass() and provide classes
>> from the user code classloader… such as happens in RocksDBStateBackend's
>> use of InstantiationUtil.deserializeObject(). Instead, it uses
>> ObjectInputStream$latestUserDefinedLoader() which is the
>> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>>
>> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
>> configured?
>>
>> Thanks,
>> Shannon
>>
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 7:09 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: AWS exception serialization problem
>>
>> This happened when running Flink with bin/run-local.sh. I notice that
>> there only appears to be one Java process. The job manager and the task
>> manager run in the same JVM, right? I notice, however, that there are two
>> blob store folders on disk. Could the problem be caused by two different
>> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>>
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 6:39 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: AWS exception serialization problem
>>
>> Has anyone encountered this or know what might be causing it?
>>
>>
>> java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>>     at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySeriali

Re: AWS exception serialization problem

2017-03-07 Thread Tzu-Li (Gordon) Tai
Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context
classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the 
fetcher, but since the task thread sets the user code classloader as its 
context classloader, shouldn’t any threads created from it (i.e., the fetcher 
thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4
was using, and it’s `FlinkUserCodeClassLoader`.
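For anyone who wants to check the inheritance behaviour described above outside of Flink, here is a minimal sketch using only the JDK. The class name `ContextClassLoaderInheritance` and the empty `URLClassLoader` standing in for the user code classloader are made up for illustration; nothing here is Flink code.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink.
final class ContextClassLoaderInheritance {

    /**
     * Starts a new thread from the calling thread and returns the context
     * class loader that the new thread observes.
     */
    static ClassLoader loaderSeenByChildThread() {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread child = new Thread(
                () -> seen.set(Thread.currentThread().getContextClassLoader()));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for child thread", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Stand-in for the user code classloader (assumption: an empty URLClassLoader).
        ClassLoader userCodeLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderInheritance.class.getClassLoader());

        // The "task thread" sets the user code classloader as its context classloader.
        Thread.currentThread().setContextClassLoader(userCodeLoader);

        // A thread created afterwards inherits it from the creating thread.
        System.out.println(loaderSeenByChildThread() == userCodeLoader);
    }
}
```

A thread takes over the context class loader of the thread that constructs it, so a fetcher thread created from the task thread would see the same loader unless something resets it explicitly.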


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code
class loader set as the context class loader. Kryo relies on that for class 
resolution.

What Flink version are you on? I think that actual processing and forwarding 
does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 
should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to 
copy the exception object, it does that by serializing and deserializing it. 
For subclasses of RuntimeException, it doesn't know how to do it so it 
delegates serialization to Java. However, it doesn't use a custom 
ObjectInputStream to override resolveClass() and provide classes from the user 
code classloader… such as happens in RocksDBStateBackend's use of 
InstantiationUtil.deserializeObject(). Instead, it uses 
ObjectInputStream$latestUserDefinedLoader() which is the 
Launcher$AppClassLoader which definitely doesn't have the user code in it.
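To make the idea concrete, here is a rough, JDK-only sketch of copying an object by serializing and deserializing it while resolving classes against an explicitly supplied class loader. The names `ClassLoaderAwareCopy` and `ClassLoaderObjectInputStream` are hypothetical; this is not the Kryo or Flink implementation, only an illustration of overriding `resolveClass()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper, not part of Kryo or Flink.
final class ClassLoaderAwareCopy {

    /** ObjectInputStream that looks classes up in the given class loader first. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Resolve against the supplied loader (e.g. the user code classloader).
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Copies a Serializable value via Java serialization using the given class loader. */
    @SuppressWarnings("unchecked")
    static <T> T copy(T value, ClassLoader classLoader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Copy via Java serialization failed", e);
        }
    }

    public static void main(String[] args) {
        RuntimeException original = new IllegalStateException("boom");
        RuntimeException copied =
                copy(original, Thread.currentThread().getContextClassLoader());
        System.out.println(copied.getMessage()); // prints "boom"
    }
}
```

With a plain `ObjectInputStream`, the lookup goes through the default loader instead, which is where a class that only exists in the job's fat jar would fail with `ClassNotFoundException`.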

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being 
configured?

Thanks,
Shannon


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh. I notice that there only
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
    ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
  

Re: AWS exception serialization problem

2017-03-07 Thread Stephan Ewen
Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the
user-code class loader set as the context class loader. Kryo relies on that
for class resolution.

What Flink version are you on? I think that actual processing and
forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:

> I think my previous guess was wrong. From what I can tell, when Kryo tries
> to copy the exception object, it does that by serializing and deserializing
> it. For subclasses of RuntimeException, it doesn't know how to do it so it
> delegates serialization to Java. However, it doesn't use a
> custom ObjectInputStream to override resolveClass() and provide classes
> from the user code classloader… such as happens in RocksDBStateBackend's
> use of InstantiationUtil.deserializeObject(). Instead, it uses
> ObjectInputStream$latestUserDefinedLoader() which is the
> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>
> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
> configured?
>
> Thanks,
> Shannon
>
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Monday, March 6, 2017 at 7:09 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: AWS exception serialization problem
>
> This happened when running Flink with bin/run-local.sh. I notice that there
> only appears to be one Java process. The job manager and the task manager
> run in the same JVM, right? I notice, however, that there are two blob
> store folders on disk. Could the problem be caused by two different
> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Monday, March 6, 2017 at 6:39 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: AWS exception serialization problem
>
> Has anyone encountered this or know what might be causing it?
>
>
> java.lang.RuntimeException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>     at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>     at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
>     ... 7 more
> Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>   

Re: AWS exception serialization problem

2017-03-06 Thread Shannon Carey
This happened when running Flink with bin/run-local.sh. I notice that there only
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?



java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
    at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
    ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at java.lang.Throwable.readObject(Throwable.java:914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
    at java.lang.Throwable.readObject(Throwable.java:914)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream

AWS exception serialization problem

2017-03-06 Thread Shannon Carey
Has anyone encountered this, or does anyone know what might be causing it?



java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at