GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/829

    [SPARK-1669 & SPARK-1379][SQL][WIP] Made SchemaRDD.cache() to leverage 
InMemoryColumnarTableScan

    Corresponding JIRA issues: 
[SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669), 
[SPARK-1379](https://issues.apache.org/jira/browse/SPARK-1379)
    
    (To @rxin @marmbrus: this change seems to bring some subtleties, and makes 
me doubt whether it's a good idea to make `SchemaRDD.cache()` leverage 
in-memory columnar storage. See the "Open issues" section below. Maybe we 
should simply throw an exception in `SchemaRDD.cache()` to tell people to use 
`cacheTable` instead. Or maybe there are simpler and more intuitive ways that I 
failed to discover.)
    
    In this PR, we overrode cache-related RDD operations in `SchemaRDD` to make 
`.cache()` leverage `InMemoryColumnarTableScan`, and made the table caching 
operation idempotent (calling `cacheTable()` multiple times gives the same 
result).
    
    ### Basic ideas
    
    When calling `.cache()` or `.persist()` on a `SchemaRDD`, an 
`InMemoryColumnarTableScan` is constructed, and the parent RDD of the 
`SchemaRDD` is replaced with the cached columns RDD returned by 
`InMemoryColumnarTableScan.execute()`.
    
    Notice that in this way, from the viewpoint of Spark runtime, the RDD being 
cached is the cached columns RDD rather than the `SchemaRDD` itself.
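    The idea above can be sketched roughly as follows (hypothetical code, not 
the actual patch; `swapParent` and `cachedColumnsRDD` are made-up names for 
illustration only):
    
    ```scala
    // Rough sketch of the persist() override described above.
    // `cachedColumnsRDD` stands in for building an InMemoryColumnarTableScan
    // and calling its execute(); `swapParent` stands in for replacing the
    // SchemaRDD's parent RDD. Neither name exists in the patch.
    override def persist(newLevel: StorageLevel): this.type = {
      if (newLevel.useMemory) {
        // Cache the columnar representation instead of the row RDD itself,
        // so Spark's runtime sees the cached columns RDD as the cached RDD.
        swapParent(cachedColumnsRDD(logicalPlan, newLevel))
      } else {
        // Fall back to the ordinary RDD caching mechanism.
        super.persist(newLevel)
      }
      this
    }
    ```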
    
    ### Updated APIs
    
    - `SchemaRDD.persist(newLevel: StorageLevel)`
    
      Builds the in-memory columnar cache and replaces the parent RDD if 
`newLevel.useMemory` is true; otherwise, falls back to the normal caching 
mechanism.
    
    - `SchemaRDD.unpersist(blocking: Boolean)`
    
      Unpersists the in-memory columnar cache and restores the parent RDD if 
necessary.
    
    - `SchemaRDD.logicalPlan`
    
      Returns the in-memory columnar version of the logical plan if the 
`SchemaRDD` is cached in memory.
    
    - `SQLContext.cacheTable(tableName: String)`
    
      Forwards to `SQLContext.persistTable(tableName: String, newLevel: 
StorageLevel)` and leverages in-memory columnar storage if necessary.
    
    - `SQLContext.uncacheTable(tableName: String)`
    
      Forwards to `SQLContext.unpersistTable(tableName: String, blocking: 
Boolean)`, and re-registers the uncached table if it's backed with an existing 
RDD.
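    With the idempotency change, repeated `cacheTable` calls are safe. A usage 
sketch (assuming a `SQLContext` whose members are imported into scope, as in 
the examples below):
    
    ```scala
    val schemaRdd = ...
    schemaRdd.registerAsTable("t")
    
    cacheTable("t")
    cacheTable("t")   // idempotent: same result as calling it once
    uncacheTable("t") // re-registers "t" against the original RDD
    ```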
    
    ### New APIs introduced
    
    - `SQLContext.persistTable(tableName: String, newLevel: StorageLevel)`
    
      Equivalent to 
`table(tableName).persist(newLevel).registerAsTable(tableName)`.
    
    - `SQLContext.unpersistTable(tableName: String, blocking: Boolean)`
    
      Similar to `table(tableName).unpersist(blocking)`, except that it 
re-registers the uncached table if necessary.
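    A usage sketch of the new APIs (assuming the signatures above; the choice 
of storage level is arbitrary):
    
    ```scala
    import org.apache.spark.storage.StorageLevel
    
    // Equivalent to table("t").persist(...).registerAsTable("t")
    persistTable("t", StorageLevel.MEMORY_AND_DISK)
    
    // Releases the cache and re-registers "t" if it is backed by an RDD
    unpersistTable("t", blocking = true)
    ```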
    
    ### Open issues
    
    Now we can cache / uncache a table in two styles. In the plain old way:
    
    ```scala
    val schemaRdd = ...
    schemaRdd.registerAsTable("t")
    // Caching
    cacheTable("t")
    // Uncaching
    uncacheTable("t")
    ```
    
    In this way, the name of the cached table is guaranteed to be the same as 
that of the original table.
    
    The new, more RDD-style way:
    
    ```scala
    val schemaRdd = ...
    schemaRdd.registerAsTable("t")
    // Caching
    schemaRdd.cache().registerAsTable("t_cached")
    // Uncaching (plan A: OK)
    schemaRdd.uncache()
    // Uncaching (plan B: OK? A little complicated...)
    table("t_cached").uncache()
    // Uncaching (plan C: probably WRONG)
    uncacheTable("t_cached")
    ```
    
    A major difference here is that in the new way, the name of the cached 
table is not necessarily the same as the original one. So plan C is not OK, 
because `uncacheTable` tries to re-register the original RDD with the wrong 
table name, and we end up with two uncached tables named `t` and `t_cached` 
respectively.
    
    In plan B, `table("t_cached")` returns a new RDD: although the memory 
occupied by the columnar storage is released, `schemaRdd` itself is left 
untouched and will be cached again in subsequent jobs even if `.cache()` is 
never called again.
    
    Another source of confusion:
    
    ```scala
    val productRdd: RDD[SomeCaseClass] = ...
    productRdd.cache().registerAsTable("t")
    ```
    
    Although this looks very similar to the second way of caching a `SchemaRDD` 
table, no table is cached in this case, since `productRdd` is not a 
`SchemaRDD`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark schemaRddCache

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #829
    
----
commit fe4ba9eb4c089ac20235af7e9980b94691c1fc0a
Author: Cheng Lian <lian.cs....@gmail.com>
Date:   2014-05-16T13:01:35Z

    SchemaRDD.cache() now leverages InMemoryColumnarTableScan

commit 919966769bed545c8efba25c66b4e52762b61417
Author: Cheng Lian <lian.cs....@gmail.com>
Date:   2014-05-19T02:06:00Z

    SchemaRDD.cache() now leverages InMemoryColumnarTableScan

----

