Repository: spark
Updated Branches:
  refs/heads/master 902334fd5 -> 49702bd73


[SPARK-8890] [SQL] Fallback on sorting when writing many dynamic partitions

Previously, we would open a new file for each new dynamic partition written out
using `HadoopFsRelation`.  For formats like parquet this is very costly due to
the buffers required to get good compression.  In this PR I refactor the code,
allowing us to fall back on an external sort when many partitions are seen.  As
such, each task will open no more than `spark.sql.sources.maxConcurrentWrites`
files at a time before falling back on sorting.  I also did the following
cleanup:

 - Instead of keying the file HashMap on an expensive to compute string 
representation of the partition, we now use a fairly cheap UnsafeProjection 
that avoids heap allocations.
 - The control flow for instantiating and invoking a writer container has been 
simplified.  Now instead of switching in two places based on the use of 
partitioning, the specific writer container must implement a single method 
`writeRows` that is invoked using `runJob`.
 - `InternalOutputWriter` has been removed.  Instead we have a `private[sql]` 
method `writeInternal` that converts and calls the public method.  This method 
can be overridden by internal datasources to avoid the conversion.  This change
removes a lot of code duplication and per-row `asInstanceOf` checks.
 - `commands.scala` has been split up.
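
The fallback described above can be sketched roughly as follows.  This is a
minimal, self-contained illustration rather than the code in this patch:
`SortFallbackSketch` and `MemWriter` are hypothetical names, rows are plain
`(partitionKey, value)` string pairs, and an in-memory `sortBy` stands in for
the external sort (`UnsafeKVExternalSorter`) that the real writer uses.

```scala
import scala.collection.mutable

object SortFallbackSketch {
  /** Hypothetical stand-in for an OutputWriter: collects rows in memory. */
  final class MemWriter(val partition: String) {
    val rows = mutable.ArrayBuffer.empty[String]
    private var closed = false
    def write(row: String): Unit = { require(!closed, "writer is closed"); rows += row }
    def close(): Unit = closed = true
  }

  /** Writes (partitionKey, row) pairs and returns the rows written per partition. */
  def writeRows(
      input: Iterator[(String, String)],
      maxOpenFiles: Int): Map[String, Seq[String]] = {
    val open = mutable.HashMap.empty[String, MemWriter]
    val finished = mutable.ArrayBuffer.empty[MemWriter]
    // Stays null unless we have to fall back on sorting.
    var pending: mutable.ArrayBuffer[(String, String)] = null

    // Phase 1: one open writer per partition, up to maxOpenFiles writers.
    while (input.hasNext && pending == null) {
      val (key, row) = input.next()
      open.get(key) match {
        case Some(writer) => writer.write(row)
        case None if open.size < maxOpenFiles =>
          val writer = new MemWriter(key)
          open.put(key, writer)
          writer.write(row)
        case None =>
          // Too many partitions: start buffering rows for the sort.
          pending = mutable.ArrayBuffer((key, row))
      }
    }

    // Phase 2: sort the remaining rows by key and write one partition at a time.
    if (pending != null) {
      pending ++= input
      var currentKey: String = null
      var current: MemWriter = null
      for ((key, row) <- pending.sortBy(_._1)) {
        if (key != currentKey) {
          if (current != null) { current.close(); finished += current }
          currentKey = key
          // Either reuse a writer opened in phase 1, or open a new one.
          current = open.remove(key).getOrElse(new MemWriter(key))
        }
        current.write(row)
      }
      if (current != null) { current.close(); finished += current }
    }

    // Close whatever is still open from phase 1.
    open.values.foreach { w => w.close(); finished += w }
    open.clear()
    finished.map(w => w.partition -> w.rows.toSeq).toMap
  }
}
```

In this sketch, writers opened before the fallback stay open until the sorted
pass reaches their key or the task finishes, which mirrors how the patch
reuses entries from its `outputWriters` map.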

Author: Michael Armbrust <mich...@databricks.com>

Closes #8010 from marmbrus/fsWriting and squashes the following commits:

00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49702bd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49702bd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49702bd7

Branch: refs/heads/master
Commit: 49702bd738de681255a7177339510e0e1b25a8db
Parents: 902334f
Author: Michael Armbrust <mich...@databricks.com>
Authored: Fri Aug 7 16:24:50 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Aug 7 16:24:50 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |   8 +-
 .../datasources/InsertIntoDataSource.scala      |  64 ++
 .../InsertIntoHadoopFsRelation.scala            | 165 +++++
 .../execution/datasources/WriterContainer.scala | 404 +++++++++++++
 .../sql/execution/datasources/commands.scala    | 606 -------------------
 .../apache/spark/sql/json/JSONRelation.scala    |   6 +-
 .../spark/sql/parquet/ParquetRelation.scala     |   6 +-
 .../apache/spark/sql/sources/interfaces.scala   |  17 +-
 .../sql/sources/PartitionedWriteSuite.scala     |  56 ++
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   6 +-
 10 files changed, 715 insertions(+), 623 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 45d3d8c..e9de14f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -366,17 +366,21 @@ private[spark] object SQLConf {
       "storing additional schema information in Hive's metastore.",
     isPublic = false)
 
-  // Whether to perform partition discovery when loading external data sources.  Default to true.
   val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
     defaultValue = Some(true),
     doc = "When true, automtically discover data partitions.")
 
-  // Whether to perform partition column type inference. Default to true.
   val PARTITION_COLUMN_TYPE_INFERENCE =
     booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
       defaultValue = Some(true),
       doc = "When true, automatically infer the data types for partitioned columns.")
 
+  val PARTITION_MAX_FILES =
+    intConf("spark.sql.sources.maxConcurrentWrites",
+      defaultValue = Some(5),
+      doc = "The maximum number of concurent files to open before falling back on sorting when " +
+            "writing out files using dynamic partitioning.")
+
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   //

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
new file mode 100644
index 0000000..6ccde76
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConversions.asScalaIterator
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => 
MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.{Utils, SerializableConfiguration}
+
+
+/**
+ * Inserts the results of `query` in to a relation that extends 
[[InsertableRelation]].
+ */
+private[sql] case class InsertIntoDataSource(
+    logicalRelation: LogicalRelation,
+    query: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
+    val data = DataFrame(sqlContext, query)
+    // Apply the schema of the existing table to the new data.
+    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, 
logicalRelation.schema)
+    relation.insert(df, overwrite)
+
+    // Invalidate the cache.
+    sqlContext.cacheManager.invalidateCache(logicalRelation)
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
new file mode 100644
index 0000000..735d52f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.spark._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.util.Utils
+
+
+/**
+ * A command for writing data to a [[HadoopFsRelation]].  Supports both 
overwriting and appending.
+ * Writing to dynamic partitions is also supported.  Each 
[[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job.  Each concrete 
implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate 
unique file path for
+ * each task output file.  This UUID is passed to executor side via a property 
named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and 
[[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ *   1. Driver side setup, including output committer initialization and data 
source specific
+ *      preparation work for the write job to be issued.
+ *   2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
+ *      rows within an RDD partition.
+ *   3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
+ *      exception is thrown during task commitment, also aborts that task.
+ *   4. If all tasks are committed, commit the job, otherwise aborts the job;  
If any exception is
+ *      thrown during job commitment, also aborts the job.
+ */
+private[sql] case class InsertIntoHadoopFsRelation(
+    @transient relation: HadoopFsRelation,
+    @transient query: LogicalPlan,
+    mode: SaveMode)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    require(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: 
${relation.paths.mkString(",")}")
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val outputPath = new Path(relation.paths.head)
+    val fs = outputPath.getFileSystem(hadoopConf)
+    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
+
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
+      case (SaveMode.ErrorIfExists, true) =>
+        throw new AnalysisException(s"path $qualifiedOutputPath already 
exists.")
+      case (SaveMode.Overwrite, true) =>
+        Utils.tryOrIOException {
+          if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
+            throw new IOException(s"Unable to clear output " +
+              s"directory $qualifiedOutputPath prior to writing to it")
+          }
+        }
+        true
+      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | 
(SaveMode.ErrorIfExists, false) =>
+        true
+      case (SaveMode.Ignore, exists) =>
+        !exists
+      case (s, exists) =>
+        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
+    }
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    if (doInsertion) {
+      val job = new Job(hadoopConf)
+      job.setOutputKeyClass(classOf[Void])
+      job.setOutputValueClass(classOf[InternalRow])
+      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+
+      // A partitioned relation schema's can be different from the input 
logicalPlan, since
+      // partition columns are all moved after data column. We Project to 
adjust the ordering.
+      // TODO: this belongs in the analyzer.
+      val project = Project(
+        relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), 
query)
+      val queryExecution = DataFrame(sqlContext, project).queryExecution
+
+      SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+        val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, 
relation.schema)
+        val partitionColumns = relation.partitionColumns.fieldNames
+
+        // Some pre-flight checks.
+        require(
+          df.schema == relation.schema,
+          s"""DataFrame must have the same schema as the relation to which is 
inserted.
+             |DataFrame schema: ${df.schema}
+             |Relation schema: ${relation.schema}
+          """.stripMargin)
+        val partitionColumnsInSpec = relation.partitionColumns.fieldNames
+        require(
+          partitionColumnsInSpec.sameElements(partitionColumns),
+          s"""Partition columns mismatch.
+             |Expected: ${partitionColumnsInSpec.mkString(", ")}
+             |Actual: ${partitionColumns.mkString(", ")}
+          """.stripMargin)
+
+        val writerContainer = if (partitionColumns.isEmpty) {
+          new DefaultWriterContainer(relation, job, isAppend)
+        } else {
+          val output = df.queryExecution.executedPlan.output
+          val (partitionOutput, dataOutput) =
+            output.partition(a => partitionColumns.contains(a.name))
+
+          new DynamicPartitionWriterContainer(
+            relation,
+            job,
+            partitionOutput,
+            dataOutput,
+            output,
+            PartitioningUtils.DEFAULT_PARTITION_NAME,
+            sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+            isAppend)
+        }
+
+        // This call shouldn't be put into the `try` block below because it 
only initializes and
+        // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
+        writerContainer.driverSideSetup()
+
+        try {
+          sqlContext.sparkContext.runJob(df.queryExecution.toRdd, 
writerContainer.writeRows _)
+          writerContainer.commitJob()
+          relation.refresh()
+        } catch { case cause: Throwable =>
+          logError("Aborting job.", cause)
+          writerContainer.abortJob()
+          throw new SparkException("Job aborted.", cause)
+        }
+      }
+    } else {
+      logInfo("Skipping insertion into a relation that already exists.")
+    }
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
new file mode 100644
index 0000000..2f11f40
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => 
MapReduceFileOutputCommitter}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, 
OutputWriterFactory}
+import org.apache.spark.sql.types.{StructType, StringType}
+import org.apache.spark.util.SerializableConfiguration
+
+
+private[sql] abstract class BaseWriterContainer(
+    @transient val relation: HadoopFsRelation,
+    @transient job: Job,
+    isAppend: Boolean)
+  extends SparkHadoopMapReduceUtil
+  with Logging
+  with Serializable {
+
+  protected val dataSchema = relation.dataSchema
+
+  protected val serializableConf = new 
SerializableConfiguration(job.getConfiguration)
+
+  // This UUID is used to avoid output file name collision between different 
appending write jobs.
+  // These jobs may belong to different SparkContext instances. Concrete data 
source implementations
+  // may use this UUID to generate unique file names (e.g., 
`part-r-<task-id>-<job-uuid>.parquet`).
+  //  The reason why this ID is used to identify a job rather than a single 
task output file is
+  // that, speculative tasks must generate the same output file name as the 
original task.
+  private val uniqueWriteJobId = UUID.randomUUID()
+
+  // This is only used on driver side.
+  @transient private val jobContext: JobContext = job
+
+  // The following fields are initialized and used on both driver and executor 
side.
+  @transient protected var outputCommitter: OutputCommitter = _
+  @transient private var jobId: JobID = _
+  @transient private var taskId: TaskID = _
+  @transient private var taskAttemptId: TaskAttemptID = _
+  @transient protected var taskAttemptContext: TaskAttemptContext = _
+
+  protected val outputPath: String = {
+    assert(
+      relation.paths.length == 1,
+      s"Cannot write to multiple destinations: 
${relation.paths.mkString(",")}")
+    relation.paths.head
+  }
+
+  protected var outputWriterFactory: OutputWriterFactory = _
+
+  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+
+  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit
+
+  def driverSideSetup(): Unit = {
+    setupIDs(0, 0, 0)
+    setupConf()
+
+    // This UUID is sent to executor side together with the serialized 
`Configuration` object within
+    // the `Job` instance.  `OutputWriters` on the executor side should use 
this UUID to generate
+    // unique task output files.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
uniqueWriteJobId.toString)
+
+    // Order of the following two lines is important.  For Hadoop 1, 
TaskAttemptContext constructor
+    // clones the Configuration object passed in.  If we initialize the 
TaskAttemptContext first,
+    // configurations made in prepareJobForWrite(job) are not populated into 
the TaskAttemptContext.
+    //
+    // Also, the `prepareJobForWrite` call must happen before initializing 
output format and output
+    // committer, since their initialization involve the job configuration, 
which can be potentially
+    // decorated in `prepareJobForWrite`.
+    outputWriterFactory = relation.prepareJobForWrite(job)
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, 
taskAttemptId)
+
+    outputFormatClass = job.getOutputFormatClass
+    outputCommitter = newOutputCommitter(taskAttemptContext)
+    outputCommitter.setupJob(jobContext)
+  }
+
+  def executorSideSetup(taskContext: TaskContext): Unit = {
+    setupIDs(taskContext.stageId(), taskContext.partitionId(), 
taskContext.attemptNumber())
+    setupConf()
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, 
taskAttemptId)
+    outputCommitter = newOutputCommitter(taskAttemptContext)
+    outputCommitter.setupTask(taskAttemptContext)
+  }
+
+  protected def getWorkPath: String = {
+    outputCommitter match {
+      // FileOutputCommitter writes to a temporary location returned by 
`getWorkPath`.
+      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
+      case _ => outputPath
+    }
+  }
+
+  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
+    val defaultOutputCommitter = 
outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the 
output committer
+      // associated with the file output format since it is not safe to use a 
custom
+      // committer for appending. For example, in S3, direct parquet output 
committer may
+      // leave partial data in the destination dir when the the appending job 
fails.
+      logInfo(
+        s"Using output committer class 
${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
+
+        // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output 
committer,
+        // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs 
to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one 
associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class 
${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      }
+    }
+  }
+
+  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
+    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
+    this.taskId = new TaskID(this.jobId, true, splitId)
+    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
+  }
+
+  private def setupConf(): Unit = {
+    serializableConf.value.set("mapred.job.id", jobId.toString)
+    serializableConf.value.set("mapred.tip.id", 
taskAttemptId.getTaskID.toString)
+    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
+    serializableConf.value.setBoolean("mapred.task.is.map", true)
+    serializableConf.value.setInt("mapred.task.partition", 0)
+  }
+
+  def commitTask(): Unit = {
+    SparkHadoopMapRedUtil.commitTask(
+      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, 
taskAttemptId.getId)
+  }
+
+  def abortTask(): Unit = {
+    if (outputCommitter != null) {
+      outputCommitter.abortTask(taskAttemptContext)
+    }
+    logError(s"Task attempt $taskAttemptId aborted.")
+  }
+
+  def commitJob(): Unit = {
+    outputCommitter.commitJob(jobContext)
+    logInfo(s"Job $jobId committed.")
+  }
+
+  def abortJob(): Unit = {
+    if (outputCommitter != null) {
+      outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
+    }
+    logError(s"Job $jobId aborted.")
+  }
+}
+
+/**
+ * A writer that writes all of the rows in a partition to a single file.
+ */
+private[sql] class DefaultWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+
+  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
+    executorSideSetup(taskContext)
+    taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", 
outputPath)
+    val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, 
taskAttemptContext)
+    writer.initConverter(dataSchema)
+
+    // If anything below fails, we should abort the task.
+    try {
+      while (iterator.hasNext) {
+        val internalRow = iterator.next()
+        writer.writeInternal(internalRow)
+      }
+
+      commitTask()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+    }
+
+    def commitTask(): Unit = {
+      try {
+        assert(writer != null, "OutputWriter instance should have been 
initialized")
+        writer.close()
+        super.commitTask()
+      } catch {
+        case cause: Throwable =>
+          // This exception will be handled in 
`InsertIntoHadoopFsRelation.insert$writeRows`, and
+          // will cause `abortTask()` to be invoked.
+          throw new RuntimeException("Failed to commit task", cause)
+      }
+    }
+
+    def abortTask(): Unit = {
+      try {
+        writer.close()
+      } finally {
+        super.abortTask()
+      }
+    }
+  }
+}
+
+/**
+ * A writer that dynamically opens files based on the given partition columns. 
 Internally this is
+ * done by maintaining a HashMap of open files until `maxFiles` is reached.  
If this occurs, the
+ * writer externally sorts the remaining rows and then writes out them out one 
file at a time.
+ */
+private[sql] class DynamicPartitionWriterContainer(
+    @transient relation: HadoopFsRelation,
+    @transient job: Job,
+    partitionColumns: Seq[Attribute],
+    dataColumns: Seq[Attribute],
+    inputSchema: Seq[Attribute],
+    defaultPartitionName: String,
+    maxOpenFiles: Int,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
+
+  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
+    val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
+    executorSideSetup(taskContext)
+
+    // Returns the partition key given an input row
+    val getPartitionKey = UnsafeProjection.create(partitionColumns, 
inputSchema)
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
+
+    // Expressions that given a partition key build a string like: 
col1=val/col2=val/...
+    val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { 
case (c, i) =>
+      val escaped =
+        ScalaUDF(
+          PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, 
StringType)), Seq(StringType))
+      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
+      val partitionName = Literal(c.name + "=") :: str :: Nil
+      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: 
partitionName
+    }
+
+    // Returns the partition path given a partition key.
+    val getPartitionString =
+      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, 
partitionColumns)
+
+    // If anything below fails, we should abort the task.
+    try {
+      // This will be filled in if we have to fall back on sorting.
+      var sorter: UnsafeKVExternalSorter = null
+      while (iterator.hasNext && sorter == null) {
+        val inputRow = iterator.next()
+        val currentKey = getPartitionKey(inputRow)
+        var currentWriter = outputWriters.get(currentKey)
+
+        if (currentWriter == null) {
+          if (outputWriters.size < maxOpenFiles) {
+            currentWriter = newOutputWriter(currentKey)
+            outputWriters.put(currentKey.copy(), currentWriter)
+            currentWriter.writeInternal(getOutputRow(inputRow))
+          } else {
+            logInfo(s"Maximum partitions reached, falling back on sorting.")
+            sorter = new UnsafeKVExternalSorter(
+              StructType.fromAttributes(partitionColumns),
+              StructType.fromAttributes(dataColumns),
+              SparkEnv.get.blockManager,
+              SparkEnv.get.shuffleMemoryManager,
+              SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
+            sorter.insertKV(currentKey, getOutputRow(inputRow))
+          }
+        } else {
+          currentWriter.writeInternal(getOutputRow(inputRow))
+        }
+      }
+
+      // If the sorter is not null that means that we reached the maxFiles 
above and need to finish
+      // using external sort.
+      if (sorter != null) {
+        while (iterator.hasNext) {
+          val currentRow = iterator.next()
+          sorter.insertKV(getPartitionKey(currentRow), 
getOutputRow(currentRow))
+        }
+
+        logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
+
+        val sortedIterator = sorter.sortedIterator()
+        var currentKey: InternalRow = null
+        var currentWriter: OutputWriter = null
+        try {
+          while (sortedIterator.next()) {
+            if (currentKey != sortedIterator.getKey) {
+              if (currentWriter != null) {
+                currentWriter.close()
+              }
+              currentKey = sortedIterator.getKey.copy()
+              logDebug(s"Writing partition: $currentKey")
+
+              // Either use an existing file from before, or open a new one.
+              currentWriter = outputWriters.remove(currentKey)
+              if (currentWriter == null) {
+                currentWriter = newOutputWriter(currentKey)
+              }
+            }
+
+            currentWriter.writeInternal(sortedIterator.getValue)
+          }
+        } finally {
+          if (currentWriter != null) { currentWriter.close() }
+        }
+      }
+
+      commitTask()
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        abortTask()
+        throw new SparkException("Task failed while writing rows.", cause)
+    }
+
+    /** Opens and returns a new OutputWriter given a partition key. */
+    def newOutputWriter(key: InternalRow): OutputWriter = {
+      val partitionPath = getPartitionString(key).getString(0)
+      val path = new Path(getWorkPath, partitionPath)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      newWriter.initConverter(dataSchema)
+      newWriter
+    }
+
+    def clearOutputWriters(): Unit = {
+      outputWriters.asScala.values.foreach(_.close())
+      outputWriters.clear()
+    }
+
+    def commitTask(): Unit = {
+      try {
+        clearOutputWriters()
+        super.commitTask()
+      } catch {
+        case cause: Throwable =>
+          throw new RuntimeException("Failed to commit task", cause)
+      }
+    }
+
+    def abortTask(): Unit = {
+      try {
+        clearOutputWriters()
+      } finally {
+        super.abortTask()
+      }
+    }
+  }
+}
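
The control flow of `writeRows` above can be sketched without any Spark dependencies. This is only an illustrative model, not the code from the patch: `MemoryWriter` and `HybridPartitionWriter` are hypothetical names, partition keys and rows are plain strings, and an in-memory stable sort stands in for `UnsafeKVExternalSorter`. It shows the two phases: hash-based writing until `maxOpenFiles` writers are open, then sorting the remaining rows and writing one partition at a time, reusing any writer that was already open.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for an OutputWriter: it records rows for one partition.
final class MemoryWriter(val partition: String) {
  val rows = mutable.ArrayBuffer.empty[String]
  var closed = false
  def write(row: String): Unit = {
    require(!closed, s"writer for $partition is already closed")
    rows += row
  }
  def close(): Unit = { closed = true }
}

object HybridPartitionWriter {
  /** Writes (partitionKey, row) pairs and returns the rows written per partition. */
  def writeRows(input: Iterator[(String, String)], maxOpenFiles: Int): Map[String, Vector[String]] = {
    val open = mutable.LinkedHashMap.empty[String, MemoryWriter]
    val finished = mutable.ArrayBuffer.empty[MemoryWriter]
    var spilled: mutable.ArrayBuffer[(String, String)] = null

    // Phase 1: one writer per partition key until the limit is reached.
    while (input.hasNext && spilled == null) {
      val (key, row) = input.next()
      open.get(key) match {
        case Some(writer) => writer.write(row)
        case None if open.size < maxOpenFiles =>
          val writer = new MemoryWriter(key)
          open.put(key, writer)
          writer.write(row)
        case None =>
          // Limit reached: start buffering rows for sorting (assumption: fits in memory here).
          spilled = mutable.ArrayBuffer((key, row))
      }
    }

    // Phase 2: sort the remaining rows by key and write one partition at a time.
    if (spilled != null) {
      spilled ++= input
      var current: MemoryWriter = null
      for ((key, row) <- spilled.sortBy(_._1)) {
        if (current == null || current.partition != key) {
          if (current != null) { current.close(); finished += current }
          // Either reuse a writer opened in phase 1, or open a new one.
          current = open.remove(key).getOrElse(new MemoryWriter(key))
        }
        current.write(row)
      }
      if (current != null) { current.close(); finished += current }
    }

    // Close whatever is still open from phase 1.
    open.values.foreach { writer => writer.close(); finished += writer }
    finished.map(w => w.partition -> w.rows.toVector).toMap
  }
}
```

In this sketch, as in the diff, a writer opened during the hash phase is removed from the map when its key comes up in sorted order, so each partition ends up with exactly one writer.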

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
deleted file mode 100644
index 4266897..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.{Utils, SerializableConfiguration}
-
-
-private[sql] case class InsertIntoDataSource(
-    logicalRelation: LogicalRelation,
-    query: LogicalPlan,
-    overwrite: Boolean)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = DataFrame(sqlContext, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
-
-    // Invalidate the cache.
-    sqlContext.cacheManager.invalidateCache(logicalRelation)
-
-    Seq.empty[Row]
-  }
-}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file.  This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- *   1. Driver side setup, including output committer initialization and data source specific
- *      preparation work for the write job to be issued.
- *   2. Issues a write job consists of one or more executor side tasks, each of which writes all
- *      rows within an RDD partition.
- *   3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
- *      exception is thrown during task commitment, also aborts that task.
- *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
- *      thrown during job commitment, also aborts the job.
- */
-private[sql] case class InsertIntoHadoopFsRelation(
-    @transient relation: HadoopFsRelation,
-    @transient query: LogicalPlan,
-    mode: SaveMode)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    require(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
-    val outputPath = new Path(relation.paths.head)
-    val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
-    val pathExists = fs.exists(qualifiedOutputPath)
-    val doInsertion = (mode, pathExists) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        Utils.tryOrIOException {
-          if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
-            throw new IOException(s"Unable to clear output " +
-              s"directory $qualifiedOutputPath prior to writing to it")
-          }
-        }
-        true
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        true
-      case (SaveMode.Ignore, exists) =>
-        !exists
-      case (s, exists) =>
-        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
-    }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
-
-    if (doInsertion) {
-      val job = new Job(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      // We create a DataFrame by applying the schema of relation to the data to make sure.
-      // We are writing data based on the expected schema,
-
-      // For partitioned relation r, r.schema's column ordering can be different from the column
-      // ordering of data.logicalPlan (partition columns are all moved after data column). We
-      // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
-      // safely apply the schema of r.schema to the data.
-      val project = Project(
-        relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
-
-      val queryExecution = DataFrame(sqlContext, project).queryExecution
-      SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
-        val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
-
-        val partitionColumns = relation.partitionColumns.fieldNames
-        if (partitionColumns.isEmpty) {
-          insert(new DefaultWriterContainer(relation, job, isAppend), df)
-        } else {
-          val writerContainer = new DynamicPartitionWriterContainer(
-            relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
-          insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
-        }
-      }
-    }
-
-    Seq.empty[Row]
-  }
-
-  /**
-   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
-   */
-  private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
-    // Uses local vals for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-
-        if (needsConversion) {
-          val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
-            .asInstanceOf[InternalRow => Row]
-          while (iterator.hasNext) {
-            val internalRow = iterator.next()
-            writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
-          }
-        } else {
-          while (iterator.hasNext) {
-            val internalRow = iterator.next()
-            writerContainer.outputWriterForRow(internalRow)
-              .asInstanceOf[OutputWriterInternal].writeInternal(internalRow)
-          }
-        }
-
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-
-  /**
-   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
-   */
-  private def insertWithDynamicPartitions(
-      sqlContext: SQLContext,
-      writerContainer: BaseWriterContainer,
-      df: DataFrame,
-      partitionColumns: Array[String]): Unit = {
-    // Uses a local val for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-
-    require(
-      df.schema == relation.schema,
-      s"""DataFrame must have the same schema as the relation to which is inserted.
-         |DataFrame schema: ${df.schema}
-         |Relation schema: ${relation.schema}
-       """.stripMargin)
-
-    val partitionColumnsInSpec = relation.partitionColumns.fieldNames
-    require(
-      partitionColumnsInSpec.sameElements(partitionColumns),
-      s"""Partition columns mismatch.
-         |Expected: ${partitionColumnsInSpec.mkString(", ")}
-         |Actual: ${partitionColumns.mkString(", ")}
-       """.stripMargin)
-
-    val output = df.queryExecution.executedPlan.output
-    val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
-    val codegenEnabled = df.sqlContext.conf.codegenEnabled
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-
-        // Projects all partition columns and casts them to strings to build partition directories.
-        val partitionCasts = partitionOutput.map(Cast(_, StringType))
-        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
-        val dataProj = newProjection(codegenEnabled, dataOutput, output)
-
-        if (needsConversion) {
-          val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
-            .asInstanceOf[InternalRow => Row]
-          while (iterator.hasNext) {
-            val internalRow = iterator.next()
-            val partitionPart = partitionProj(internalRow)
-            val dataPart = converter(dataProj(internalRow))
-            writerContainer.outputWriterForRow(partitionPart).write(dataPart)
-          }
-        } else {
-          while (iterator.hasNext) {
-            val internalRow = iterator.next()
-            val partitionPart = partitionProj(internalRow)
-            val dataPart = dataProj(internalRow)
-            writerContainer.outputWriterForRow(partitionPart)
-              .asInstanceOf[OutputWriterInternal].writeInternal(dataPart)
-          }
-        }
-
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-
-  // This is copied from SparkPlan, probably should move this to a more general place.
-  private def newProjection(
-      codegenEnabled: Boolean,
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): Projection = {
-    log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (codegenEnabled) {
-
-      try {
-        GenerateProjection.generate(expressions, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (sys.props.contains("spark.testing")) {
-            throw e
-          } else {
-            log.error("failed to generate projection, fallback to interpreted", e)
-            new InterpretedProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      new InterpretedProjection(expressions, inputSchema)
-    }
-  }
-}
-
-private[sql] abstract class BaseWriterContainer(
-    @transient val relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends SparkHadoopMapReduceUtil
-  with Logging
-  with Serializable {
-
-  protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
-
-  // This UUID is used to avoid output file name collision between different appending write jobs.
-  // These jobs may belong to different SparkContext instances. Concrete data source implementations
-  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
-  //  The reason why this ID is used to identify a job rather than a single task output file is
-  // that, speculative tasks must generate the same output file name as the original task.
-  private val uniqueWriteJobId = UUID.randomUUID()
-
-  // This is only used on driver side.
-  @transient private val jobContext: JobContext = job
-
-  // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: OutputCommitter = _
-  @transient private var jobId: JobID = _
-  @transient private var taskId: TaskID = _
-  @transient private var taskAttemptId: TaskAttemptID = _
-  @transient protected var taskAttemptContext: TaskAttemptContext = _
-
-  protected val outputPath: String = {
-    assert(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-    relation.paths.head
-  }
-
-  protected val dataSchema = relation.dataSchema
-
-  protected var outputWriterFactory: OutputWriterFactory = _
-
-  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
-  def driverSideSetup(): Unit = {
-    setupIDs(0, 0, 0)
-    setupConf()
-
-    // This UUID is sent to executor side together with the serialized `Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
-    // unique task output files.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
-
-    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
-    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
-    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
-    //
-    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
-    // committer, since their initialization involve the job configuration, which can be potentially
-    // decorated in `prepareJobForWrite`.
-    outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-
-    outputFormatClass = job.getOutputFormatClass
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupJob(jobContext)
-  }
-
-  def executorSideSetup(taskContext: TaskContext): Unit = {
-    setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
-    setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupTask(taskAttemptContext)
-    initWriters()
-  }
-
-  protected def getWorkPath: String = {
-    outputCommitter match {
-      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
-      case _ => outputPath
-    }
-  }
-
-  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the the appending job fails.
-      logInfo(
-        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
-        "for appending.")
-      defaultOutputCommitter
-    } else {
-      val committerClass = context.getConfiguration.getClass(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      Option(committerClass).map { clazz =>
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          ctor.newInstance(new Path(outputPath), context)
-        } else {
-          // The specified output committer is just a OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          ctor.newInstance()
-        }
-      }.getOrElse {
-        // If output committer class is not set, we will use the one associated with the
-        // file output format.
-        logInfo(
-          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
-        defaultOutputCommitter
-      }
-    }
-  }
-
-  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
-    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.taskId = new TaskID(this.jobId, true, splitId)
-    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
-  }
-
-  private def setupConf(): Unit = {
-    serializableConf.value.set("mapred.job.id", jobId.toString)
-    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableConf.value.setBoolean("mapred.task.is.map", true)
-    serializableConf.value.setInt("mapred.task.partition", 0)
-  }
-
-  // Called on executor side when writing rows
-  def outputWriterForRow(row: InternalRow): OutputWriter
-
-  protected def initWriters(): Unit
-
-  def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
-  }
-
-  def abortTask(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortTask(taskAttemptContext)
-    }
-    logError(s"Task attempt $taskAttemptId aborted.")
-  }
-
-  def commitJob(): Unit = {
-    outputCommitter.commitJob(jobContext)
-    logInfo(s"Job $jobId committed.")
-  }
-
-  def abortJob(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
-    }
-    logError(s"Job $jobId aborted.")
-  }
-}
-
-private[sql] class DefaultWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  @transient private var writer: OutputWriter = _
-
-  override protected def initWriters(): Unit = {
-    taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
-    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
-  }
-
-  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
-
-  override def commitTask(): Unit = {
-    try {
-      assert(writer != null, "OutputWriter instance should have been initialized")
-      writer.close()
-      super.commitTask()
-    } catch { case cause: Throwable =>
-      // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
-      // cause `abortTask()` to be invoked.
-      throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-
-  override def abortTask(): Unit = {
-    try {
-      // It's possible that the task fails before `writer` gets initialized
-      if (writer != null) {
-        writer.close()
-      }
-    } finally {
-      super.abortTask()
-    }
-  }
-}
-
-private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    partitionColumns: Array[String],
-    defaultPartitionName: String,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  // All output writers are created on executor side.
-  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
-
-  override protected def initWriters(): Unit = {
-    outputWriters = new java.util.HashMap[String, OutputWriter]
-  }
-
-  // The `row` argument is supposed to only contain partition column values which have been casted
-  // to strings.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = {
-    val partitionPath = {
-      val partitionPathBuilder = new StringBuilder
-      var i = 0
-
-      while (i < partitionColumns.length) {
-        val col = partitionColumns(i)
-        val partitionValueString = {
-          val string = row.getUTF8String(i)
-          if (string.eq(null)) {
-            defaultPartitionName
-          } else {
-            PartitioningUtils.escapePathName(string.toString)
-          }
-        }
-
-        if (i > 0) {
-          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
-        }
-
-        partitionPathBuilder.append(s"$col=$partitionValueString")
-        i += 1
-      }
-
-      partitionPathBuilder.toString()
-    }
-
-    val writer = outputWriters.get(partitionPath)
-    if (writer.eq(null)) {
-      val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-      outputWriters.put(partitionPath, newWriter)
-      newWriter
-    } else {
-      writer
-    }
-  }
-
-  private def clearOutputWriters(): Unit = {
-    if (!outputWriters.isEmpty) {
-      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
-      outputWriters.clear()
-    }
-  }
-
-  override def commitTask(): Unit = {
-    try {
-      clearOutputWriters()
-      super.commitTask()
-    } catch { case cause: Throwable =>
-      throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-
-  override def abortTask(): Unit = {
-    try {
-      clearOutputWriters()
-    } finally {
-      super.abortTask()
-    }
-  }
-}
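
For reference, the string-keyed partition path that the removed `outputWriterForRow` built for every row can be sketched as follows. This is a simplified illustration rather than the removed code: `PartitionPaths` and `escape` are hypothetical names, the escaping covers only a handful of characters (the real `PartitioningUtils.escapePathName` handles more), and the default partition name shown is an assumed value for `PartitioningUtils.DEFAULT_PARTITION_NAME`.

```scala
object PartitionPaths {
  // Assumed placeholder used as the directory value when a partition value is null.
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Hypothetical, simplified escaping: percent-encode a few characters that are unsafe in paths.
  def escape(value: String): String =
    value.map { c =>
      if ("/=%:".indexOf(c) >= 0) f"%%${c.toInt}%02X" else c.toString
    }.mkString

  /** Builds `col1=value1/col2=value2/...` from partition column names and string values. */
  def partitionPath(columns: Seq[String], values: Seq[String]): String = {
    require(columns.length == values.length, "one value per partition column")
    columns.zip(values).map { case (col, value) =>
      val rendered = if (value == null) DefaultPartitionName else escape(value)
      s"$col=$rendered"
    }.mkString("/")
  }
}
```

Building and hashing such a string for every input row is the per-row cost the commit message refers to when it describes the old key as expensive to compute.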

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 5d37140..10f1367 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -152,7 +152,7 @@ private[json] class JsonOutputWriter(
     path: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
-  extends OutputWriterInternal with SparkHadoopMapRedUtil with Logging {
+  extends OutputWriter with SparkHadoopMapRedUtil with Logging {
 
   val writer = new CharArrayWriter()
   // create the Generator without separator inserted between 2 records
@@ -170,7 +170,9 @@ private[json] class JsonOutputWriter(
     }.getRecordWriter(context)
   }
 
-  override def writeInternal(row: InternalRow): Unit = {
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
     JacksonGenerator(dataSchema, gen, row)
     gen.flush()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 29c388c..48009b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -62,7 +62,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
-  extends OutputWriterInternal {
+  extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
     val outputFormat = {
@@ -87,7 +87,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
     outputFormat.getRecordWriter(context)
   }
 
-  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
 
   override def close(): Unit = recordWriter.close(context)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 0b29296..c5b7ee7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -342,18 +342,17 @@ abstract class OutputWriter {
    * @since 1.4.0
    */
   def close(): Unit
-}
 
-/**
- * This is an internal, private version of [[OutputWriter]] with an writeInternal method that
- * accepts an [[InternalRow]] rather than an [[Row]]. Data sources that return this must have
- * the conversion flag set to false.
- */
-private[sql] abstract class OutputWriterInternal extends OutputWriter {
+  private var converter: InternalRow => Row = _
 
-  override def write(row: Row): Unit = throw new UnsupportedOperationException
+  protected[sql] def initConverter(dataSchema: StructType) = {
+    converter =
+      CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+  }
 
-  def writeInternal(row: InternalRow): Unit
+  protected[sql] def writeInternal(row: InternalRow): Unit = {
+    write(converter(row))
+  }
 }
 
 /**
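
The `OutputWriter` change in this hunk follows a simple pattern: the base class supplies a converting `writeInternal` that delegates to the public `write`, and internal writers override `writeInternal` to skip the conversion. A dependency-free sketch of that pattern is below. All names here (`InternalRecord`, `PublicRecord`, `SketchWriter`, `ExternalWriter`, `NativeWriter`) are hypothetical stand-ins, not Spark classes, and the converter is a trivial copy rather than `CatalystTypeConverters`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for InternalRow and Row.
final case class InternalRecord(fields: Array[Any])
final case class PublicRecord(fields: Seq[Any])

abstract class SketchWriter {
  private var converter: InternalRecord => PublicRecord = _

  /** Public API that external data sources implement. */
  def write(row: PublicRecord): Unit

  /** Must be called once before the default writeInternal is used. */
  def initConverter(): Unit = {
    converter = r => PublicRecord(r.fields.toSeq)
  }

  /** Default path: convert, then call the public method. Internal writers may override. */
  def writeInternal(row: InternalRecord): Unit = {
    write(converter(row))
  }
}

// An external-style writer only knows about the public record type.
class ExternalWriter extends SketchWriter {
  val written = ArrayBuffer.empty[PublicRecord]
  override def write(row: PublicRecord): Unit = { written += row }
}

// An internal-style writer consumes the internal format directly and skips conversion.
class NativeWriter extends SketchWriter {
  val written = ArrayBuffer.empty[InternalRecord]
  override def write(row: PublicRecord): Unit =
    throw new UnsupportedOperationException("call writeInternal")
  override def writeInternal(row: InternalRecord): Unit = { written += row }
}
```

With this shape the caller always invokes `writeInternal`, so no per-row type check or cast is needed to choose between the two paths.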

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
new file mode 100644
index 0000000..c86ddd7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.util.Utils
+
+class PartitionedWriteSuite extends QueryTest {
+  import TestSQLContext.implicits._
+
+  test("write many partitions") {
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val df = TestSQLContext.range(100).select($"id", lit(1).as("data"))
+    df.write.partitionBy("id").save(path.getCanonicalPath)
+
+    checkAnswer(
+      TestSQLContext.read.load(path.getCanonicalPath),
+      (0 to 99).map(Row(1, _)).toSeq)
+
+    Utils.deleteRecursively(path)
+  }
+
+  test("write many partitions with repeats") {
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val base = TestSQLContext.range(100)
+    val df = base.unionAll(base).select($"id", lit(1).as("data"))
+    df.write.partitionBy("id").save(path.getCanonicalPath)
+
+    checkAnswer(
+      TestSQLContext.read.load(path.getCanonicalPath),
+      (0 to 99).map(Row(1, _)).toSeq ++ (0 to 99).map(Row(1, _)).toSeq)
+
+    Utils.deleteRecursively(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/49702bd7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4a310ff..7c8704b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -66,7 +66,7 @@ private[orc] class OrcOutputWriter(
     path: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
-  extends OutputWriterInternal with SparkHadoopMapRedUtil with HiveInspectors {
+  extends OutputWriter with SparkHadoopMapRedUtil with HiveInspectors {
 
   private val serializer = {
     val table = new Properties()
@@ -120,7 +120,9 @@ private[orc] class OrcOutputWriter(
     ).asInstanceOf[RecordWriter[NullWritable, Writable]]
   }
 
-  override def writeInternal(row: InternalRow): Unit = {
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
     var i = 0
     while (i < row.numFields) {
       reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))

