Ah, I see...

The issue is that the Kafka fetcher thread apparently do not have the
user-code class loader set as the context class loader. Kryo relies on that
for class resolution.

What Flink version are you on? I think that actual processing and
forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:

> I think my previous guess was wrong. From what I can tell, when Kryo tries
> to copy the exception object, it does that by serializing and deserializing
> it. For subclasses of RuntimeException, it doesn't know how to do it so it
> delegates serialization to Java. However, it doesn't use a
> custom ObjectInputStream to override resolveClass() and provide classes
> from the user code classloader… such as happens in RocksDBStateBackend's
> use of InstantiationUtil.deserializeObject(). Instead, it uses
> ObjectInputStream$latestUserDefinedLoader() which is the
> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>
> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
> configured?
>
> Thanks,
> Shannon
>
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Monday, March 6, 2017 at 7:09 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: AWS exception serialization problem
>
> This happened when running Flink with bin/run-local.sh I notice that there
> only appears to be one Java process. The job manager and the task manager
> run in the same JVM, right? I notice, however, that there are two blob
> store folders on disk. Could the problem be caused by two different
> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>
>
> From: Shannon Carey <sca...@expedia.com>
> Date: Monday, March 6, 2017 at 6:39 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: AWS exception serialization problem
>
> Has anyone encountered this or know what might be causing it?
>
>
> java.lang.RuntimeException: Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
>       at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>       at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>       at 
> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>       at 
> org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
>       ... 7 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.services.s3.model.AmazonS3Exception
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
>       at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>       at java.lang.Throwable.readObject(Throwable.java:914)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>       at java.lang.Throwable.readObject(Throwable.java:914)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>       at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>       at java.lang.Throwable.readObject(Throwable.java:914)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>       at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
>       ... 12 more
>
>

Reply via email to