Repository: spark
Updated Branches:
  refs/heads/master 7fb5ae502 -> 111d6b9b8


[SPARK-8139] [SQL] Updates docs and comments of data sources and Parquet output 
committer options

This PR only applies to master branch (1.5.0-SNAPSHOT) since it references 
`org.apache.parquet` classes which only appear in Parquet 1.7.0.

Author: Cheng Lian <l...@databricks.com>

Closes #6683 from liancheng/output-committer-docs and squashes the following 
commits:

b4648b8 [Cheng Lian] Removes spark.sql.sources.outputCommitterClass as it's not 
a public option
ee63923 [Cheng Lian] Updates docs and comments of data sources and Parquet 
output committer options


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/111d6b9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/111d6b9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/111d6b9b

Branch: refs/heads/master
Commit: 111d6b9b8a584b962b6ae80c7aa8c45845ce0099
Parents: 7fb5ae5
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Jun 23 17:24:26 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Jun 23 17:24:26 2015 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   | 30 ++++++++++++++++-
 .../scala/org/apache/spark/sql/SQLConf.scala    | 30 +++++++++++++----
 .../parquet/DirectParquetOutputCommitter.scala  | 34 ++++++++++++++------
 .../apache/spark/sql/parquet/newParquet.scala   |  4 +--
 4 files changed, 78 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/111d6b9b/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 9107c9b..2786e3d 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1348,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` 
method on `SQLContext`
     support.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.parquet.output.committer.class</code></td>
+  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
+  <td>
+    <p>
+      The output committer class used by Parquet. The specified class needs to 
be a subclass of
+      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>.  
Typically, it's also a
+      subclass of 
<code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
+    </p>
+    <p>
+      <b>Note:</b>
+      <ul>
+        <li>
+          This option must be set via Hadoop <code>Configuration</code> rather 
than Spark
+          <code>SQLConf</code>.
+        </li>
+        <li>
+          This option overrides <code>spark.sql.sources.<br 
/>outputCommitterClass</code>.
+        </li>
+      </ul>
+    </p>
+    <p>
+      Spark SQL comes with a builtin
+      <code>org.apache.spark.sql.<br 
/>parquet.DirectParquetOutputCommitter</code>, which can be more
+      efficient than the default Parquet output committer when writing data to 
S3.
+    </p>
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -1876,7 +1904,7 @@ that these options will be deprecated in future release 
as more optimizations ar
       Configures the number of partitions to use when shuffling data for joins 
or aggregations.
     </td>
   </tr>
-   <tr>
+  <tr>
     <td><code>spark.sql.planner.externalSort</code></td>
     <td>false</td>
     <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/111d6b9b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 16493c3..2653526 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,6 +22,8 @@ import java.util.Properties
 import scala.collection.immutable
 import scala.collection.JavaConversions._
 
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
 import org.apache.spark.sql.catalyst.CatalystConf
 
 private[spark] object SQLConf {
@@ -252,9 +254,9 @@ private[spark] object SQLConf {
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = 
booleanConf("spark.sql.parquet.filterPushdown",
     defaultValue = Some(false),
-    doc = "Turn on Parquet filter pushdown optimization. This feature is 
turned off by default" +
-      " because of a known bug in Paruet 1.6.0rc3 " +
-      "(<a 
href=\"https://issues.apache.org/jira/browse/PARQUET-136\";>PARQUET-136</a>). 
However, " +
+    doc = "Turn on Parquet filter pushdown optimization. This feature is 
turned off by default " +
+      "because of a known bug in Parquet 1.6.0rc3 " +
+      "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). 
However, " +
       "if your table doesn't contain any nullable string or binary columns, 
it's still safe to " +
       "turn this feature on.")
 
@@ -262,11 +264,21 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+    key = "spark.sql.parquet.output.committer.class",
+    defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+    doc = "The output committer class used by Parquet. The specified class 
needs to be a " +
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, 
it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter.  NOTE: 1. Instead 
of SQLConf, this " +
+      "option must be set in Hadoop Configuration.  2. This option overrides " 
+
+      "\"spark.sql.sources.outputCommitterClass\"."
+  )
+
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
     doc = "<TODO>")
 
-  val HIVE_VERIFY_PARTITIONPATH = 
booleanConf("spark.sql.hive.verifyPartitionPath",
+  val HIVE_VERIFY_PARTITION_PATH = 
booleanConf("spark.sql.hive.verifyPartitionPath",
     defaultValue = Some(true),
     doc = "<TODO>")
 
@@ -325,9 +337,13 @@ private[spark] object SQLConf {
       defaultValue = Some(true),
       doc = "<TODO>")
 
-  // The output committer class used by FSBasedRelation. The specified class 
needs to be a
+  // The output committer class used by HadoopFsRelation. The specified class 
needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
-  // NOTE: This property should be set in Hadoop `Configuration` rather than 
Spark `SQLConf`
+  //
+  // NOTE:
+  //
+  //  1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
+  //  2. This option can be overridden by 
"spark.sql.parquet.output.committer.class".
   val OUTPUT_COMMITTER_CLASS =
     stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
   private[spark] def orcFilterPushDown: Boolean = 
getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   /** When true, uses verifyPartitionPath to prune paths that do not 
exist. */
-  private[spark] def verifyPartitionPath: Boolean = 
getConf(HIVE_VERIFY_PARTITIONPATH)
+  private[spark] def verifyPartitionPath: Boolean = 
getConf(HIVE_VERIFY_PARTITION_PATH)
 
   /** When true the planner will use the external sort, which may spill to 
disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)

http://git-wip-us.apache.org/repos/asf/spark/blob/111d6b9b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 62c4e92..1551afd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -17,19 +17,35 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.Log
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, 
ParquetOutputCommitter, ParquetOutputFormat}
 
+/**
+ * An output committer for writing Parquet files.  Instead of writing to the 
`_temporary` folder
+ * like what [[ParquetOutputCommitter]] does, this output committer writes 
data directly to the
+ * destination folder.  This can be useful for data stored in S3, where 
directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the 
"spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]].  Note that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because 
currently there's
+ *   no safe way to undo a failed appending job (that's why both `abortTask()` 
and `abortJob()` are
+ *   left empty).
+ */
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])
 
-  override def getWorkPath(): Path = outputPath
+  override def getWorkPath: Path = outputPath
   override def abortTask(taskContext: TaskAttemptContext): Unit = {}
   override def commitTask(taskContext: TaskAttemptContext): Unit = {}
   override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class 
DirectParquetOutputCommitter(outputPath: Path, context: T
         val footers = 
ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
         try {
           ParquetFileWriter.writeMetadataFile(configuration, outputPath, 
footers)
-        } catch {
-          case e: Exception => {
-            LOG.warn("could not write summary file for " + outputPath, e)
-            val metadataPath = new Path(outputPath, 
ParquetFileWriter.PARQUET_METADATA_FILE)
-            if (fileSystem.exists(metadataPath)) {
-              fileSystem.delete(metadataPath, true)
-            }
+        } catch { case e: Exception =>
+          LOG.warn("could not write summary file for " + outputPath, e)
+          val metadataPath = new Path(outputPath, 
ParquetFileWriter.PARQUET_METADATA_FILE)
+          if (fileSystem.exists(metadataPath)) {
+            fileSystem.delete(metadataPath, true)
           }
         }
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/111d6b9b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e049d54..1d353bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(
 
     val committerClass =
       conf.getClass(
-        "spark.sql.parquet.output.committer.class",
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[ParquetOutputCommitter],
         classOf[ParquetOutputCommitter])
 
-    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
         classOf[ParquetOutputCommitter].getCanonicalName)
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to