Hi all,

I keep having an odd error that happens frequently (but not always, a Spark job 
can run for ~10 minutes before this is thrown). The NiFiReceiver's Runnable is 
unable to be serialized due to it being modified at the same time as it is 
being serialized. The full stack trace is below. I've looked through the 
NiFiReceiver code, including taking a copy and making the Runnable extend 
serializable but that did not solve it either.

Has anyone seen this before? To be frank, I'm unsure whether it's the receiver 
or my own code affecting the receiver.

Many thanks,

Michael

15/09/04 16:55:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 48.0 
(TID 411, slave6.localdomain): java.lang.RuntimeException: 
com.esotericsoftware.kryo.KryoException: 
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.spark.streaming.util.RecurringTimer$$anon$1)
thread (org.apache.spark.streaming.util.RecurringTimer)
blockIntervalTimer (org.apache.spark.streaming.receiver.BlockGenerator)
blockGenerator (org.apache.spark.streaming.receiver.ReceiverSupervisorImpl)
executor_ (org.apache.nifi.spark.NiFiReceiver)
this$0 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable)
this$1 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable$1)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
        at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
        at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
        at 
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
        at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:190)
        at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
        at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
        at java.util.Vector$Itr.next(Vector.java:1137)
        at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
        at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        ... 78 more

        at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)



Michael Griffiths
NationProtect Developer
BAE Systems Applied Intelligence
___________________________________________________________

E: [email protected]

BAE Systems Applied Intelligence, Surrey Research Park, Guildford, Surrey, GU2 
7RQ.
www.baesystems.com/ai<http://www.baesystems.com/ai>

Please consider the environment before printing this email. This message should 
be regarded as confidential. If you have received this email in error please 
notify the sender and destroy it immediately. Statements of intent shall only 
become binding when confirmed in hard copy by an authorised signatory. The 
contents of this email may relate to dealings with other companies under the 
control of BAE Systems Applied Intelligence Limited, details of which can be 
found at http://www.baesystems.com/Businesses/index.htm.

Reply via email to