I wanted to add that we are not configuring the WAL in our scenario.

Thanks again,
Nikunj


On Sat, Sep 26, 2015 at 11:35 AM, N B <nb.nos...@gmail.com> wrote:

> Hi Dibyendu,
>
> Thanks. I believe I understand why it has been an issue using S3 for
> checkpoints based on your explanation. But does this limitation apply only
> if recovery is needed in case of driver failure?
>
> What if we are not interested in recovery after a driver failure? Streaming
> pipelines that use reduceByKeyAndWindow() and updateStateByKey() still
> need a checkpoint directory configured just to run.
>
> Do you think this usage will also run into issues if an S3 location is
> provided for the checkpoint directory? We will not use it for any
> explicit recovery, as I stated above.
>
> Thanks
> Nikunj
>
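On the question of whether the checkpoint directory is touched when no recovery is planned: per the Spark Streaming programming guide, stateful operations such as updateStateByKey() derive each batch's state from the previous batch's state, so the dependency chain grows with every batch unless the state RDDs are periodically saved to the checkpoint directory. The following is a toy model of that effect in plain Java, not Spark code; the class and method names are made up for illustration.

```java
final class LineageDepthSketch {

    /**
     * Returns the longest dependency chain seen over the given number of batches.
     * A checkpointEvery value of 0 or less means the state is never checkpointed.
     */
    static int maxLineageDepth(int batches, int checkpointEvery) {
        int depth = 0;
        int maxDepth = 0;
        for (int batch = 1; batch <= batches; batch++) {
            depth++; // the new state depends on the previous batch's state
            maxDepth = Math.max(maxDepth, depth);
            if (checkpointEvery > 0 && batch % checkpointEvery == 0) {
                depth = 0; // state written to the checkpoint directory; chain is cut
            }
        }
        return maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("never checkpointed: " + maxLineageDepth(1000, 0));
        System.out.println("every 10 batches:   " + maxLineageDepth(1000, 10));
    }
}
```

In this model, never checkpointing over 1000 batches leaves a chain 1000 steps long, while checkpointing every 10 batches caps it at 10. If that reading is right, the checkpoint directory would still receive periodic writes even when the application never restarts from it.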
>
>
> On Sat, Sep 26, 2015 at 3:13 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> In Spark Streaming, the checkpoint directory is used for two purposes:
>>
>> 1. Metadata checkpointing
>>
>> 2. Data checkpointing
>>
>> If you enable the WAL to recover from driver failure, Spark Streaming will
>> also write the received blocks to the WAL, which is stored in the
>> checkpoint directory.
>>
>> For a streaming solution to recover from any failure without data loss,
>> you need to enable metadata checkpointing and the WAL. You do not need to
>> enable data checkpointing.
>>
>> In my experiments with the PR I mentioned, I configured metadata
>> checkpointing in HDFS and stored the received blocks OFF_HEAP, and I
>> did not use any WAL. The PR I proposed would recover from driver failover
>> without any WAL-like feature because the blocks are already available in
>> Tachyon. The metadata checkpoint helps to recover the metadata about past
>> received blocks.
>>
>> Now the question is, can I configure Tachyon as my metadata checkpoint
>> location? I tried that, and the streaming application writes the
>> receivedBlockMetadata to Tachyon, but on driver failure it cannot
>> recover the received block metadata from Tachyon. I sometimes see zero-size
>> files in the Tachyon checkpoint location, and it cannot recover past events.
>> I need to understand what the issue is with storing metadata in Tachyon.
>> That needs a different JIRA, I guess.
>>
>> Let me know if I have been able to explain the current scenario around
>> Spark Streaming and Tachyon.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>> On Sat, Sep 26, 2015 at 1:04 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hi Dibyendu,
>>>
>>> I am not sure I understand completely. But are you suggesting that
>>> currently there is no way to enable Checkpoint directory to be in Tachyon?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Recently I was working on a PR to use Tachyon as the OFF_HEAP store for
>>>> Spark Streaming and make sure Spark Streaming can recover from driver
>>>> failure and recover the blocks from Tachyon.
>>>>
>>>> The motivation for this PR is:
>>>>
>>>> If a streaming application stores the blocks OFF_HEAP, it may not need
>>>> any WAL-like feature to recover from driver failure. As long as the writing
>>>> of blocks to Tachyon from the streaming receiver is durable, they should be
>>>> recoverable from Tachyon directly on driver failure.
>>>> This can solve the issue of expensive WAL writes and of duplicating the
>>>> blocks in both MEMORY and the WAL, and also guarantee an end-to-end
>>>> no-data-loss channel using the OFF_HEAP store.
>>>>
>>>> https://github.com/apache/spark/pull/8817
>>>>
>>>> This PR is still under review. But having done various failover tests
>>>> in my environment, I see this PR worked perfectly fine without any data
>>>> loss. Let's see what TD and others have to say on this PR.
>>>>
>>>> Below is the configuration I used to test this PR.
>>>>
>>>>
>>>> Spark : 1.6 from Master
>>>> Tachyon : 0.7.1
>>>>
>>>> SparkConfiguration Details :
>>>>
>>>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>>>>         .set("spark.streaming.unpersist", "true")
>>>>         .set("spark.local.dir", "/mnt1/spark/tincan")
>>>>         .set("tachyon.zookeeper.address", "10.252.5.113:2182")
>>>>         .set("tachyon.usezookeeper", "true")
>>>>         .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
>>>>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>>>>         .set("spark.externalBlockStore.folderName", "pearson")
>>>>         .set("spark.externalBlockStore.dirId", "subpub")
>>>>         .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");
>>>>
>>>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
>>>>
>>>> String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";
>>>>
>>>> jsc.checkpoint(checkpointDirectory);
>>>>
>>>>
>>>> // I am using my receiver-based consumer
>>>> // (https://github.com/dibbhatt/kafka-spark-consumer), but
>>>> // KafkaUtils.createStream will also work.
>>>>
>>>> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>>>>         jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>>>
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hi Dibyendu,
>>>>>
>>>>> How does one go about configuring spark streaming to use tachyon as
>>>>> its place for storing checkpoints? Also, can one do this with tachyon
>>>>> running on a completely different node than where spark processes are
>>>>> running?
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> Thanks for looking into this. Investigating further, I found that the
>>>>>> issue is that Tachyon does not support file append. When the streaming
>>>>>> receiver that writes to the WAL fails and is restarted, it is not able
>>>>>> to append to the same WAL file after the restart.
>>>>>>
>>>>>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
>>>>>> file append will be ready within 3 months. I will revisit this issue
>>>>>> then.
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Looks like the file size reported by the FSInputDStream of
>>>>>>> Tachyon's FileSystem interface is somehow coming back as zero.
>>>>>>>
>>>>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just to follow up on this thread further.
>>>>>>>>
>>>>>>>> I was doing some fault-tolerance testing of Spark Streaming with
>>>>>>>> Tachyon as the OFF_HEAP block store. As I said in an earlier email, I
>>>>>>>> was able to solve the BlockNotFound exception when I used Hierarchical
>>>>>>>> Storage of Tachyon, which is good.
>>>>>>>>
>>>>>>>> I continued testing around storing the Spark Streaming
>>>>>>>> WAL and checkpoint files in Tachyon as well. Here are a few findings.
>>>>>>>>
>>>>>>>>
>>>>>>>> When I store the Spark Streaming checkpoint location in Tachyon,
>>>>>>>> the throughput is much higher. I tested the driver and receiver
>>>>>>>> failure cases, and Spark Streaming is able to recover without any data
>>>>>>>> loss on driver failure.
>>>>>>>>
>>>>>>>> *But on receiver failure, Spark Streaming loses data*, as I see an
>>>>>>>> exception while reading the WAL file from the Tachyon "receivedData"
>>>>>>>> location for the same receiver id which just failed.
>>>>>>>>
>>>>>>>> If I change the checkpoint location back to HDFS, Spark Streaming
>>>>>>>> can recover from both driver and receiver failure.
>>>>>>>>
>>>>>>>> Here are the log details from when the Spark Streaming receiver failed.
>>>>>>>> I raised a JIRA for the same issue:
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>>>>> (epoch 1)*
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying
>>>>>>>> to remove executor 2 from BlockManagerMaster.
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint -
>>>>>>>> Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>>>>> successfully in removeExecutor
>>>>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker -
>>>>>>>> *Registered receiver for stream 2 from 10.252.5.62*:47255
>>>>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException:
>>>>>>>> *Could not read data from write ahead log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is
>>>>>>>> past EOF: 645603894, fileSize = 0*
>>>>>>>> at
>>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
>>>>>>>>
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task
>>>>>>>> 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead
>>>>>>>> log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>>> [duplicate 1]
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task
>>>>>>>> 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor -
>>>>>>>> Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>>>>> INFO :
>>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>>>>>>> Executor app-20150511104442-0048/2 removed: worker lost
>>>>>>>> ERROR:
>>>>>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>>>>>>> Asked to remove non-existent executor 2
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>>>>> org.apache.spark.SparkException (Could not read data from write ahead
>>>>>>>> log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>>>>> [duplicate 2]
>>>>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>>>>> 103.0 failed 4 times; aborting job
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed
>>>>>>>> TaskSet 103.0, whose tasks have all completed, from pool
>>>>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling
>>>>>>>> stage 103
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>>>>> running job streaming job 1431341145000 ms.0
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task
>>>>>>>> 2.3 in stage 103.0 (TID 423, 10.252.5.62):
>>>>>>>> org.apache.spark.SparkException: Could not read data from write ahead
>>>>>>>> log record
>>>>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>>>> at
>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>>>>> at
>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is
>>>>>>>> past EOF: 645603894, fileSize = 0
>>>>>>>> at
>>>>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>>>>> ... 15 more
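The root cause in the trace above is a seek to the segment's byte offset (645603894) in a file whose reported size is 0. The following plain-Java sketch reproduces only that bounds check, to make the failure condition concrete. It is not Tachyon's or Spark's code: the Segment class and the seek and tryRead methods are hypothetical stand-ins, and the three Segment fields simply mirror the three values printed in FileBasedWriteAheadLogSegment(...) in the log.

```java
final class WalSeekSketch {

    /** Hypothetical stand-in for a WAL segment reference: file path, byte offset, record length. */
    static final class Segment {
        final String path;
        final long offset;
        final int length;

        Segment(String path, long offset, int length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    /** Rejects a seek beyond the size the file system reports, as the trace suggests. */
    static void seek(long position, long reportedFileSize) {
        if (position > reportedFileSize) {
            throw new IllegalArgumentException(
                    "Seek position is past EOF: " + position + ", fileSize = " + reportedFileSize);
        }
    }

    /** Returns "ok" if the segment's offset is reachable, otherwise the error message. */
    static String tryRead(Segment segment, long reportedFileSize) {
        try {
            seek(segment.offset, reportedFileSize);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Segment segment = new Segment(
                "receivedData/2/log-1431341091711-1431341151711", 645603894L, 10891919);
        // File size reported as zero, as in the log above.
        System.out.println(tryRead(segment, 0L));
        // File size large enough to contain the whole record.
        System.out.println(tryRead(segment, segment.offset + segment.length));
    }
}
```

Under this reading, the record itself may well have been written; the read fails because the length reported for the file is zero at the time the retried task tries to seek into it.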
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the updates!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Haoyuan
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just a follow-up on this thread.
>>>>>>>>>>
>>>>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ),
>>>>>>>>>> and that seems to have worked: I did not see any Spark job fail
>>>>>>>>>> due to BlockNotFoundException.
>>>>>>>>>> Below are my Hierarchical Storage settings:
>>>>>>>>>>
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dibyendu
>>>>>>>>>>
>>>>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> > Dear All,
>>>>>>>>>> >
>>>>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>>>>> > OFF_HEAP block store. The primary reason for evaluating Tachyon
>>>>>>>>>> > is to find out whether Tachyon can solve the Spark
>>>>>>>>>> > BlockNotFoundException.
>>>>>>>>>> >
>>>>>>>>>> > With the traditional MEMORY_ONLY StorageLevel, when blocks are
>>>>>>>>>> > evicted, jobs fail with a block-not-found exception, and storing
>>>>>>>>>> > blocks in MEMORY_AND_DISK is not a good option either, as it
>>>>>>>>>> > hurts throughput a lot.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from
>>>>>>>>>> > master, used Tachyon 0.6.4, and configured Tachyon in Fault
>>>>>>>>>> > Tolerant Mode. Tachyon is running in a 3-node AWS x-large
>>>>>>>>>> > cluster and Spark is running in a 3-node AWS x-large cluster.
>>>>>>>>>> >
>>>>>>>>>> > I have used the low-level receiver-based Kafka consumer (
>>>>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer ), which I
>>>>>>>>>> > wrote, to pull from Kafka and write blocks to Tachyon.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > I found a similar improvement in throughput (as in the
>>>>>>>>>> > MEMORY_ONLY case) but very good overall memory utilization (as
>>>>>>>>>> > it is an off-heap store).
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > But I found one issue on which I need clarification.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > In the Tachyon case I also see BlockNotFoundException, but for
>>>>>>>>>> > a different reason. What I see is that TachyonBlockManager.scala
>>>>>>>>>> > puts the blocks with the WriteType.TRY_CACHE configuration.
>>>>>>>>>> > Because of this, blocks are evicted from the Tachyon cache, and
>>>>>>>>>> > when Spark tries to find the block it throws
>>>>>>>>>> > BlockNotFoundException.
>>>>>>>>>> >
>>>>>>>>>> > I see a pull request which discusses the same:
>>>>>>>>>> >
>>>>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > When I changed the WriteType to CACHE_THROUGH, the
>>>>>>>>>> > BlockDropException was gone, but it again hurts throughput.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Just curious to know whether Tachyon has any settings that can
>>>>>>>>>> > solve the block eviction from cache to disk, other than
>>>>>>>>>> > explicitly setting CACHE_THROUGH?
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Dibyendu
>>>>>>>>>> >
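The difference between the two write types described in this message can be pictured with a toy block store in plain Java. This is only a model of the behaviour reported here, not Tachyon's implementation: ToyStore, its writeThrough flag, and countMissing are invented for illustration, with a bounded LRU memory tier standing in for the cache and a plain map standing in for the under-store that a write-through mode also writes to.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class WriteTypeSketch {

    /** Toy block store: a bounded LRU memory tier plus an optional under-store. */
    static final class ToyStore {
        private final boolean writeThrough;
        private final Map<String, byte[]> underStore = new HashMap<>();
        private final LinkedHashMap<String, byte[]> memory;

        ToyStore(final int capacity, boolean writeThrough) {
            this.writeThrough = writeThrough;
            // Access-ordered map that drops the least recently used block when full.
            this.memory = new LinkedHashMap<String, byte[]>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, byte[]> eldest) {
                    return size() > capacity;
                }
            };
        }

        void put(String blockId, byte[] data) {
            memory.put(blockId, data);
            if (writeThrough) {
                underStore.put(blockId, data); // also persisted outside the cache
            }
        }

        /** Returns null when the block is gone, the analogue of a block-not-found failure. */
        byte[] get(String blockId) {
            byte[] cached = memory.get(blockId);
            return cached != null ? cached : underStore.get(blockId);
        }
    }

    /** Writes the given number of blocks, then counts how many can no longer be read. */
    static int countMissing(ToyStore store, int blocks) {
        for (int i = 0; i < blocks; i++) {
            store.put("block-" + i, new byte[] {(byte) i});
        }
        int missing = 0;
        for (int i = 0; i < blocks; i++) {
            if (store.get("block-" + i) == null) {
                missing++;
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("cache only:    " + countMissing(new ToyStore(2, false), 5));
        System.out.println("write through: " + countMissing(new ToyStore(2, true), 5));
    }
}
```

With room for two blocks and five written, the cache-only store loses three of them, while the write-through store loses none at the price of an extra write per block, which matches the throughput trade-off described in the message.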
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Haoyuan Li
>>>>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
