Sioa Song created SPARK-10358:
---------------------------------

             Summary: Spark-sql throws IOException on exit when using HDFS to store event log.
                 Key: SPARK-10358
                 URL: https://issues.apache.org/jira/browse/SPARK-10358
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.3.1
         Environment: * spark-1.3.1-bin-hadoop2.6
* hadoop-2.6.0
* Red hat 2.6.32-504.el6.x86_64
            Reporter: Sioa Song
            Priority: Minor
             Fix For: 1.3.1


h2. Summary 
In Spark 1.3.1, when HDFS is used to store the event log, spark-sql throws 
"java.io.IOException: Filesystem closed" on exit. 
h2. How to reproduce 
1. Enable the event log mechanism and point the log directory at HDFS. 
   You can do this by setting these two properties in spark-defaults.conf: 
spark.eventLog.enabled          true 
spark.eventLog.dir              hdfs://xxxxx:xxxxx/spark-events 
2. Start spark-sql, and type exit; once it is up. 
{noformat} 
spark-sql> exit; 
15/08/14 06:29:20 ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:181)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:54)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1678)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    ... 19 more
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/08/14 06:29:20 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/08/14 06:29:20 INFO ui.SparkUI: Stopped Spark web UI at http://9.111.251.177:4040
15/08/14 06:29:20 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15/08/14 06:29:20 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
Exception in thread "Thread-4" java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1986)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
    at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:198)
    at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
    at org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1391)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1391)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)
{noformat} 
h2. Analysis 
With DEBUG-level logging enabled, we can see more detail: 
{noformat} 
15/08/14 09:32:34 DEBUG SessionState: Removing resource dir /tmp/a9d3b5b8-3329-4e93-aee2-0a3496d661fa_resources
15/08/14 09:32:34 DEBUG SparkSQLEnv: Shutting down Spark SQL Environment
15/08/14 09:32:34 DEBUG DiskBlockManager: Shutdown hook called
15/08/14 09:32:34 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=2, src=/spark/events/app-20150814093225-0002.snappy.inprogress, packetSize=65532, chunksPerPacket=127, bytesCurBlock=2048
15/08/14 09:32:34 DEBUG DFSClient: Queued packet 2
15/08/14 09:32:34 DEBUG DFSClient: Queued packet 3
15/08/14 09:32:34 DEBUG DFSClient: Waiting for ack for: 3
15/08/14 09:32:34 DEBUG Utils: Shutdown hook called
15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:2 offsetInBlock:2048 lastPacketInBlock:false lastByteOffsetInBlock: 2399
15/08/14 09:32:34 INFO SparkContext: END POST APPLICATION END
15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 2 status: SUCCESS downstreamAckTimeNanos: 0
15/08/14 09:32:34 DEBUG DFSClient: DataStreamer block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010 sending packet packet seqno:3 offsetInBlock:2399 lastPacketInBlock:true lastByteOffsetInBlock: 2399
15/08/14 09:32:34 DEBUG DFSClient: DFSClient seqno: 3 status: SUCCESS downstreamAckTimeNanos: 0
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.Server@489bb457
15/08/14 09:32:34 DEBUG DFSClient: Closing old block BP-456748518-127.0.0.1-1439547535043:blk_1073741833_1010
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping SelectChannelConnector@0.0.0.0:4040
15/08/14 09:32:34 DEBUG AbstractLifeCycle: stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@15e3d24a
15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-45 Selector0,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@25964fe8
15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root sending #6
15/08/14 09:32:34 DEBUG Client: IPC Client (123257783) connection to /9.111.251.177:9000 from root got value #6
15/08/14 09:32:34 DEBUG ProtobufRpcEngine: Call: complete took 4ms
15/08/14 09:32:34 DEBUG nio: Stopped Thread[qtp1717362942-46 Selector1,5,main] on org.spark-project.jetty.io.nio.SelectorManager$1@2f581b9f
15/08/14 09:32:34 DEBUG Client: stopping client from cache: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: removing client from cache: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@3aca5e2
15/08/14 09:32:34 DEBUG Client: Stopping client
15/08/14 09:32:34 *ERROR* LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
    at scala.Option.foreach(Option.scala:236)
{noformat} 

From the stack trace, we can see the "Shutdown hook called" of 
org.apache.spark.util.Utils, then the HDFS shutdown hook in which the 
DFSClient is stopped, and finally SparkSQLEnv.stop, where the exception 
is thrown. 
Clearly, *Spark tries to write new event log messages after the HDFS client 
has been closed, which causes the exception*. 
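To make the failure mode concrete, here is a toy Scala sketch of a client that rejects writes once closed, the same guard behind DFSClient.checkOpen. ToyDfsClient and its methods are made-up names for illustration, not Spark or Hadoop code:

```scala
import java.io.IOException

// Toy model (hypothetical names): a client that rejects writes once closed,
// mirroring the guard that makes DFSClient throw "Filesystem closed".
class ToyDfsClient {
  @volatile private var open = true

  // What the filesystem's shutdown hook effectively does.
  def close(): Unit = open = false

  // What the event-logging listener does while flushing the log.
  def hflush(): Unit =
    if (!open) throw new IOException("Filesystem closed")
}

object ToyDemo extends App {
  val client = new ToyDfsClient
  client.close()      // the HDFS shutdown hook runs first...
  try client.hflush() // ...then the event log tries to flush
  catch { case e: IOException => println(s"got: ${e.getMessage}") }
}
```

Once close() has run, every later hflush() fails, regardless of how healthy the cluster is; the bug is purely an ordering problem between the two shutdown paths.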

After checking org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver, we 
found that SparkSQLEnv.stop() is invoked from a plain Java runtime shutdown 
hook: 
{code} 
    // Clean up after we exit 
    Runtime.getRuntime.addShutdownHook( 
      new Thread() { 
        override def run() { 
          SparkSQLEnv.stop() 
        } 
      } 
    ) 
{code} 
Such a hook has no guaranteed ordering relative to HDFS's own shutdown hook, 
and in practice it is executed after the DFSClient is already closed. 
h2. Solution 
We fixed it by *registering the hook through Hadoop's ShutdownHookManager and 
assigning it a higher priority than HDFS's own shutdown hook*, so the event 
log is finalized before the DFSClient is closed. 
The pull request is at [https://github.com/apache/spark/pull/8530#issuecomment-136236733].
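For illustration, the ordering idea behind Hadoop's ShutdownHookManager (hooks run in descending priority order) can be sketched in plain Scala. PriorityHookManager and the priority values below are hypothetical stand-ins, not the actual patch:

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of priority-ordered shutdown hooks, the idea behind
// org.apache.hadoop.util.ShutdownHookManager. Illustrative only.
class PriorityHookManager {
  private val hooks = ArrayBuffer.empty[(Int, () => Unit)]

  // Register a hook; a higher priority runs earlier.
  def addShutdownHook(priority: Int)(body: => Unit): Unit =
    hooks += ((priority, () => body))

  // In real use this is driven by a single JVM shutdown hook; it is exposed
  // directly here so the ordering can be observed.
  def runAll(): Unit =
    hooks.sortBy { case (p, _) => -p }.foreach { case (_, f) => f() }
}

object HookDemo extends App {
  val mgr   = new PriorityHookManager
  val order = ArrayBuffer.empty[String]
  // Low priority, standing in for the FileSystem close hook: runs last.
  mgr.addShutdownHook(10) { order += "close DFSClient" }
  // Higher priority, standing in for SparkSQLEnv.stop: runs first.
  mgr.addShutdownHook(50) { order += "SparkSQLEnv.stop" }
  mgr.runAll()
  println(order.mkString(" -> ")) // SparkSQLEnv.stop -> close DFSClient
}
```

Because the stop hook is given a priority above the filesystem's, the event log is flushed and closed while the DFSClient is still open, which is exactly the ordering the fix enforces.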



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
