[ https://issues.apache.org/jira/browse/FLINK-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ufuk Celebi reassigned FLINK-2381: ---------------------------------- Assignee: Ufuk Celebi > Possible class not found Exception on failed partition producer > --------------------------------------------------------------- > > Key: FLINK-2381 > URL: https://issues.apache.org/jira/browse/FLINK-2381 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime > Affects Versions: 0.9, master > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > Fix For: 0.10, 0.9.1 > > > Failing the production of a result partition marks the respective partition > as failed with a ProducerFailedException. > The cause of this exception can be a user defined class, which can only be > loaded by the user code class loader. The network stack fails the shuffle > with a RemoteTransportException, which has the user exception as a cause. > When the consuming task receives this exception, this leads to a class not > found exception, because the network stack tries to load the class with the > system class loader. > {code} > +----------+ > | FAILING | > | PRODUCER | > +----------+ > || > \/ > ProducerFailedException(CAUSE) via network > || > \/ > +----------+ > | RECEIVER | > +----------+ > {code} > CAUSE is only loadable by the user code class loader. > When trying to deserialize this, RECEIVER fails with a > LocalTransportException, which is super confusing, because the error is not > local, but remote. > Thanks to [~rmetzger] for reporting and debugging the issue with the > following stack trace: > {code} > Flat Map (26/120) > 14:03:00,343 ERROR > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask - Flat Map > (26/120) failed > java.lang.RuntimeException: Could not read next record. 
> at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: > java.lang.ClassNotFoundException: > kafka.common.ConsumerRebalanceFailedException > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151) > at > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275) > at > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253) > at > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131) > at > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275) > at > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > ... 1 more > Caused by: io.netty.handler.codec.DecoderException: > java.lang.ClassNotFoundException: > kafka.common.ConsumerRebalanceFailedException > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > ... 12 more > Caused by: java.lang.ClassNotFoundException: > kafka.common.ConsumerRebalanceFailedException > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:278) > at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) > at 
java.lang.Throwable.readObject(Throwable.java:914) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.flink.runtime.io.network.netty.NettyMessage$ErrorResponse.readFrom(NettyMessage.java:338) > at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:161) > at > org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:123) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89) > ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)