Re: Support for local disk columnar storage for DataFrames

2015-11-20 Thread Cristian O
I've raised this for checkpointing; hopefully it gets some priority, as it's
very useful and relatively straightforward to implement:

https://issues.apache.org/jira/browse/SPARK-11879
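
For illustration, the only way to get this effect today is a blocking Parquet
round-trip; a rough sketch of that workaround follows (the helper name and the
path are just for illustration). A proper checkpoint API would make the save
implicit, ideally asynchronous, and would not need the explicit re-read:

import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: truncate a DataFrame's lineage by writing it out and
// reading it back. This is the manual equivalent of the checkpoint proposed
// in SPARK-11879.
def manualCheckpoint(sqlContext: SQLContext, df: DataFrame, path: String): DataFrame = {
  df.write.parquet(path)        // blocking write of the current state
  sqlContext.read.parquet(path) // re-read to obtain a pruned logical plan
}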

On 18 November 2015 at 16:31, Cristian O 
wrote:

> Hi,
>
> While these OSS efforts are interesting, they're for now quite unproven.
> Personally would be much more interested in seeing Spark incrementally
> moving towards supporting updating DataFrames on various storage
> substrates, and first of all locally, perhaps as an extension of cached
> DataFrames.
>
> However before we get full blown update support, I would suggest two
> enhancements that are fairly straightforward with the current design. If
> they make sense please let me know and I'll add them as Jiras:
>
> 1. Checkpoint support for DataFrames - as mentioned this can be as simple
> as saving to a parquet file or some other format, but would not require
> re-reading the file to alter the lineage, and would also prune the logical
> plan. Alternatively checkpointing a cached DataFrame can delegate to
> checkpointing the underlying RDD but again needs to prune the logical plan.
>
> 2. Efficient transformation of cached DataFrames to cached DataFrames - an
> efficient copy-on-write mechanism can be used to avoid unpacking
> CachedBatches (row groups) into InternalRows when building a cached
> DataFrame out of a source cached DataFrame through transformations (like an
> outer join) that only affect a small subset of rows. Statistics and
> partitioning information can be used to determine which row groups are
> affected and which can be copied *by reference* unchanged. This would
> effectively allow performing immutable updates of cached DataFrames in
> scenarios like Streaming or other iterative use cases like ML.
>
> Thanks,
> Cristian
>
>
>
> On 16 November 2015 at 08:30, Mark Hamstra 
> wrote:
>
>> FiloDB is also closely related.  https://github.com/tuplejump/FiloDB
>>
>> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>>> input/output format support:
>>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>>
>>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin 
>>> wrote:
>>>
 This (updates) is something we are going to think about in the next
 release or two.

 On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
 cristian.b.op...@googlemail.com> wrote:

> Sorry, apparently only replied to Reynold, meant to copy the list as
> well, so I'm self replying and taking the opportunity to illustrate with 
> an
> example.
>
> Basically I want to conceptually do this:
>
> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => 
> (i, 1)).toDF("k", "v")
> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i 
> => (i, 1)).toDF("k", "v")
>
> bigDf.cache()
>
> bigDf.registerTempTable("big")
> deltaDf.registerTempTable("delta")
>
> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 
> 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>
> newBigDf.cache()
> bigDf.unpersist()
>
>
> This is essentially an update of keys "1" and "5" only, in a
> dataset of 1 million keys.
>
> This can be achieved efficiently if the join would preserve the cached
> blocks that have been unaffected, and only copy and mutate the 2 affected
> blocks corresponding to the matching join keys.
>
> Statistics can determine which blocks actually need mutating. Note
> also that shuffling is not required assuming both dataframes are
> pre-partitioned by the same key K.
>
> In SQL this could actually be expressed as an UPDATE statement or for
> a more generalized use as a MERGE UPDATE:
> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>
> While this may seem like a very special case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both
> optimal and non-optimal usage.
>
> I appreciate there's quite a lot here, so thank you for taking the
> time to consider it.
>
> Cristian
>
>
>
> On 12 November 2015 at 15:49, Cristian O <
> cristian.b.op...@googlemail.com> wrote:
>
>> Hi Reynold,
>>
>> Thanks for your reply.
>>
>> Parquet may very well be used as the underlying implementation, but
>> this is more than about a particular storage representation.
>>
>> There are a few things here that are inter-related and open different
>> possibilities, so it's hard to structure, but I'll give it a try:
>>
>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>> parquet, just using that as a checkpoint would currently require 
>> ex

Re: Support for local disk columnar storage for DataFrames

2015-11-18 Thread Cristian O
Hi,

While these OSS efforts are interesting, they're quite unproven for now.
Personally I would be much more interested in seeing Spark incrementally
moving towards supporting updates to DataFrames on various storage
substrates, starting locally, perhaps as an extension of cached DataFrames.

However, before we get full-blown update support, I would suggest two
enhancements that are fairly straightforward with the current design. If
they make sense, please let me know and I'll add them as JIRAs:

1. Checkpoint support for DataFrames - as mentioned, this can be as simple
as saving to a Parquet file or some other format, but unlike a manual save it
would not require explicitly re-reading the file to truncate the lineage, and
it would also prune the logical plan. Alternatively, checkpointing a cached
DataFrame could delegate to checkpointing the underlying RDD, but again it
needs to prune the logical plan.

2. Efficient transformation of cached DataFrames to cached DataFrames - an
efficient copy-on-write mechanism can be used to avoid unpacking
CachedBatches (row groups) into InternalRows when building a cached
DataFrame out of a source cached DataFrame through transformations (like an
outer join) that only affect a small subset of rows. Statistics and
partitioning information can be used to determine which row groups are
affected and which can be copied *by reference* unchanged. This would
effectively allow performing immutable updates of cached DataFrames in
scenarios like Streaming or other iterative use cases like ML.
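
To make the copy-on-write idea concrete, here is a toy sketch of block-level
reference sharing. It is not Spark internals - the Block and CowTable names
are made up - but the role of minKey/maxKey is the one the batch-level
statistics already kept for cached data would play:

// Toy model only: a cached table as a sequence of immutable row-group blocks,
// each carrying min/max key statistics (analogous to CachedBatch stats).
case class Block(minKey: Int, maxKey: Int, rows: Vector[(Int, Int)])

case class CowTable(blocks: Vector[Block]) {
  // Merge a small delta of (key -> increment): blocks whose key range cannot
  // contain any delta key are reused by reference; only overlapping blocks
  // are copied and rewritten.
  def merge(delta: Map[Int, Int]): CowTable = CowTable(blocks.map { b =>
    if (delta.keys.exists(k => k >= b.minKey && k <= b.maxKey))
      b.copy(rows = b.rows.map { case (k, v) => (k, v + delta.getOrElse(k, 0)) })
    else b // untouched block: no unpacking, no copy
  })
}

An outer join against a two-row delta would then rewrite only the one or two
blocks whose key ranges cover the delta keys.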

Thanks,
Cristian



On 16 November 2015 at 08:30, Mark Hamstra  wrote:

> FiloDB is also closely related.  https://github.com/tuplejump/FiloDB
>
> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath  > wrote:
>
>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>> input/output format support:
>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>
>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin  wrote:
>>
>>> This (updates) is something we are going to think about in the next
>>> release or two.
>>>
>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
 Sorry, apparently only replied to Reynold, meant to copy the list as
 well, so I'm self replying and taking the opportunity to illustrate with an
 example.

 Basically I want to conceptually do this:

 val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => 
 (i, 1)).toDF("k", "v")
 val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i 
 => (i, 1)).toDF("k", "v")

 bigDf.cache()

 bigDf.registerTempTable("big")
 deltaDf.registerTempTable("delta")

 val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 
 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")

 newBigDf.cache()
 bigDf.unpersist()


 This is essentially an update of keys "1" and "5" only, in a
 dataset of 1 million keys.

 This can be achieved efficiently if the join would preserve the cached
 blocks that have been unaffected, and only copy and mutate the 2 affected
 blocks corresponding to the matching join keys.

 Statistics can determine which blocks actually need mutating. Note also
 that shuffling is not required assuming both dataframes are pre-partitioned
 by the same key K.

 In SQL this could actually be expressed as an UPDATE statement or for a
 more generalized use as a MERGE UPDATE:
 https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx

 While this may seem like a very special case optimization, it would
 effectively implement UPDATE support for cached DataFrames, for both
 optimal and non-optimal usage.

 I appreciate there's quite a lot here, so thank you for taking the time
 to consider it.

 Cristian



 On 12 November 2015 at 15:49, Cristian O <
 cristian.b.op...@googlemail.com> wrote:

> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but
> this is more than about a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as
> parquet, just using that as a checkpoint would currently require 
> explicitly
> reading it back. A proper checkpoint implementation would just save
> (perhaps asynchronously) and prune the logical plan while allowing to
> continue using the same DF, now backed by the checkpoint.
>
> It's important to prune the logical plan to avoid all kinds of issues
> that may arise from unbounded expansion with iterative use-cases, like 
> this
> one I encounte

Re: Support for local disk columnar storage for DataFrames

2015-11-16 Thread Mark Hamstra
FiloDB is also closely related.  https://github.com/tuplejump/FiloDB

On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath 
wrote:

> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
> input/output format support:
> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>
> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin  wrote:
>
>> This (updates) is something we are going to think about in the next
>> release or two.
>>
>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>> cristian.b.op...@googlemail.com> wrote:
>>
>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>> example.
>>>
>>> Basically I want to conceptually do this:
>>>
>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => 
>>> (i, 1)).toDF("k", "v")
>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i => 
>>> (i, 1)).toDF("k", "v")
>>>
>>> bigDf.cache()
>>>
>>> bigDf.registerTempTable("big")
>>> deltaDf.registerTempTable("delta")
>>>
>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, 
>>> delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>
>>> newBigDf.cache()
>>> bigDf.unpersist()
>>>
>>>
>>> This is essentially an update of keys "1" and "5" only, in a dataset
>>> of 1 million keys.
>>>
>>> This can be achieved efficiently if the join would preserve the cached
>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>> blocks corresponding to the matching join keys.
>>>
>>> Statistics can determine which blocks actually need mutating. Note also
>>> that shuffling is not required assuming both dataframes are pre-partitioned
>>> by the same key K.
>>>
>>> In SQL this could actually be expressed as an UPDATE statement or for a
>>> more generalized use as a MERGE UPDATE:
>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>
>>> While this may seem like a very special case optimization, it would
>>> effectively implement UPDATE support for cached DataFrames, for both
>>> optimal and non-optimal usage.
>>>
>>> I appreciate there's quite a lot here, so thank you for taking the time
>>> to consider it.
>>>
>>> Cristian
>>>
>>>
>>>
>>> On 12 November 2015 at 15:49, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
 Hi Reynold,

 Thanks for your reply.

 Parquet may very well be used as the underlying implementation, but
 this is more than about a particular storage representation.

 There are a few things here that are inter-related and open different
 possibilities, so it's hard to structure, but I'll give it a try:

 1. Checkpointing DataFrames - while a DF can be saved locally as
 parquet, just using that as a checkpoint would currently require explicitly
 reading it back. A proper checkpoint implementation would just save
 (perhaps asynchronously) and prune the logical plan while allowing to
 continue using the same DF, now backed by the checkpoint.

 It's important to prune the logical plan to avoid all kinds of issues
 that may arise from unbounded expansion with iterative use-cases, like this
 one I encountered recently:
 https://issues.apache.org/jira/browse/SPARK-11596

 But really what I'm after here is:

 2. Efficient updating of cached DataFrames - The main use case here is
 keeping a relatively large dataset cached and updating it iteratively from
 streaming. For example one would like to perform ad-hoc queries on an
 incrementally updated, cached DataFrame. I expect this is already becoming
 an increasingly common use case. Note that the dataset may require merging
 (like adding) or overriding values by key, so simply appending is not
 sufficient.

 This is very similar in concept with updateStateByKey for regular RDDs,
 i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
 level  (the row blocks for the columnar representation).

 This can be currently simulated with UNION or (OUTER) JOINs however is
 very inefficient as it requires copying and recaching the entire dataset,
 and unpersisting the original one. There are also the aforementioned
 problems with unbounded logical plans (physical plans are fine)

 These two together, checkpointing and updating cached DataFrames, would
 give fault-tolerant efficient updating of DataFrames, meaning streaming
 apps can take advantage of the compact columnar representation and Tungsten
 optimisations.

 I'm not quite sure if something like this can be achieved by other
 means or has been investigated before, hence why I'm looking for feedback
 here.

 While one could use external data stores, they would have the added IO
 penalty, plus most of what's availab

Re: Support for local disk columnar storage for DataFrames

2015-11-16 Thread Nick Pentreath
Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
input/output format support:
https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java

On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin  wrote:

> This (updates) is something we are going to think about in the next
> release or two.
>
> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
> cristian.b.op...@googlemail.com> wrote:
>
>> Sorry, apparently only replied to Reynold, meant to copy the list as
>> well, so I'm self replying and taking the opportunity to illustrate with an
>> example.
>>
>> Basically I want to conceptually do this:
>>
>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 
>> 1)).toDF("k", "v")
>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i => 
>> (i, 1)).toDF("k", "v")
>>
>> bigDf.cache()
>>
>> bigDf.registerTempTable("big")
>> deltaDf.registerTempTable("delta")
>>
>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, 
>> delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>
>> newBigDf.cache()
>> bigDf.unpersist()
>>
>>
>> This is essentially an update of keys "1" and "5" only, in a dataset
>> of 1 million keys.
>>
>> This can be achieved efficiently if the join would preserve the cached
>> blocks that have been unaffected, and only copy and mutate the 2 affected
>> blocks corresponding to the matching join keys.
>>
>> Statistics can determine which blocks actually need mutating. Note also
>> that shuffling is not required assuming both dataframes are pre-partitioned
>> by the same key K.
>>
>> In SQL this could actually be expressed as an UPDATE statement or for a
>> more generalized use as a MERGE UPDATE:
>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>
>> While this may seem like a very special case optimization, it would
>> effectively implement UPDATE support for cached DataFrames, for both
>> optimal and non-optimal usage.
>>
>> I appreciate there's quite a lot here, so thank you for taking the time
>> to consider it.
>>
>> Cristian
>>
>>
>>
>> On 12 November 2015 at 15:49, Cristian O > > wrote:
>>
>>> Hi Reynold,
>>>
>>> Thanks for your reply.
>>>
>>> Parquet may very well be used as the underlying implementation, but this
>>> is more than about a particular storage representation.
>>>
>>> There are a few things here that are inter-related and open different
>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>
>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>> parquet, just using that as a checkpoint would currently require explicitly
>>> reading it back. A proper checkpoint implementation would just save
>>> (perhaps asynchronously) and prune the logical plan while allowing to
>>> continue using the same DF, now backed by the checkpoint.
>>>
>>> It's important to prune the logical plan to avoid all kinds of issues
>>> that may arise from unbounded expansion with iterative use-cases, like this
>>> one I encountered recently:
>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>
>>> But really what I'm after here is:
>>>
>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>> keeping a relatively large dataset cached and updating it iteratively from
>>> streaming. For example one would like to perform ad-hoc queries on an
>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>> an increasingly common use case. Note that the dataset may require merging
>>> (like adding) or overriding values by key, so simply appending is not
>>> sufficient.
>>>
>>> This is very similar in concept with updateStateByKey for regular RDDs,
>>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>>> level  (the row blocks for the columnar representation).
>>>
>>> This can be currently simulated with UNION or (OUTER) JOINs however is
>>> very inefficient as it requires copying and recaching the entire dataset,
>>> and unpersisting the original one. There are also the aforementioned
>>> problems with unbounded logical plans (physical plans are fine)
>>>
>>> These two together, checkpointing and updating cached DataFrames, would
>>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>>> apps can take advantage of the compact columnar representation and Tungsten
>>> optimisations.
>>>
>>> I'm not quite sure if something like this can be achieved by other means
>>> or has been investigated before, hence why I'm looking for feedback here.
>>>
>>> While one could use external data stores, they would have the added IO
>>> penalty, plus most of what's available at the moment is either HDFS
>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>> space overhead over columnar formats.
>>>
>>> Thanks,
>>> Cristian
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 12 November 2015 at 03:31, Reynold Xin  wrote:
>>>
 Thanks for the ema

Re: Support for local disk columnar storage for DataFrames

2015-11-15 Thread Reynold Xin
This (updates) is something we are going to think about in the next release
or two.

On Thu, Nov 12, 2015 at 8:57 AM, Cristian O  wrote:

> Sorry, apparently only replied to Reynold, meant to copy the list as well,
> so I'm self replying and taking the opportunity to illustrate with an
> example.
>
> Basically I want to conceptually do this:
>
> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 
> 1)).toDF("k", "v")
> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i => 
> (i, 1)).toDF("k", "v")
>
> bigDf.cache()
>
> bigDf.registerTempTable("big")
> deltaDf.registerTempTable("delta")
>
> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, 
> delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>
> newBigDf.cache()
> bigDf.unpersist()
>
>
> This is essentially an update of keys "1" and "5" only, in a dataset
> of 1 million keys.
>
> This can be achieved efficiently if the join would preserve the cached
> blocks that have been unaffected, and only copy and mutate the 2 affected
> blocks corresponding to the matching join keys.
>
> Statistics can determine which blocks actually need mutating. Note also
> that shuffling is not required assuming both dataframes are pre-partitioned
> by the same key K.
>
> In SQL this could actually be expressed as an UPDATE statement or for a
> more generalized use as a MERGE UPDATE:
> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>
> While this may seem like a very special case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both
> optimal and non-optimal usage.
>
> I appreciate there's quite a lot here, so thank you for taking the time to
> consider it.
>
> Cristian
>
>
>
> On 12 November 2015 at 15:49, Cristian O 
> wrote:
>
>> Hi Reynold,
>>
>> Thanks for your reply.
>>
>> Parquet may very well be used as the underlying implementation, but this
>> is more than about a particular storage representation.
>>
>> There are a few things here that are inter-related and open different
>> possibilities, so it's hard to structure, but I'll give it a try:
>>
>> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
>> just using that as a checkpoint would currently require explicitly reading
>> it back. A proper checkpoint implementation would just save (perhaps
>> asynchronously) and prune the logical plan while allowing to continue using
>> the same DF, now backed by the checkpoint.
>>
>> It's important to prune the logical plan to avoid all kinds of issues
>> that may arise from unbounded expansion with iterative use-cases, like this
>> one I encountered recently:
>> https://issues.apache.org/jira/browse/SPARK-11596
>>
>> But really what I'm after here is:
>>
>> 2. Efficient updating of cached DataFrames - The main use case here is
>> keeping a relatively large dataset cached and updating it iteratively from
>> streaming. For example one would like to perform ad-hoc queries on an
>> incrementally updated, cached DataFrame. I expect this is already becoming
>> an increasingly common use case. Note that the dataset may require merging
>> (like adding) or overriding values by key, so simply appending is not
>> sufficient.
>>
>> This is very similar in concept with updateStateByKey for regular RDDs,
>> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
>> level  (the row blocks for the columnar representation).
>>
>> This can be currently simulated with UNION or (OUTER) JOINs however is
>> very inefficient as it requires copying and recaching the entire dataset,
>> and unpersisting the original one. There are also the aforementioned
>> problems with unbounded logical plans (physical plans are fine)
>>
>> These two together, checkpointing and updating cached DataFrames, would
>> give fault-tolerant efficient updating of DataFrames, meaning streaming
>> apps can take advantage of the compact columnar representation and Tungsten
>> optimisations.
>>
>> I'm not quite sure if something like this can be achieved by other means
>> or has been investigated before, hence why I'm looking for feedback here.
>>
>> While one could use external data stores, they would have the added IO
>> penalty, plus most of what's available at the moment is either HDFS
>> (extremely inefficient for updates) or key-value stores that have 5-10x
>> space overhead over columnar formats.
>>
>> Thanks,
>> Cristian
>>
>>
>>
>>
>>
>>
>> On 12 November 2015 at 03:31, Reynold Xin  wrote:
>>
>>> Thanks for the email. Can you explain what the difference is between
>>> this and existing formats such as Parquet/ORC?
>>>
>>>
>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
 Hi,

 I was wondering if there's any planned support for local disk columnar
 storage.

 This could be an extension of the in-memory columnar store, or possibly
 something similar to the recently add

Re: Support for local disk columnar storage for DataFrames

2015-11-12 Thread Cristian O
Sorry, apparently I only replied to Reynold when I meant to copy the list as
well, so I'm self-replying and taking the opportunity to illustrate with an
example.

Basically I want to conceptually do this:

val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i
=> (i, 1)).toDF("k", "v")
val deltaDf = sqlContext.sparkContext.parallelize(Array(1,
5)).map(i => (i, 1)).toDF("k", "v")

bigDf.cache()

bigDf.registerTempTable("big")
deltaDf.registerTempTable("delta")

val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is
null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")

newBigDf.cache()
bigDf.unpersist()


This is essentially an update of keys "1" and "5" only, in a dataset of
1 million keys.

This can be achieved efficiently if the join preserved the cached blocks
that are unaffected, and only copied and mutated the two affected blocks
corresponding to the matching join keys.

Statistics can determine which blocks actually need mutating. Note also
that no shuffle is required, assuming both DataFrames are pre-partitioned
by the same key K.
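
For comparison, plain pair RDDs already get this shuffle-free merge when both
sides share a partitioner; the sketch below (sizes and partition count made
up, assuming an existing SparkContext sc) is the RDD-level equivalent that
cached DataFrames would ideally match without leaving the columnar
representation:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(16)

// Co-partitioned pair RDDs: a key-equality join between them needs no shuffle.
val bigRdd   = sc.parallelize(1 to 1000000).map(i => (i, 1)).partitionBy(partitioner)
val deltaRdd = sc.parallelize(Seq((1, 1), (5, 1))).partitionBy(partitioner)

// Left outer join merges the delta into the big dataset by key.
val updated = bigRdd.leftOuterJoin(deltaRdd).mapValues {
  case (v, dv) => v + dv.getOrElse(0)
}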

In SQL this could actually be expressed as an UPDATE statement or, for more
general use, as a MERGE UPDATE:
https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
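
Spark SQL had no MERGE support at the time, so purely as a sketch of the
semantics being referred to, the cached update above would correspond to
something like the following T-SQL-style statement (shown as a reference
string only, since sqlContext.sql could not parse it):

// The T-SQL-style MERGE whose semantics the cached update above mirrors.
// Reference only: not executable by Spark SQL at the time.
val mergeUpdateSql =
  """MERGE INTO big AS b
    |USING delta AS d
    |  ON b.k = d.k
    |WHEN MATCHED THEN
    |  UPDATE SET v = b.v + d.v""".stripMargin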

While this may seem like a very special case optimization, it would
effectively implement UPDATE support for cached DataFrames, for both
optimal and non-optimal usage.

I appreciate there's quite a lot here, so thank you for taking the time to
consider it.

Cristian



On 12 November 2015 at 15:49, Cristian O 
wrote:

> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but this
> is more than about a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
> just using that as a checkpoint would currently require explicitly reading
> it back. A proper checkpoint implementation would just save (perhaps
> asynchronously) and prune the logical plan while allowing to continue using
> the same DF, now backed by the checkpoint.
>
> It's important to prune the logical plan to avoid all kinds of issues that
> may arise from unbounded expansion with iterative use-cases, like this one
> I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>
> But really what I'm after here is:
>
> 2. Efficient updating of cached DataFrames - The main use case here is
> keeping a relatively large dataset cached and updating it iteratively from
> streaming. For example one would like to perform ad-hoc queries on an
> incrementally updated, cached DataFrame. I expect this is already becoming
> an increasingly common use case. Note that the dataset may require merging
> (like adding) or overriding values by key, so simply appending is not
> sufficient.
>
> This is very similar in concept with updateStateByKey for regular RDDs,
> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
> level  (the row blocks for the columnar representation).
>
> This can be currently simulated with UNION or (OUTER) JOINs however is
> very inefficient as it requires copying and recaching the entire dataset,
> and unpersisting the original one. There are also the aforementioned
> problems with unbounded logical plans (physical plans are fine)
>
> These two together, checkpointing and updating cached DataFrames, would
> give fault-tolerant efficient updating of DataFrames, meaning streaming
> apps can take advantage of the compact columnar representation and Tungsten
> optimisations.
>
> I'm not quite sure if something like this can be achieved by other means
> or has been investigated before, hence why I'm looking for feedback here.
>
> While one could use external data stores, they would have the added IO
> penalty, plus most of what's available at the moment is either HDFS
> (extremely inefficient for updates) or key-value stores that have 5-10x
> space overhead over columnar formats.
>
> Thanks,
> Cristian
>
>
>
>
>
>
> On 12 November 2015 at 03:31, Reynold Xin  wrote:
>
>> Thanks for the email. Can you explain what the difference is between this
>> and existing formats such as Parquet/ORC?
>>
>>
>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>> cristian.b.op...@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I was wondering if there's any planned support for local disk columnar
>>> storage.
>>>
>>> This could be an extension of the in-memory columnar store, or possibly
>>> something similar to the recently added local checkpointing for RDDs
>>>
>>> This could also have the added benefit of enabling iterative usage for
>>> DataFrames by pruning the query plan through local checkpoints.
>>>
>>> A further enhancement would be to add update support to the columnar
>>> format (in the immutable copy-on-write sense o

Re: Support for local disk columnar storage for DataFrames

2015-11-12 Thread Andrew Duffy
Relevant link:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files

On Wed, Nov 11, 2015 at 7:31 PM, Reynold Xin  wrote:

> Thanks for the email. Can you explain what the difference is between this
> and existing formats such as Parquet/ORC?
>
>
> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
> cristian.b.op...@googlemail.com> wrote:
>
>> Hi,
>>
>> I was wondering if there's any planned support for local disk columnar
>> storage.
>>
>> This could be an extension of the in-memory columnar store, or possibly
>> something similar to the recently added local checkpointing for RDDs
>>
>> This could also have the added benefit of enabling iterative usage for
>> DataFrames by pruning the query plan through local checkpoints.
>>
>> A further enhancement would be to add update support to the columnar
>> format (in the immutable copy-on-write sense of course), by maintaining
>> references to unchanged row blocks and only copying and mutating the ones
>> that have changed.
>>
>> A use case here is streaming and merging updates in a large dataset that
>> can be efficiently stored internally in a columnar format, rather than
>> accessing a less efficient external data store like HDFS or Cassandra.
>>
>> Thanks,
>> Cristian
>>
>
>


Re: Support for local disk columnar storage for DataFrames

2015-11-11 Thread Reynold Xin
Thanks for the email. Can you explain what the difference is between this
and existing formats such as Parquet/ORC?


On Wed, Nov 11, 2015 at 4:59 AM, Cristian O  wrote:

> Hi,
>
> I was wondering if there's any planned support for local disk columnar
> storage.
>
> This could be an extension of the in-memory columnar store, or possibly
> something similar to the recently added local checkpointing for RDDs
>
> This could also have the added benefit of enabling iterative usage for
> DataFrames by pruning the query plan through local checkpoints.
>
> A further enhancement would be to add update support to the columnar
> format (in the immutable copy-on-write sense of course), by maintaining
> references to unchanged row blocks and only copying and mutating the ones
> that have changed.
>
> A use case here is streaming and merging updates in a large dataset that
> can be efficiently stored internally in a columnar format, rather than
> accessing a less efficient external data store like HDFS or Cassandra.
>
> Thanks,
> Cristian
>


Support for local disk columnar storage for DataFrames

2015-11-11 Thread Cristian O
Hi,

I was wondering if there's any planned support for local disk columnar
storage.

This could be an extension of the in-memory columnar store, or possibly
something similar to the recently added local checkpointing for RDDs.
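
The RDD facility referred to is localCheckpoint(), added in Spark 1.5; a
minimal sketch of how it is used today (assuming an existing SparkContext
sc), for comparison with what is being asked for on the DataFrame side:

// RDD-level local checkpointing: once an action materializes the RDD its
// lineage is truncated, and the data is kept on the executors' local storage
// rather than a reliable filesystem, trading fault tolerance for speed.
val updates = sc.parallelize(1 to 1000).map(i => (i, i * 2))
updates.localCheckpoint()
updates.count() // materializes the RDD and prunes its lineage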

This could also have the added benefit of enabling iterative usage for
DataFrames by pruning the query plan through local checkpoints.

A further enhancement would be to add update support to the columnar format
(in the immutable copy-on-write sense of course), by maintaining references
to unchanged row blocks and only copying and mutating the ones that have
changed.

A use case here is streaming and merging updates in a large dataset that
can be efficiently stored internally in a columnar format, rather than
accessing a less efficient external data store like HDFS or Cassandra.

Thanks,
Cristian