Hi,

I am studying the RDD Caching function and write a small program to verify it. 
I run the program in a Spark1.3.0 environment and on Yarn cluster. But I meet a 
weird exception. It isn't always generated in the log. Only sometimes I can see 
this exception. And it does not affect the output of my program.  Could anyone 
explain why this happens and how to eliminate it?

My program and the exception is listed in the following. Thanks very much for 
the help!

*****Program*****
object TestSparkCaching01 {
 def main(args: Array[String]) {
   val conf = new SparkConf()
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrationRequired","true")
   conf.registerKryoClasses(Array(classOf[MyClass1],classOf[Array[MyClass1]]))
   val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
   val sc = new SparkContext(conf)
   val rdd = sc.textFile(inFile)
   rdd.cache()
   rdd.map("Cache String: "+_).foreach(println )
   sc.stop()
 }
}

*****Exception*****
15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown 
by an exception handler.
java.util.concurrent.RejectedExecutionException: Worker has already been 
shutdown
                at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
                at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
                at 
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
                at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
                at 
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
                at 
org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
                at 
org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
                at 
org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
                at 
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
                at 
org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
                at 
org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
                at 
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
                at 
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
                at scala.util.Success.foreach(Try.scala:205)
                at 
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
                at 
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
                at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
                at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
                at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
                at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
                at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
                at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
                at 
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
                at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
                at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
                at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.

Reply via email to