Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867 @vanzin I made the following change and it didn't work. How do you want to proceed? --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -98,6 +98,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( override def isValid(): Boolean = true + private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir") + override def getPartitions: Array[Partition] = { assertValid() Array.tabulate(_blockIds.length) { i => @@ -136,7 +138,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // this dummy directory should not already exist otherwise the WAL will try to recover // past events from the directory and throw errors. val nonExistentDirectory = new File( - System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath + tmpDir, UUID.randomUUID().toString).getAbsolutePath writeAheadLog = WriteAheadLogUtils.createLogForReceiver( SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle) It did not work. It looks like it needs an HDFS path; the executor's default filesystem is HDFS, so the `file:` prefix gets treated as part of a relative path (see the stack trace below).
18/10/30 01:06:50 ERROR Executor: Exception in task 0.2 in stage 3.0 (TID 73) org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://hdpmgmt1.hdp.senia.org:8020/user/sparkstreaming/sparkstreaming/Spark_Streaming_MQ/receivedData/0/log-1540875768007-1540875828007,0,988) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:147) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:175) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException: Pathname /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000002/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000001/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e from 
/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000002/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000001/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e is not a valid DFS filename. at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:197) at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245) at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80) at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142) at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141) at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForReceiver(WriteAheadLogUtils.scala:111) at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:142) ... 12 more
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org