Re: Support for local disk columnar storage for DataFrames
Raised this for checkpointing. Hopefully it gets some priority, as it's very useful and relatively straightforward to implement: https://issues.apache.org/jira/browse/SPARK-11879

On 18 November 2015 at 16:31, Cristian O wrote:
> 1. Checkpoint support for DataFrames - as mentioned this can be as simple
> as saving to a parquet file or some other format, but would not require
> re-reading the file to alter the lineage, and would also prune the logical
> plan. Alternatively checkpointing a cached DataFrame can delegate to
> checkpointing the underlying RDD but again needs to prune the logical plan.
Re: Support for local disk columnar storage for DataFrames
Hi,

While these OSS efforts are interesting, they are for now quite unproven. Personally, I would be much more interested in seeing Spark move incrementally towards supporting updates to DataFrames on various storage substrates, first of all locally, perhaps as an extension of cached DataFrames.

However, before we get full-blown update support, I would suggest two enhancements that are fairly straightforward with the current design. If they make sense, please let me know and I'll add them as Jiras:

1. Checkpoint support for DataFrames - as mentioned, this can be as simple as saving to a parquet file or some other format, but it would not require re-reading the file to alter the lineage, and it would also prune the logical plan. Alternatively, checkpointing a cached DataFrame can delegate to checkpointing the underlying RDD, but again it needs to prune the logical plan.

2. Efficient transformation of cached DataFrames to cached DataFrames - an efficient copy-on-write mechanism can be used to avoid unpacking CachedBatches (row groups) into InternalRows when building a cached DataFrame out of a source cached DataFrame through transformations (like an outer join) that only affect a small subset of rows. Statistics and partitioning information can be used to determine which row groups are affected and which can be copied *by reference* unchanged. This would effectively allow performing immutable updates of cached DataFrames in scenarios like Streaming or other iterative use cases like ML.

Thanks,
Cristian

On 16 November 2015 at 08:30, Mark Hamstra wrote:
> FiloDB is also closely related.
> https://github.com/tuplejump/FiloDB
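To make the "prune the logical plan" point of enhancement 1 concrete, here is a minimal toy sketch. It assumes a made-up `Plan` type with `Source` and `Transform` nodes; these are hypothetical illustration classes, not Spark's Catalyst classes, and `checkpoint` here only models the effect on the plan, not the actual saving of data.

```scala
// Toy model only: Plan, Source and Transform are hypothetical names and
// are NOT Spark's Catalyst plan classes.
sealed trait Plan
case class Source(name: String) extends Plan                // leaf: data read from storage
case class Transform(child: Plan, op: String) extends Plan  // one lineage step

object PlanPruning {
  // Number of nodes that have to be walked each time the plan is analysed.
  def size(plan: Plan): Int = plan match {
    case Source(_)           => 1
    case Transform(child, _) => 1 + size(child)
  }

  // Models a checkpoint: the data is assumed to be saved elsewhere, and the
  // whole lineage is replaced by a single leaf pointing at the saved copy.
  def checkpoint(plan: Plan, savedAs: String): Plan = Source(savedAs)

  // Apply `iterations` updates, optionally checkpointing every `every` steps.
  def iterate(iterations: Int, every: Option[Int]): Plan =
    (1 to iterations).foldLeft(Source("initial"): Plan) { (plan, i) =>
      val next = Transform(plan, s"update-$i")
      if (every.exists(n => i % n == 0)) checkpoint(next, s"checkpoint-$i") else next
    }
}
```

Without checkpoints the plan grows by one node per iteration; with a checkpoint every 10 iterations it never holds more than 10 nodes, which is the bounded behaviour an iterative or streaming job would want.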
Re: Support for local disk columnar storage for DataFrames
FiloDB is also closely related.
https://github.com/tuplejump/FiloDB

On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath wrote:
> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
> input/output format support:
> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
Re: Support for local disk columnar storage for DataFrames
Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop input/output format support:
https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java

On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin wrote:
> This (updates) is something we are going to think about in the next
> release or two.
Re: Support for local disk columnar storage for DataFrames
This (updates) is something we are going to think about in the next release or two.

On Thu, Nov 12, 2015 at 8:57 AM, Cristian O wrote:
> While this may seem like a very special case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both
> optimal and non-optimal usage.
Re: Support for local disk columnar storage for DataFrames
Sorry, I apparently replied only to Reynold; I meant to copy the list as well, so I'm replying to myself and taking the opportunity to illustrate with an example.

Basically I want to conceptually do this:

    import sqlContext.implicits._  // needed for toDF

    val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000).map(i => (i, 1)).toDF("k", "v")
    val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 5)).map(i => (i, 1)).toDF("k", "v")

    bigDf.cache()

    bigDf.registerTempTable("big")
    deltaDf.registerTempTable("delta")

    // Add delta.v to big.v where the key matches; leave all other rows unchanged.
    val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) AS v FROM big LEFT JOIN delta on big.k = delta.k")

    newBigDf.cache()
    newBigDf.count()   // cache() is lazy: materialize the new cache before dropping the old one
    bigDf.unpersist()

This is essentially an update of keys "1" and "5" only, in a dataset of 1 million keys.

This can be achieved efficiently if the join preserves the cached blocks that are unaffected, and only copies and mutates the 2 affected blocks corresponding to the matching join keys.

Statistics can determine which blocks actually need mutating. Note also that shuffling is not required, assuming both DataFrames are pre-partitioned by the same key K.

In SQL this could actually be expressed as an UPDATE statement or, for more generalized use, as a MERGE UPDATE:
https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx

While this may seem like a very special-case optimization, it would effectively implement UPDATE support for cached DataFrames, for both optimal and non-optimal usage.

I appreciate there's quite a lot here, so thank you for taking the time to consider it.

Cristian

On 12 November 2015 at 15:49, Cristian O wrote:
> Hi Reynold,
>
> Thanks for your reply.
>
> Parquet may very well be used as the underlying implementation, but this
> is about more than a particular storage representation.
>
> There are a few things here that are inter-related and open different
> possibilities, so it's hard to structure, but I'll give it a try:
>
> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet,
> just using that as a checkpoint would currently require explicitly reading
> it back. A proper checkpoint implementation would just save (perhaps
> asynchronously) and prune the logical plan while allowing one to continue
> using the same DF, now backed by the checkpoint.
>
> It's important to prune the logical plan to avoid all kinds of issues that
> may arise from unbounded expansion with iterative use-cases, like this one
> I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>
> But really what I'm after here is:
>
> 2. Efficient updating of cached DataFrames - The main use case here is
> keeping a relatively large dataset cached and updating it iteratively from
> streaming. For example one would like to perform ad-hoc queries on an
> incrementally updated, cached DataFrame. I expect this is already becoming
> an increasingly common use case. Note that the dataset may require merging
> (like adding) or overriding values by key, so simply appending is not
> sufficient.
>
> This is very similar in concept to updateStateByKey for regular RDDs,
> i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch
> level (the row blocks for the columnar representation).
>
> This can currently be simulated with UNION or (OUTER) JOINs; however, this
> is very inefficient as it requires copying and recaching the entire
> dataset, and unpersisting the original one. There are also the
> aforementioned problems with unbounded logical plans (physical plans are
> fine).
>
> These two together, checkpointing and updating cached DataFrames, would
> give fault-tolerant efficient updating of DataFrames, meaning streaming
> apps can take advantage of the compact columnar representation and Tungsten
> optimisations.
>
> I'm not quite sure if something like this can be achieved by other means
> or has been investigated before, hence why I'm looking for feedback here.
>
> While one could use external data stores, they would have the added IO
> penalty, plus most of what's available at the moment is either HDFS
> (extremely inefficient for updates) or key-value stores that have 5-10x
> space overhead over columnar formats.
>
> Thanks,
> Cristian
>
> On 12 November 2015 at 03:31, Reynold Xin wrote:
>> Thanks for the email. Can you explain what the difference is between this
>> and existing formats such as Parquet/ORC?
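The join-based update in the example above, and the remark that no shuffle is needed when both sides are partitioned by the same key, can be sketched in plain Scala without Spark. This is only an illustration under stated assumptions: `CoPartitionedMerge`, `partitionOf`, `partition` and `merge` are hypothetical names, partitions are modelled as in-memory maps, and a simple modulo partitioner stands in for whatever partitioning the real DataFrames would use.

```scala
object CoPartitionedMerge {
  type Partition = Map[Int, Int] // key -> value for one partition

  // Hypothetical partitioner; the point is that both datasets use the same one.
  def partitionOf(key: Int, numPartitions: Int): Int =
    ((key % numPartitions) + numPartitions) % numPartitions

  // Split rows into `numPartitions` partitions by key.
  def partition(rows: Seq[(Int, Int)], numPartitions: Int): Vector[Partition] = {
    val grouped = rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    Vector.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty[(Int, Int)]).toMap)
  }

  // LEFT JOIN semantics of the SQL above: every key of `big` is kept and a
  // missing delta counts as 0. Partition i of `big` only ever looks at
  // partition i of `delta`, so no rows move between partitions.
  def merge(big: Vector[Partition], delta: Vector[Partition]): Vector[Partition] =
    big.zip(delta).map { case (b, d) =>
      b.map { case (k, v) => k -> (v + d.getOrElse(k, 0)) }
    }
}
```

With 100 keys of value 1 and a delta of 1 for keys 1 and 5, merging leaves every key at 1 except keys 1 and 5, which become 2. This sketch still rebuilds every partition; avoiding that for untouched row blocks is the copy-on-write part of the proposal.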
Re: Support for local disk columnar storage for DataFrames
Relevant link: http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files

On Wed, Nov 11, 2015 at 7:31 PM, Reynold Xin wrote:
> Thanks for the email. Can you explain what the difference is between this
> and existing formats such as Parquet/ORC?
Re: Support for local disk columnar storage for DataFrames
Thanks for the email. Can you explain what the difference is between this and existing formats such as Parquet/ORC?

On Wed, Nov 11, 2015 at 4:59 AM, Cristian O wrote:
> I was wondering if there's any planned support for local disk columnar
> storage.
Support for local disk columnar storage for DataFrames
Hi,

I was wondering if there's any planned support for local disk columnar storage.

This could be an extension of the in-memory columnar store, or possibly something similar to the recently added local checkpointing for RDDs.

This could also have the added benefit of enabling iterative usage for DataFrames by pruning the query plan through local checkpoints.

A further enhancement would be to add update support to the columnar format (in the immutable copy-on-write sense of course), by maintaining references to unchanged row blocks and only copying and mutating the ones that have changed.

A use case here is streaming and merging updates in a large dataset that can be efficiently stored internally in a columnar format, rather than accessing a more inefficient external data store like HDFS or Cassandra.

Thanks,
Cristian
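The copy-on-write idea described above (keep references to unchanged row blocks, copy only the changed ones) can be sketched in plain Scala. This is a minimal illustration under assumptions of my own: `Block` is a hypothetical stand-in for a cached row block and is not Spark's CachedBatch, per-block min/max keys play the role of the statistics, and an update means adding a delta to the existing value.

```scala
object CopyOnWriteBlocks {
  // Hypothetical stand-in for a cached row block, with min/max key statistics.
  final case class Block(minKey: Int, maxKey: Int, rows: Map[Int, Int])

  // Sort rows by key and cut them into blocks of `blockSize` rows each.
  def toBlocks(rows: Seq[(Int, Int)], blockSize: Int): Vector[Block] =
    rows.sortBy(_._1).grouped(blockSize).map { chunk =>
      Block(chunk.head._1, chunk.last._1, chunk.toMap)
    }.toVector

  // Returns a new block list. A block whose key range contains no delta key
  // is reused by reference; only blocks that are actually hit get copied.
  def update(blocks: Vector[Block], delta: Map[Int, Int]): Vector[Block] =
    blocks.map { block =>
      val hits = delta.filter { case (k, _) =>
        k >= block.minKey && k <= block.maxKey && block.rows.contains(k)
      }
      if (hits.isEmpty) block
      else block.copy(rows = block.rows ++ hits.map { case (k, d) => k -> (block.rows(k) + d) })
    }
}
```

For 100 keys in blocks of 4 rows, a delta for keys 1 and 5 touches two of the 25 blocks; the other 23 blocks in the result are the very same objects as in the input, and the original block list is left unchanged.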