Hi Dibyendu,

How does one go about configuring spark streaming to use tachyon as its
place for storing checkpoints? Also, can one do this with tachyon running
on a completely different node than where spark processes are running?

Thanks
Nikunj


On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> Thanks for looking into this. Further investigating I found that the issue
> is with Tachyon does not support File Append. The streaming receiver which
> writes to WAL when failed, and again restarted, not able to append to same
> WAL file after restart.
>
> I raised this with Tachyon user group, and Haoyuan told that within 3
> months time Tachyon file append will be ready. Will revisit this issue
> again then .
>
> Regards,
> Dibyendu
>
>
> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Looks like somehow the file size reported by the FSInputDStream of
>> Tachyon's FileSystem interface, is returning zero.
>>
>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just to follow up this thread further .
>>>
>>> I was doing some fault tolerant testing of Spark Streaming with Tachyon
>>> as OFF_HEAP block store. As I said in earlier email, I could able to solve
>>> the BlockNotFound exception when I used Hierarchical Storage of Tachyon
>>> ,  which is good.
>>>
>>> I continue doing some testing around storing the Spark Streaming WAL and
>>> CheckPoint files also in Tachyon . Here is few finding ..
>>>
>>>
>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>> failure.
>>>
>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>  for the same Receiver id which just failed.
>>>
>>> If I change the Checkpoint location back to HDFS , Spark Streaming can
>>> recover from both Driver and Receiver failure .
>>>
>>> Here is the Log details when Spark Streaming receiver failed ...I raised
>>> a JIRA for the same issue :
>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>
>>>
>>>
>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>> (epoch 1)*
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>> remove executor 2 from BlockManagerMaster.
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>> successfully in removeExecutor
>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>> receiver for stream 2 from 10.252.5.62*:47255
>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>> not read data from write ahead log record
>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>>> EOF: 645603894, fileSize = 0*
>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 15 more
>>>
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in
>>> stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>> [duplicate 1]
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in
>>> stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>> Executor app-20150511104442-0048/2 removed: worker lost
>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>> Asked to remove non-existent executor 2
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>> [duplicate 2]
>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0
>>> failed 4 times; aborting job
>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>> 103.0, whose tasks have all completed, from pool
>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage
>>> 103
>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running
>>> job streaming job 1431341145000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in
>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>> not read data from write ahead log record
>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>> )
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>> EOF: 645603894, fileSize = 0
>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 15 more
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the updates!
>>>>
>>>> Best,
>>>>
>>>> Haoyuan
>>>>
>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> Just a followup on this Thread .
>>>>>
>>>>> I tried Hierarchical Storage on Tachyon (
>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ) , and
>>>>> that
>>>>> seems to have worked and I did not see any any Spark Job failed due to
>>>>> BlockNotFoundException.
>>>>> below is my  Hierarchical Storage settings..
>>>>>
>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>
>>>>>
>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>> > Dear All ,
>>>>> >
>>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>>>> block
>>>>> > store  . Primary reason for evaluating Tachyon is to find if Tachyon
>>>>> can
>>>>> > solve the Spark BlockNotFoundException .
>>>>> >
>>>>> > In traditional MEMORY_ONLY StorageLevel, when blocks are evicted ,
>>>>> jobs
>>>>> > failed due to block not found exception and storing blocks in
>>>>> > MEMORY_AND_DISK is not a good option either as it impact the
>>>>> throughput a
>>>>> > lot .
>>>>> >
>>>>> >
>>>>> > To test how Tachyon behave , I took the latest spark 1.4 from master
>>>>> , and
>>>>> > used Tachyon 0.6.4 and configured Tachyon in Fault Tolerant Mode .
>>>>> Tachyon
>>>>> > is running in 3 Node AWS x-large cluster and Spark is running in 3
>>>>> node AWS
>>>>> > x-large cluster.
>>>>> >
>>>>> > I have used the low level Receiver based Kafka consumer (
>>>>> > https://github.com/dibbhatt/kafka-spark-consumer)  which I have
>>>>> written
>>>>> > to pull from Kafka and write Blocks to Tachyon
>>>>> >
>>>>> >
>>>>> > I found there is similar improvement in throughput (as MEMORY_ONLY
>>>>> case )
>>>>> > but very good overall memory utilization (as it is off heap store) .
>>>>> >
>>>>> >
>>>>> > But I found one issue on which I need to clarification .
>>>>> >
>>>>> >
>>>>> > In Tachyon case also , I find  BlockNotFoundException  , but due to a
>>>>> > different reason .  What I see TachyonBlockManager.scala put the
>>>>> blocks in
>>>>> > WriteType.TRY_CACHE configuration . And because of this Blocks ate
>>>>> evicted
>>>>> > from Tachyon Cache and when Spark try to find the block it throws
>>>>> >  BlockNotFoundException .
>>>>> >
>>>>> > I see a pull request which discuss the same ..
>>>>> >
>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>> >
>>>>> >
>>>>> > When I modified the WriteType to CACHE_THROUGH , BlockDropException
>>>>> is
>>>>> > gone , but it again impact the throughput ..
>>>>> >
>>>>> >
>>>>> > Just curious to know , if Tachyon has any settings which can solve
>>>>> the
>>>>> > Block Eviction from Cache to Disk, other than explicitly setting
>>>>> > CACHE_THROUGH  ?
>>>>> >
>>>>> > Regards,
>>>>> > Dibyendu
>>>>> >
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Haoyuan Li
>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>
>>>
>>>
>>
>

Reply via email to