Github user gss2002 commented on the issue:

    https://github.com/apache/spark/pull/22867
  
    @vanzin I made the following change and it didn't work. How do you want to proceed?
    --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
    +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
    @@ -98,6 +98,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     
       override def isValid(): Boolean = true
     
    +  private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")
    +
       override def getPartitions: Array[Partition] = {
         assertValid()
         Array.tabulate(_blockIds.length) { i =>
    @@ -136,7 +138,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
            // this dummy directory should not already exist otherwise the WAL will try to recover
             // past events from the directory and throw errors.
             val nonExistentDirectory = new File(
    -          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    +          tmpDir, UUID.randomUUID().toString).getAbsolutePath
             writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
               SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
             dataRead = writeAheadLog.read(partition.walRecordHandle)
    
    It did not work; it looks like it still needs an HDFS path:
    
    18/10/30 01:06:50 ERROR Executor: Exception in task 0.2 in stage 3.0 (TID 73)
    org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://hdpmgmt1.hdp.senia.org:8020/user/sparkstreaming/sparkstreaming/Spark_Streaming_MQ/receivedData/0/log-1540875768007-1540875828007,0,988)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:147)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:175)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalArgumentException: Pathname /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000002/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000001/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e from /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000002/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_000001/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e is not a valid DFS filename.
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:197)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForReceiver(WriteAheadLogUtils.scala:111)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:142)
        ... 12 more
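    The doubled pathname in the Caused by line suggests two problems with the change above. First, new File("file:///...", uuid).getAbsolutePath does not parse a URI scheme; it treats "file:" as part of a relative pathname and prepends the process working directory, which is where the /hadoop/yarn/.../file:/hadoop/yarn/... pathname comes from. Second, because tmpDir is a field of the RDD, it is evaluated on the driver and serialized out with the RDD, which matches the driver container (..._000001) tmp path showing up under the executor container (..._000002) working directory. If the intent is a dummy directory that always resolves against the local filesystem, a minimal sketch of one alternative (my guess at a fix, untested on this cluster) is to build a real file: URI with File#toURI inside getBlockFromWriteAheadLog instead of concatenating the scheme by hand:

        import java.io.File
        import java.util.UUID

        // Sketch only: toURI yields an absolute "file:/..." URI, so Hadoop's Path
        // parses the scheme and picks the local FileSystem instead of the cluster
        // default (HDFS). Computing it here, rather than in a field, also means it
        // uses the executor's own java.io.tmpdir, not one captured from the driver.
        val nonExistentDirectory = new File(
          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).toURI.toString
        writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
          SparkEnv.get.conf, nonExistentDirectory, hadoopConf)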

