Hi,

I just had a quick look on this, but the Kafka fetcher thread’s context 
classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the 
fetcher, but since the task thread sets the user code classloader as its 
context classloader, shouldn’t any threads created from it (i.e., the fetcher 
thread) use it also?

A quickly checked the context classloader the Kafka09Fetcher thread in 1.1.4 
was using, and it’s `FlinkUserCodeClassLoader`.


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently do not have the user-code 
class loader set as the context class loader. Kryo relies on that for class 
resolution.

What Flink version are you on? I think that actual processing and forwarding 
does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 
should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to 
copy the exception object, it does that by serializing and deserializing it. 
For subclasses of RuntimeException, it doesn't know how to do it so it 
delegates serialization to Java. However, it doesn't use a custom 
ObjectInputStream to override resolveClass() and provide classes from the user 
code classloader… such as happens in RocksDBStateBackend's use of 
InstantiationUtil.deserializeObject(). Instead, it uses 
ObjectInputStream$latestUserDefinedLoader() which is the 
Launcher$AppClassLoader which definitely doesn't have the user code in it.

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being 
configured?

Thanks,
Shannon


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh I notice that there only 
appears to be one Java process. The job manager and the task manager run in the 
same JVM, right? I notice, however, that there are two blob store folders on 
disk. Could the problem be caused by two different FlinkUserCodeClassLoader 
objects pointing to the two different JARs?


From: Shannon Carey <sca...@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
        at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
deserialization.
        at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
        at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
        at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
        ... 7 more
Caused by: java.lang.ClassNotFoundException: 
com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
        at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at 
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
        ... 12 more

Reply via email to