[GitHub] spark pull request: [SPARK-1669 & SPARK-1379][SQL][WIP] Made Schem...
Github user liancheng closed the pull request at: https://github.com/apache/spark/pull/829
Github user kanzhang commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-44921900

@marmbrus you are probably right. I didn't realize ```cachedColumnBuffers``` are themselves RDDs and was thinking in the wrong direction.
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-44898780

I don't think there are any advantages to the compressed row format. It is incredibly inefficient compared to columnar compression.
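As a rough illustration of the point above (toy Scala, not Spark code): when values are laid out column by column, a low-cardinality column forms long runs that run-length encoding collapses almost entirely, whereas the same values interleaved with other columns in row order never form runs at all.

```scala
object ColumnarVsRowRle {
  // Naive run-length encoder: returns (value, runLength) pairs.
  def rle[T](xs: Seq[T]): Seq[(T, Int)] =
    xs.foldLeft(List.empty[(T, Int)]) {
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      case (acc, x)                      => (x, 1) :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    val rows = (1 to 1000).map(i => (i, "US"))  // (id, country) pairs

    // Row layout: id and country values alternate, so no runs ever form.
    val rowLayout = rows.flatMap { case (id, c) => Seq[Any](id, c) }
    println(s"row-layout RLE entries:     ${rle(rowLayout).size}")    // 2000

    // Columnar layout: the country column is one 1000-element run.
    val countryColumn = rows.map(_._2)
    println(s"country-column RLE entries: ${rle(countryColumn).size}") // 1
  }
}
```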
Github user kanzhang commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-44786769

How about we keep the row-format behavior and add the columnar-format behavior separately? For example, ```SchemaRDD.cache()``` and ```unpersist()``` stay as they are and operate on the row format. Independently, we add ```SchemaRDD.registerAsTable(tableName)``` and ```SchemaRDD.cacheAsTable(tableName)```, which operate on the columnar format. The latter automatically registers the RDD as a table and caches the table as an ```InMemoryColumnarTableScan```. A SchemaRDD can then end up cached in both row and columnar formats, if the user so chooses.
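A minimal sketch of what user code might look like under this suggestion. Note that `cacheAsTable` is hypothetical (it was never added to `SchemaRDD`), and `sqlContext` is assumed to be an existing `SQLContext` with a table `src` already registered:

```scala
import sqlContext._  // assumed SQLContext already in scope

val schemaRdd = sql("SELECT * FROM src")

// Unchanged row-format path: cache()/unpersist() behave as on any other RDD.
schemaRdd.cache()
schemaRdd.unpersist()

// Hypothetical columnar path proposed above: register the SchemaRDD as a table
// and cache it as an InMemoryColumnarTableScan in a single call.
schemaRdd.cacheAsTable("src_cached")

// Queries against the registered name would then scan the columnar cache,
// independently of any row-format cache on schemaRdd itself.
sql("SELECT COUNT(*) FROM src_cached").collect()
```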
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-43569849

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15081/
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-43569844

Merged build finished.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-43562341

Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/829#issuecomment-43562334

Merged build triggered.
GitHub user liancheng opened a pull request: https://github.com/apache/spark/pull/829

[SPARK-1669 & SPARK-1379][SQL][WIP] Made SchemaRDD.cache() to leverage InMemoryColumnarTableScan

Corresponding JIRA issues: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669), [SPARK-1379](https://issues.apache.org/jira/browse/SPARK-1379)

(To @rxin @marmbrus: this change seems to bring some subtleties, and makes me doubt whether it's a good idea to make `SchemaRDD.cache()` leverage in-memory columnar storage. See the "Open issues" section below. Maybe we should simply throw an exception in `SchemaRDD.cache()` to tell people to use `cacheTable` instead. Or maybe there are simpler and more intuitive ways that I failed to discover.)

In this PR, we override cache-related RDD operations in `SchemaRDD` so that `.cache()` leverages `InMemoryColumnarTableScan`, and make the table caching operation idempotent (calling `cacheTable()` multiple times gives the same result).

### Basic ideas

When calling `.cache()` or `.persist()` on a `SchemaRDD`, an `InMemoryColumnarTableScan` is constructed, and the parent RDD of the `SchemaRDD` is replaced with the cached-columns RDD returned by `InMemoryColumnarTableScan.execute()`. Notice that in this way, from the viewpoint of the Spark runtime, the RDD being cached is the cached-columns RDD rather than the `SchemaRDD` itself.

### Updated APIs

- `SchemaRDD.persist(newLevel: StorageLevel)`

  Builds the in-memory columnar cache and replaces the parent RDD if `newLevel.useMemory` is true; otherwise falls back to the normal caching mechanism.

- `SchemaRDD.unpersist(blocking: Boolean)`

  Unpersists the in-memory columnar cache and restores the parent RDD if necessary.

- `SchemaRDD.logicalPlan`

  Returns the in-memory columnar version of the logical plan if the `SchemaRDD` is cached in memory.

- `SQLContext.cacheTable(tableName: String)`

  Forwards to `SQLContext.persistTable(tableName: String, newLevel: StorageLevel)` and leverages in-memory columnar storage if necessary.

- `SQLContext.uncacheTable(tableName: String)`

  Forwards to `SQLContext.unpersistTable(tableName: String, blocking: Boolean)`, and re-registers the uncached table if it's backed by an existing RDD.

### New APIs introduced

- `SQLContext.persistTable(tableName: String, newLevel: StorageLevel)`

  Equivalent to `table(tableName).persist(newLevel).registerAsTable(tableName)`.

- `SQLContext.unpersistTable(tableName: String, blocking: Boolean)`

  Similar to `table(tableName).unpersist(blocking)`, except that it re-registers the uncached table if necessary.

### Open issues

Now we can cache / uncache a table in two styles.

The plain old way:

```scala
val schemaRdd = ...
schemaRdd.registerAsTable("t")

// Caching
cacheTable("t")

// Uncaching
uncacheTable("t")
```

In this way, the name of the cached table is guaranteed to be the same as that of the original table.

The new, more RDD-style way:

```scala
val schemaRdd = ...
schemaRdd.registerAsTable("t")

// Caching
schemaRdd.cache().registerAsTable("t_cached")

// Uncaching (plan A: OK)
schemaRdd.uncache()

// Uncaching (plan B: OK? A little complicated...)
table("t_cached").uncache()

// Uncaching (plan C: probably WRONG)
uncacheTable("t_cached")
```

A major difference here is that in the new way, the name of the cached table is not necessarily the same as that of the original one. So plan C is not OK, because `uncacheTable` tries to re-register the original RDD with the wrong table name, and we end up with two uncached tables named `t` and `t_cached` respectively.

In plan B, `table("t_cached")` returns a new RDD; although the memory occupied by the columnar storage is released, `schemaRdd` is left untouched and will be cached again in subsequent jobs even if `.cache()` is never called again.

Another confusion is:

```scala
val productRdd: RDD[SomeCaseClass] = ...
productRdd.cache().registerAsTable("t")
```

Although this looks very similar to the second way of caching a `SchemaRDD` table, in this case no table is cached, since `productRdd` is not a `SchemaRDD`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark schemaRddCache

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/829.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #829

commit fe4ba9eb4c089ac20235af7e9980b94691c1fc0a
Author: Cheng Lian
Date: 2014-05-16T13:01:35Z

    SchemaRDD.cache() now leverages InMemoryColumnarTableScan

commit 919966
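To make the table-level API described above concrete, here is a minimal end-to-end sketch of how the proposed calls would have been used. Since this PR was closed without being merged, `persistTable` and `unpersistTable` never shipped; they appear here purely to illustrate the semantics described in the PR text, layered on top of the ordinary Spark 1.0-era `SQLContext` workflow (everything else in the sketch is standard API of that era).

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object PersistTableSketch {
  // Ordinary case class used for reflection-based schema inference.
  case class Record(key: Int, value: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "PersistTableSketch")
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[Record] => SchemaRDD

    // Build a SchemaRDD from a plain RDD of case classes and register it.
    val records = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    records.registerAsTable("t")

    // Proposed in this PR (never merged): persist the table in the in-memory
    // columnar format and re-register it under the same name; per the PR text,
    // equivalent to table("t").persist(StorageLevel.MEMORY_ONLY).registerAsTable("t").
    sqlContext.persistTable("t", StorageLevel.MEMORY_ONLY)

    // Subsequent queries would scan the columnar cache.
    sqlContext.sql("SELECT COUNT(*) FROM t").collect().foreach(println)

    // Proposed in this PR (never merged): release the columnar cache and
    // re-register the original RDD as table "t".
    sqlContext.unpersistTable("t", blocking = true)

    sc.stop()
  }
}
```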