@Shannon @Gordon - is there some shading logic involved in the
dependencies, concerning the AWS libraries?


On Tue, Mar 7, 2017 at 5:50 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> I just had a quick look at this, but the Kafka fetcher thread’s context
> classloader doesn’t seem to be the issue (at least for 1.1.4).
>
> In Flink 1.1.4, a separate thread from the task thread is created to run
> the fetcher, but since the task thread sets the user code classloader as
> its context classloader, shouldn’t any threads created from it (i.e., the
> fetcher thread) use it also?
>
> I quickly checked the context classloader that the Kafka09Fetcher thread in
> 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
>
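The inheritance behaviour described above can be checked outside Flink with plain JDK threads. This is a minimal sketch, not Flink code: the class name, the method name, and the empty `URLClassLoader` standing in for the user code classloader are all assumptions made for illustration. It relies only on the documented JDK rule that a new thread takes its context classloader from the thread that constructs it.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of Flink.
final class ContextLoaderInheritanceDemo {

    /**
     * Starts a "task" thread that sets taskLoader as its context classloader,
     * then creates a child "fetcher" thread and reports which context
     * classloader the child sees.
     */
    static ClassLoader loaderSeenByChildThread(ClassLoader taskLoader) throws InterruptedException {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();

        Thread taskThread = new Thread(() -> {
            // Set the loader before constructing the child: the child copies
            // the parent's context classloader at construction time.
            Thread.currentThread().setContextClassLoader(taskLoader);

            Thread fetcherThread = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()),
                    "fetcher");
            fetcherThread.start();
            try {
                fetcherThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task");

        taskThread.start();
        taskThread.join();
        return seen.get();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a user code classloader; a real one would point at the job JAR.
        ClassLoader standIn = new URLClassLoader(
                new URL[0], ContextLoaderInheritanceDemo.class.getClassLoader());
        ClassLoader seen = loaderSeenByChildThread(standIn);
        System.out.println("child inherited task loader: " + (seen == standIn));
    }
}
```

Note that the inheritance only applies if the loader is set before the child thread is constructed. A thread created earlier, or one handed out by a shared thread pool, would keep whatever loader it had.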
>
> On March 7, 2017 at 7:32:35 PM, Stephan Ewen (se...@apache.org) wrote:
>
> Ah, I see...
>
> The issue is that the Kafka fetcher thread apparently does not have the
> user-code class loader set as the context class loader. Kryo relies on that
> for class resolution.
>
> What Flink version are you on? I think that actual processing and
> forwarding does not happen in the Kafka Fetchers any more as of 1.2, so
> only Flink 1.1 should be affected...
>
>
> On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <sca...@expedia.com> wrote:
>
>> I think my previous guess was wrong. From what I can tell, when Kryo
>> tries to copy the exception object, it does that by serializing and
>> deserializing it. For subclasses of RuntimeException, it doesn't know how
>> to do it so it delegates serialization to Java. However, it doesn't use a
>> custom ObjectInputStream to override resolveClass() and provide classes
>> from the user code classloader… such as happens in RocksDBStateBackend's
>> use of InstantiationUtil.deserializeObject(). Instead, it uses
>> ObjectInputStream$latestUserDefinedLoader() which is the
>> Launcher$AppClassLoader which definitely doesn't have the user code in it.
>>
>> Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being
>> configured?
>>
>> Thanks,
>> Shannon
>>
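For reference, the kind of `resolveClass()` override mentioned above could look roughly like the following. This is a sketch under assumptions, not the code Flink or Kryo actually uses: the class and method names are hypothetical, and the copy-by-serialization helper only mimics the serialize-then-deserialize round trip described in the message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper; names are illustrative only.
final class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes against an explicit classloader first. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look in the supplied loader (e.g. the user code classloader) first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Copies an object by Java-serializing it and reading it back with the given loader. */
    static Object copyViaJavaSerialization(Object value, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RuntimeException original = new RuntimeException("outer", new IllegalStateException("inner"));
        // In a Flink job the second argument would be the user code classloader.
        Object copy = copyViaJavaSerialization(
                original, Thread.currentThread().getContextClassLoader());
        System.out.println(copy.getClass().getName());
    }
}
```

Without such an override, a plain `ObjectInputStream` resolves classes through the nearest user-defined loader on the call stack. That matches the `Launcher$AppClassLoader` frames in the trace below.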
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 7:09 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: Re: AWS exception serialization problem
>>
>> This happened when running Flink with bin/run-local.sh. I notice that
>> there only appears to be one Java process. The job manager and the task
>> manager run in the same JVM, right? I notice, however, that there are two
>> blob store folders on disk. Could the problem be caused by two different
>> FlinkUserCodeClassLoader objects pointing to the two different JARs?
>>
>>
>> From: Shannon Carey <sca...@expedia.com>
>> Date: Monday, March 6, 2017 at 6:39 PM
>> To: "user@flink.apache.org" <user@flink.apache.org>
>> Subject: AWS exception serialization problem
>>
>> Has anyone encountered this or know what might be causing it?
>>
>>
>> java.lang.RuntimeException: Could not forward element to next operator
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
>>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>>         at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>>         at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
>>         at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
>>         at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
>>         ... 7 more
>> Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:348)
>>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
>>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
>>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>>         at java.lang.Throwable.readObject(Throwable.java:914)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>>         at java.lang.Throwable.readObject(Throwable.java:914)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
>>         at java.lang.Throwable.readObject(Throwable.java:914)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>         at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
>>         ... 12 more
>>
>>
>
