[ https://issues.apache.org/jira/browse/FLINK-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ufuk Celebi reassigned FLINK-2381:
----------------------------------

    Assignee: Ufuk Celebi

> Possible class not found Exception on failed partition producer
> ---------------------------------------------------------------
>
>                 Key: FLINK-2381
>                 URL: https://issues.apache.org/jira/browse/FLINK-2381
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>    Affects Versions: 0.9, master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10, 0.9.1
>
>
> Failing the production of a result partition marks the respective partition
> as failed with a ProducerFailedException.
> The cause of this exception can be a user-defined class, which can only be
> loaded by the user code class loader. The network stack fails the shuffle
> with a RemoteTransportException, which has the user exception as its cause.
> When the consuming task receives this exception, deserializing it fails with
> a ClassNotFoundException, because the network stack tries to load the class
> with the system class loader.
> {code}
> +----------+
> | FAILING  |
> | PRODUCER |
> +----------+
>      || 
>      \/
>  ProducerFailedException(CAUSE) via network
>      || 
>      \/
> +----------+
> | RECEIVER |
> +----------+
> {code}
> CAUSE is only loadable by the user code class loader.
> When RECEIVER tries to deserialize it, RECEIVER fails with a
> LocalTransportException. This is very confusing, because the error is not
> local but remote.
> Thanks to [~rmetzger] for reporting and debugging the issue with the 
> following stack trace:
> {code}
> Flat Map (26/120)
> 14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask   - Flat Map (26/120) failed
> java.lang.RuntimeException: Could not read next record.
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
>         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         ... 1 more
> Caused by: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
>         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>         ... 12 more
> Caused by: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:278)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at java.lang.Throwable.readObject(Throwable.java:914)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.flink.runtime.io.network.netty.NettyMessage$ErrorResponse.readFrom(NettyMessage.java:338)
>         at org.apache.flink.runtime.io.network.netty.NettyMessage$ 
> .decode(NettyMessage.java:161)
>         at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:123)
>         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
>         ... 13 more
> {code}
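The trace shows NettyMessage$ErrorResponse.readFrom reading the error with a plain ObjectInputStream, whose resolveClass ends up in Class.forName on sun.misc.Launcher$AppClassLoader, which cannot see the Kafka class. A minimal sketch of the usual remedy, assuming the receiving side can be handed the job's user code class loader: subclass ObjectInputStream and resolve classes through that loader. UserCodeObjectInputStream and readThrowable are hypothetical names for illustration, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper, not Flink API: resolves classes through a caller-supplied
// class loader (assumed here to be the job's user code class loader).
public class UserCodeObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    public UserCodeObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // Look the class up (without initializing it) in the supplied loader,
            // which can see user-defined exception classes.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution, which also handles primitive type names.
            return super.resolveClass(desc);
        }
    }

    // Deserializes a Throwable (e.g. the cause of a failed partition) with the given loader.
    public static Throwable readThrowable(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new UserCodeObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return (Throwable) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Serialize an exception as the producing side would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new IllegalStateException("producer failed"));
        }
        // In this demo the "user code" loader is simply this class's own loader.
        ClassLoader userCodeLoader = UserCodeObjectInputStream.class.getClassLoader();
        Throwable cause = readThrowable(bytes.toByteArray(), userCodeLoader);
        System.out.println(cause); // java.lang.IllegalStateException: producer failed
    }
}
```

The fallback to super.resolveClass keeps primitive and JDK types working. Whether the Netty handlers in the network stack can actually be given the user code class loader is an assumption this sketch does not settle.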

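Even with the right class loader, deserializing an arbitrary user exception can still fail, for example if the class is missing on the receiving side. A complementary sketch, assumed here rather than taken from the issue: before sending, the producer replaces the cause with an exception that holds only JDK types (the original class name, message, and printed stack trace as strings), so the receiver can always deserialize it and report the failure as remote. StringifiedRemoteException and UserDefinedException are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical class, not Flink API: carries a remote failure using JDK types only,
// so any receiver can deserialize it regardless of which class loader it uses.
public class StringifiedRemoteException extends Exception {

    private static final long serialVersionUID = 1L;

    // Fully qualified name of the original (possibly user-defined) exception class.
    private final String originalClassName;

    // Printed stack trace of the original exception, including its "Caused by" chain.
    private final String originalStackTrace;

    public StringifiedRemoteException(String message, Throwable original) {
        super(message + " (remote cause: " + original + ")");
        this.originalClassName = original.getClass().getName();
        this.originalStackTrace = stackTraceToString(original);
    }

    private static String stackTraceToString(Throwable t) {
        StringWriter writer = new StringWriter();
        try (PrintWriter printer = new PrintWriter(writer)) {
            t.printStackTrace(printer);
        }
        return writer.toString();
    }

    public String getOriginalClassName() {
        return originalClassName;
    }

    public String getOriginalStackTrace() {
        return originalStackTrace;
    }

    // Stand-in for a user-defined exception such as kafka.common.ConsumerRebalanceFailedException.
    static class UserDefinedException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        UserDefinedException(String message) {
            super(message);
        }
    }

    public static void main(String[] args) throws Exception {
        StringifiedRemoteException wrapped = new StringifiedRemoteException(
                "Producer failed", new UserDefinedException("rebalance failed"));

        // The serialized form references only JDK classes and this wrapper class.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapped);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            StringifiedRemoteException received = (StringifiedRemoteException) in.readObject();
            System.out.println(received.getMessage());
            System.out.println(received.getOriginalStackTrace());
        }
    }
}
```

The trade-off is that the receiver can no longer catch the original exception type, but it can always surface the producer's failure, with the remote class name and stack trace, instead of a misleading LocalTransportException.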


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
