Sorry, I apparently replied only to Reynold; I meant to copy the list as
well, so I'm replying to myself and taking the opportunity to illustrate
with an example.

Basically I want to conceptually do this:

import sqlContext.implicits._  // needed for toDF on RDDs of tuples

val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000).map(i => (i, 1)).toDF("k", "v")
val deltaDf = sqlContext.sparkContext.parallelize(Seq(1, 50000)).map(i => (i, 1)).toDF("k", "v")

bigDf.cache()

bigDf.registerTempTable("big")
deltaDf.registerTempTable("delta")

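// Add delta.v to big.v where the key matches, otherwise keep big.v unchanged
val newBigDf = sqlContext.sql(
  """SELECT big.k, big.v + IF(delta.v IS NULL, 0, delta.v) AS v
    |FROM big LEFT JOIN delta ON big.k = delta.k""".stripMargin)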

newBigDf.cache()
bigDf.unpersist()


This is essentially an update of keys "1" and "50000" only, in a dataset of
1 million keys.

This could be done efficiently if the join preserved the cached blocks that
are unaffected, and only copied and mutated the 2 affected blocks
corresponding to the matching join keys.

Per-block statistics (for example the min/max of the key column) can
determine which blocks actually need mutating. Note also that no shuffle is
required, assuming both DataFrames are pre-partitioned by the same key k.
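
To make the idea concrete, here is a minimal sketch in plain Scala (no Spark)
of a block-level copy-on-write update. The names Block, toBlocks and update
are hypothetical and only illustrate the concept; this is not how Spark's
in-memory columnar cache is implemented. It assumes rows are sorted by key,
so that each block's min/max key is a usable statistic.

```scala
object CowUpdateSketch {
  // Hypothetical stand-in for a cached row block: key-sorted rows stored
  // column-wise, plus min/max key statistics derived from the key column.
  final case class Block(keys: Vector[Int], values: Vector[Int]) {
    val minKey: Int = keys.head
    val maxKey: Int = keys.last
  }

  // Split a dataset into key-sorted blocks of at most blockSize rows.
  def toBlocks(rows: Seq[(Int, Int)], blockSize: Int): Vector[Block] =
    rows.sortBy(_._1).grouped(blockSize).map { g =>
      Block(g.map(_._1).toVector, g.map(_._2).toVector)
    }.toVector

  // Apply additive deltas by key. A block whose [minKey, maxKey] range
  // contains no delta key is reused by reference; any other block is copied.
  // The range check is conservative: a block may be copied even if the delta
  // key turns out to be absent from it.
  def update(blocks: Vector[Block], delta: Map[Int, Int]): Vector[Block] =
    blocks.map { b =>
      val touched = delta.keysIterator.exists(k => k >= b.minKey && k <= b.maxKey)
      if (!touched) b
      else b.copy(values = b.keys.zip(b.values).map {
        case (k, v) => v + delta.getOrElse(k, 0)
      })
    }
}
```

With 1000 keys in blocks of 100 rows and a delta on keys 1 and 500, only the
two blocks holding those keys are copied, and comparing the old and new block
lists with eq shows the other eight are shared.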

In SQL this could be expressed as an UPDATE statement or, for more general
use, as a MERGE statement:
https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
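
As a rough illustration of how MERGE generalizes the example above, here is a
small plain-Scala sketch over an immutable Map. The merge helper and its
combine parameter are hypothetical names. Unlike the LEFT JOIN query, which
only touches keys already present in the big table, a MERGE-style upsert also
inserts delta keys that have no match.

```scala
object MergeSketch {
  // MERGE-like upsert: when a delta key matches, combine the old and new
  // values; when it does not match, insert the delta value as a new entry.
  def merge(target: Map[Int, Int], delta: Map[Int, Int])(
      combine: (Int, Int) => Int): Map[Int, Int] =
    delta.foldLeft(target) { case (acc, (k, dv)) =>
      acc.updated(k, acc.get(k).map(combine(_, dv)).getOrElse(dv))
    }
}
```

For instance, MergeSketch.merge(Map(1 -> 1, 2 -> 1), Map(2 -> 5, 3 -> 7))(_ + _)
adds 5 to the value for key 2 and inserts key 3, leaving key 1 untouched.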

While this may seem like a very special case optimization, it would
effectively implement UPDATE support for cached DataFrames, for both
optimal and non-optimal usage.

I appreciate there's quite a lot here, so thank you for taking the time to
consider it.

Cristian



On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com>
wrote:

> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but this
> is about more than a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
> using that as a checkpoint currently requires explicitly reading it back.
> A proper checkpoint implementation would just save (perhaps
> asynchronously) and prune the logical plan, while letting one continue to
> use the same DF, now backed by the checkpoint.
>
> It's important to prune the logical plan to avoid all kinds of issues that
> may arise from unbounded expansion with iterative use-cases, like this one
> I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>
> But really what I'm after here is:
>
> 2. Efficient updating of cached DataFrames - The main use case here is
> keeping a relatively large dataset cached and updating it iteratively from
> streaming. For example one would like to perform ad-hoc queries on an
> incrementally updated, cached DataFrame. I expect this is already becoming
> an increasingly common use case. Note that the dataset may require merging
> (like adding) or overriding values by key, so simply appending is not
> sufficient.
>
> This is very similar in concept to updateStateByKey for regular RDDs,
> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
> level (the row blocks for the columnar representation).
>
> This can currently be simulated with UNION or (OUTER) JOINs, but that is
> very inefficient as it requires copying and recaching the entire dataset,
> and unpersisting the original one. There are also the aforementioned
> problems with unbounded logical plans (physical plans are fine).
>
> These two together, checkpointing and updating cached DataFrames, would
> give fault-tolerant efficient updating of DataFrames, meaning streaming
> apps can take advantage of the compact columnar representation and Tungsten
> optimisations.
>
> I'm not quite sure if something like this can be achieved by other means
> or has been investigated before, hence why I'm looking for feedback here.
>
> While one could use external data stores, they would have the added IO
> penalty, plus most of what's available at the moment is either HDFS
> (extremely inefficient for updates) or key-value stores that have 5-10x
> space overhead over columnar formats.
>
> Thanks,
> Cristian
>
>
>
>
>
>
> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>
>> Thanks for the email. Can you explain what the difference is between this
>> and existing formats such as Parquet/ORC?
>>
>>
>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>> cristian.b.op...@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I was wondering if there's any planned support for local disk columnar
>>> storage.
>>>
>>> This could be an extension of the in-memory columnar store, or possibly
>>> something similar to the recently added local checkpointing for RDDs.
>>>
>>> This could also have the added benefit of enabling iterative usage for
>>> DataFrames by pruning the query plan through local checkpoints.
>>>
>>> A further enhancement would be to add update support to the columnar
>>> format (in the immutable copy-on-write sense of course), by maintaining
>>> references to unchanged row blocks and only copying and mutating the ones
>>> that have changed.
>>>
>>> A use case here is streaming and merging updates into a large dataset that
>>> can be efficiently stored internally in a columnar format, rather than
>>> accessing a less efficient external data store like HDFS or Cassandra.
>>>
>>> Thanks,
>>> Cristian
>>>
>>
>>
>
