Repository: spark
Updated Branches:
  refs/heads/master e83f13e8d -> 25bef7e69


[SQL] More aggressive defaults

 - Turns on compression for in-memory cached data by default
 - Changes the default parquet compression format back to gzip (we have seen 
more OOMs with production workloads due to the way Snappy allocates memory)
 - Ups the batch size to 10,000 rows
 - Increases the broadcast threshold to 10 MB.
 - Uses our Parquet implementation instead of the Hive one by default.
 - Caches Parquet metadata by default.

Author: Michael Armbrust <mich...@databricks.com>

Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:

97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25bef7e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25bef7e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25bef7e6

Branch: refs/heads/master
Commit: 25bef7e6951301e93004567fc0cef96bf8d1a224
Parents: e83f13e
Author: Michael Armbrust <mich...@databricks.com>
Authored: Mon Nov 3 14:08:27 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Nov 3 14:08:27 2014 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                     | 18 +++++++++++++-----
 .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 +++++-----
 .../sql/parquet/ParquetTableOperations.scala      |  6 +++---
 .../org/apache/spark/sql/hive/HiveContext.scala   |  2 +-
 4 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d4ade93..e399fec 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` 
method on SQLContext or
 </tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     Turns on caching of Parquet schema metadata.  Can speed up querying of 
static data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.parquet.compression.codec</code></td>
-  <td>snappy</td>
+  <td>gzip</td>
   <td>
     Sets the compression codec used when writing Parquet files. Acceptable 
values include: 
     uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
+  <td>true</td>
+  <td>
+    When set to false, Spark SQL will use the Hive SerDe for Parquet tables 
instead of the built-in
+    support.
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the 
`setConf` method on SQL
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     When set to true Spark SQL will automatically select a compression codec 
for each column based
     on statistics of the data.
@@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the 
`setConf` method on SQL
 </tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
-  <td>1000</td>
+  <td>10000</td>
   <td>
     Controls the size of batches for columnar caching.  Larger batch sizes can 
improve memory utilization
     and compression, but risk OOMs when caching data.
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as 
more optimizations ar
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
     <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
-    <td>10000</td>
+    <td>10485760 (10 MB)</td>
     <td>
       Configures the maximum size in bytes for a table that will be broadcast 
to all worker nodes when
       performing a join.  By setting this value to -1 broadcasting can be 
disabled.  Note that currently

http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 07e6e2e..279495a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -79,13 +79,13 @@ private[sql] trait SQLConf {
   private[spark] def dialect: String = getConf(DIALECT, "sql")
 
   /** When true tables cached using the in-memory columnar caching will be 
compressed. */
-  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, 
"false").toBoolean
+  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, 
"true").toBoolean
 
   /** The compression codec for writing to a Parquet file */
-  private[spark] def parquetCompressionCodec: String = 
getConf(PARQUET_COMPRESSION, "snappy")
+  private[spark] def parquetCompressionCodec: String = 
getConf(PARQUET_COMPRESSION, "gzip")
 
   /** The number of rows that will be batched together for columnar caching. */
-  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, 
"1000").toInt
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, 
"10000").toInt
 
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 
"200").toInt
@@ -106,10 +106,10 @@ private[sql] trait SQLConf {
    * a broadcast value during the physical executions of join operations.  
Setting this to -1
    * effectively disables auto conversion.
    *
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose 
default value is also 10000.
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose 
default value is 10000000.
    */
   private[spark] def autoBroadcastJoinThreshold: Int =
-    getConf(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
+    getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt
 
   /**
    * The default size in bytes to assign to a logical operator's estimation 
statistics.  By default,

http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 9664c56..d00860a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -123,7 +123,7 @@ case class ParquetTableScan(
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
     conf.set(
       SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
 
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(
@@ -394,7 +394,7 @@ private[parquet] class FilteringParquetRowInputFormat
 
     if (footers eq null) {
       val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, 
false)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
       if (statuses.isEmpty) {
@@ -493,7 +493,7 @@ private[parquet] class FilteringParquetRowInputFormat
     import parquet.filter2.compat.FilterCompat.Filter;
     import parquet.filter2.compat.RowGroupFilter;
    
-    val cacheMetadata = 
configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+    val cacheMetadata = 
configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
 
     val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
     val filter: Filter = ParquetInputFormat.getFilter(configuration)

http://git-wip-us.apache.org/repos/asf/spark/blob/25bef7e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index f025169..e88afaa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -90,7 +90,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * SerDe.
    */
   private[spark] def convertMetastoreParquet: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+    getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
   override protected[sql] def executePlan(plan: LogicalPlan): 
this.QueryExecution =
     new this.QueryExecution { val logical = plan }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to