Hi Dibyendu,

How does one go about configuring Spark Streaming to use Tachyon as its place for storing checkpoints? Also, can one do this with Tachyon running on a completely different node than where the Spark processes are running?
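Something like the following is what I have in mind (a minimal sketch only; the tachyon-ft:// scheme is taken from the logs below, and the host, port, path, batch interval, and app name are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the receiver write ahead log so the WAL files land under the
    // checkpoint directory as well.
    val conf = new SparkConf()
      .setAppName("TachyonCheckpointTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Checkpoint directory on Tachyon rather than HDFS; presumably the
    // Tachyon master need not be co-located with any Spark node.
    ssc.checkpoint("tachyon-ft://tachyon-master:19998/tachyon/checkpoint")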
Thanks,
Nikunj

On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> Thanks for looking into this. Investigating further, I found that the issue is that Tachyon does not support file append. The streaming receiver writes to the WAL; when it fails and is restarted, it is not able to append to the same WAL file after the restart.
>
> I raised this with the Tachyon user group, and Haoyuan said that Tachyon file append will be ready within three months. I will revisit this issue then.
>
> Regards,
> Dibyendu
>
> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com> wrote:
>
>> Looks like the file size reported by the FSInputDStream of Tachyon's FileSystem interface is somehow returning zero.
>>
>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just to follow up on this thread further.
>>>
>>> I was doing some fault tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able to solve the BlockNotFoundException by using Tachyon's Hierarchical Storage, which is good.
>>>
>>> I continued testing with the Spark Streaming WAL and checkpoint files also stored in Tachyon. Here are a few findings.
>>>
>>> When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the Driver and Receiver failure cases, and Spark Streaming is able to recover without any data loss on Driver failure.
>>>
>>> *But on Receiver failure, Spark Streaming loses data*, as I see an exception while reading the WAL file from the Tachyon "receivedData" location for the same Receiver id which just failed.
>>>
>>> If I change the checkpoint location back to HDFS, Spark Streaming can recover from both Driver and Receiver failure.
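>>> For completeness, the driver recovery path in these tests is the standard StreamingContext.getOrCreate pattern, roughly as below (a condensed sketch, not my exact consumer code; the app name and batch interval are placeholders, and the checkpoint URI matches the one in the logs):
>>>
>>>     import org.apache.spark.SparkConf
>>>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>
>>>     val checkpointDir = "tachyon-ft://10.252.5.113:19998/tachyon/checkpoint"
>>>
>>>     def createContext(): StreamingContext = {
>>>       val conf = new SparkConf().setAppName("TachyonFTTest")
>>>       val ssc = new StreamingContext(conf, Seconds(5))
>>>       ssc.checkpoint(checkpointDir)
>>>       // ... create receiver streams and output operations here ...
>>>       ssc
>>>     }
>>>
>>>     // On driver restart, the context (and WAL-backed block info) is
>>>     // recovered from the checkpoint directory in Tachyon if present;
>>>     // otherwise a fresh context is created.
>>>     val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>>>     ssc.start()
>>>     ssc.awaitTermination()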
>>> Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-7525
>>>
>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2 (epoch 1)*
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered receiver for stream 2 from 10.252.5.62*:47255
>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IllegalArgumentException: *Seek position is past EOF: 645603894, fileSize = 0*
>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 15 more
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor app-20150511104442-0048/2 removed: worker lost
>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 2
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 2]
>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0 failed 4 times; aborting job
>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 103.0, whose tasks have all completed, from pool
>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103 (foreachRDD at Consumer.java:92) failed in 0.943 s
>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed: foreachRDD at Consumer.java:92, took 0.953482 s
>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1431341145000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 15 more
>>>
>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com> wrote:
>>>
>>>> Thanks for the updates!
>>>>
>>>> Best,
>>>>
>>>> Haoyuan
>>>>
>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> Just a follow-up on this thread.
>>>>>
>>>>> I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are my Hierarchical Storage settings:
>>>>>
>>>>> -Dtachyon.worker.hierarchystore.level.max=2
>>>>> -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>> -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>> -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>> -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>> -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>> -Dtachyon.worker.evict.strategy=LRU
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>> > Dear All,
>>>>> >
>>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP block store. The primary reason for evaluating Tachyon is to find out whether Tachyon can solve the Spark BlockNotFoundException.
>>>>> >
>>>>> > With the traditional MEMORY_ONLY StorageLevel, jobs fail with a block not found exception when blocks are evicted, and storing blocks in MEMORY_AND_DISK is not a good option either, as it impacts throughput a lot.
>>>>> >
>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master, used Tachyon 0.6.4, and configured Tachyon in Fault Tolerant Mode. Tachyon is running on a 3-node AWS x-large cluster and Spark is running on a separate 3-node AWS x-large cluster.
>>>>> >
>>>>> > I used the low-level Receiver-based Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer), which I have written, to pull from Kafka and write blocks to Tachyon.
>>>>> >
>>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY case) but very good overall memory utilization (as it is an off-heap store).
>>>>> >
>>>>> > But I found one issue on which I need clarification.
>>>>> >
>>>>> > In the Tachyon case I also see BlockNotFoundException, but for a different reason. What I see is that TachyonBlockManager.scala puts the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache, and when Spark tries to find a block it throws BlockNotFoundException.
>>>>> >
>>>>> > I see a pull request which discusses the same:
>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>> >
>>>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException is gone, but it again impacts the throughput.
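>>>>> >
>>>>> > The modification I experimented with boils down to the write type passed to the Tachyon client, roughly as below (a sketch against the Tachyon 0.6.x client API, not the exact TachyonBlockManager code; the URI, path, and helper name are placeholders):
>>>>> >
>>>>> >   import tachyon.TachyonURI
>>>>> >   import tachyon.client.{TachyonFS, WriteType}
>>>>> >
>>>>> >   // Sketch: write a block either cache-only (evictable) or through
>>>>> >   // to the under filesystem. TRY_CACHE is what TachyonBlockManager
>>>>> >   // uses; evicted blocks later surface as BlockNotFoundException.
>>>>> >   // CACHE_THROUGH also persists the block to the under-FS, so
>>>>> >   // eviction no longer loses it, at a throughput cost.
>>>>> >   def writeBlock(bytes: Array[Byte], durable: Boolean): Unit = {
>>>>> >     val fs = TachyonFS.get(new TachyonURI("tachyon-ft://tachyon-master:19998"))
>>>>> >     val fileId = fs.createFile(new TachyonURI("/spark/blocks/block-0"))
>>>>> >     val os = fs.getFile(fileId).getOutStream(
>>>>> >       if (durable) WriteType.CACHE_THROUGH else WriteType.TRY_CACHE)
>>>>> >     try os.write(bytes) finally os.close()
>>>>> >   }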
>>>>> >
>>>>> > Just curious to know: does Tachyon have any setting which can solve the block eviction from cache to disk, other than explicitly setting CACHE_THROUGH?
>>>>> >
>>>>> > Regards,
>>>>> > Dibyendu
>>>>
>>>> --
>>>> Haoyuan Li
>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/