Repository: spark
Updated Branches:
  refs/heads/master 4b2011ec9 -> f313117bc


[SPARK-18012][SQL] Simplify WriterContainer

## What changes were proposed in this pull request?
This patch refactors WriterContainer to simplify the logic and make control 
flow more obvious.The previous code setup made it pretty difficult to track the 
actual dependencies on variables and setups because the driver side and the 
executor side were using the same set of variables.

## How was this patch tested?
N/A - this should be covered by existing tests.

Author: Reynold Xin <r...@databricks.com>

Closes #15551 from rxin/writercontainer-refactor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f313117b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f313117b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f313117b

Branch: refs/heads/master
Commit: f313117bc93b0bf560528b316d3e6947caa96296
Parents: 4b2011e
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Oct 19 22:22:35 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Oct 19 22:22:35 2016 -0700

----------------------------------------------------------------------
 .../InsertIntoHadoopFsRelationCommand.scala     |  79 +--
 .../sql/execution/datasources/WriteOutput.scala | 480 +++++++++++++++++++
 .../execution/datasources/WriterContainer.scala | 445 -----------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 -
 4 files changed, 492 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 99ca3df..22dbe71 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -20,18 +20,12 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 
-import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both 
overwriting and appending.
@@ -40,20 +34,6 @@ import org.apache.spark.sql.internal.SQLConf
  * implementation of [[HadoopFsRelation]] should use this UUID together with 
task id to generate
  * unique file path for each task output file.  This UUID is passed to 
executor side via a
  * property named `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and 
[[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- *   1. Driver side setup, including output committer initialization and data 
source specific
- *      preparation work for the write job to be issued.
- *   2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
- *      rows within an RDD partition.
- *   3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
- *      exception is thrown during task commitment, also aborts that task.
- *   4. If all tasks are committed, commit the job, otherwise aborts the job;  
If any exception is
- *      thrown during job commitment, also aborts the job.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
@@ -103,52 +83,17 @@ case class InsertIntoHadoopFsRelationCommand(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val job = Job.getInstance(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      val partitionSet = AttributeSet(partitionColumns)
-      val dataColumns = query.output.filterNot(partitionSet.contains)
-
-      val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
-      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-        val relation =
-          WriteRelation(
-            sparkSession,
-            dataColumns.toStructType,
-            qualifiedOutputPath.toString,
-            fileFormat.prepareWrite(sparkSession, _, options, 
dataColumns.toStructType),
-            bucketSpec)
-
-        val writerContainer = if (partitionColumns.isEmpty && 
bucketSpec.isEmpty) {
-          new DefaultWriterContainer(relation, job, isAppend)
-        } else {
-          new DynamicPartitionWriterContainer(
-            relation,
-            job,
-            partitionColumns = partitionColumns,
-            dataColumns = dataColumns,
-            inputSchema = query.output,
-            PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.sessionState.conf.partitionMaxFiles,
-            isAppend)
-        }
-
-        // This call shouldn't be put into the `try` block below because it 
only initializes and
-        // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
-        writerContainer.driverSideSetup()
-
-        try {
-          sparkSession.sparkContext.runJob(queryExecution.toRdd, 
writerContainer.writeRows _)
-          writerContainer.commitJob()
-          refreshFunction()
-        } catch { case cause: Throwable =>
-          logError("Aborting job.", cause)
-          writerContainer.abortJob()
-          throw new SparkException("Job aborted.", cause)
-        }
-      }
+      WriteOutput.write(
+        sparkSession,
+        query,
+        fileFormat,
+        qualifiedOutputPath,
+        hadoopConf,
+        partitionColumns,
+        bucketSpec,
+        refreshFunction,
+        options,
+        isAppend)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
new file mode 100644
index 0000000..54d0f3b
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, 
FileOutputFormat}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/** A helper object for writing data out to a location. */
+object WriteOutput extends Logging {
+
+  /** A shared job description for all the write tasks. */
+  private class WriteJobDescription(
+      val serializableHadoopConf: SerializableConfiguration,
+      val outputWriterFactory: OutputWriterFactory,
+      val allColumns: Seq[Attribute],
+      val partitionColumns: Seq[Attribute],
+      val nonPartitionColumns: Seq[Attribute],
+      val bucketSpec: Option[BucketSpec],
+      val isAppend: Boolean,
+      val path: String,
+      val outputFormatClass: Class[_ <: OutputFormat[_, _]])
+    extends Serializable {
+
+    assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ 
nonPartitionColumns),
+      s"""
+         |All columns: ${allColumns.mkString(", ")}
+         |Partition columns: ${partitionColumns.mkString(", ")}
+         |Non-partition columns: ${nonPartitionColumns.mkString(", ")}
+       """.stripMargin)
+  }
+
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, including output committer initialization and data 
source specific
+   *    preparation work for the write job to be issued.
+   * 2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
+   *    rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commit the job, otherwise aborts the job;  
If any exception is
+   *    thrown during job commitment, also aborts the job.
+   */
+  def write(
+      sparkSession: SparkSession,
+      plan: LogicalPlan,
+      fileFormat: FileFormat,
+      outputPath: Path,
+      hadoopConf: Configuration,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      refreshFunction: () => Unit,
+      options: Map[String, String],
+      isAppend: Boolean): Unit = {
+
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    FileOutputFormat.setOutputPath(job, outputPath)
+
+    val partitionSet = AttributeSet(partitionColumns)
+    val dataColumns = plan.output.filterNot(partitionSet.contains)
+    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+
+    // Note: prepareWrite has side effect. It sets "job".
+    val outputWriterFactory =
+      fileFormat.prepareWrite(sparkSession, job, options, 
dataColumns.toStructType)
+
+    val description = new WriteJobDescription(
+      serializableHadoopConf = new 
SerializableConfiguration(job.getConfiguration),
+      outputWriterFactory = outputWriterFactory,
+      allColumns = plan.output,
+      partitionColumns = partitionColumns,
+      nonPartitionColumns = dataColumns,
+      bucketSpec = bucketSpec,
+      isAppend = isAppend,
+      path = outputPath.toString,
+      outputFormatClass = job.getOutputFormatClass)
+
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      // This call shouldn't be put into the `try` block below because it only 
initializes and
+      // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
+      val committer = setupDriverCommitter(job, outputPath.toString, isAppend)
+
+      try {
+        sparkSession.sparkContext.runJob(queryExecution.toRdd,
+          (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+            executeTask(
+              description = description,
+              sparkStageId = taskContext.stageId(),
+              sparkPartitionId = taskContext.partitionId(),
+              sparkAttemptNumber = taskContext.attemptNumber(),
+              iterator = iter)
+          })
+
+        committer.commitJob(job)
+        logInfo(s"Job ${job.getJobID} committed.")
+        refreshFunction()
+      } catch { case cause: Throwable =>
+        logError(s"Aborting job ${job.getJobID}.", cause)
+        committer.abortJob(job, JobStatus.State.FAILED)
+        throw new SparkException("Job aborted.", cause)
+      }
+    }
+  }
+
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
+      description: WriteJobDescription,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      iterator: Iterator[InternalRow]): Unit = {
+
+    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapred.job.id", jobId.toString)
+      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapred.task.is.map", true)
+      hadoopConf.setInt("mapred.task.partition", 0)
+
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    val committer = newOutputCommitter(
+      description.outputFormatClass, taskAttemptContext, description.path, 
description.isAppend)
+    committer.setupTask(taskAttemptContext)
+
+    // Figure out where we need to write data to for staging.
+    // For FileOutputCommitter it has its own staging path called "work path".
+    val stagingPath = committer match {
+      case f: FileOutputCommitter => f.getWorkPath.toString
+      case _ => description.path
+    }
+
+    val writeTask =
+      if (description.partitionColumns.isEmpty && 
description.bucketSpec.isEmpty) {
+        new SingleDirectoryWriteTask(description, taskAttemptContext, 
stagingPath)
+      } else {
+        new DynamicPartitionWriteTask(description, taskAttemptContext, 
stagingPath)
+      }
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        // Execute the task to write rows out
+        writeTask.execute(iterator)
+        writeTask.releaseResources()
+
+        // Commit the task
+        SparkHadoopMapRedUtil.commitTask(committer, taskAttemptContext, 
jobId.getId, taskId.getId)
+      })(catchBlock = {
+        // If there is an error, release resource and then abort the task
+        try {
+          writeTask.releaseResources()
+        } finally {
+          committer.abortTask(taskAttemptContext)
+          logError(s"Job $jobId aborted.")
+        }
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
+    }
+  }
+
+  /**
+   * A simple trait for writing out data in a single Spark task, without any 
concerns about how
+   * to commit or abort tasks. Exceptions thrown by the implementation of this 
trait will
+   * automatically trigger task aborts.
+   */
+  private trait ExecuteWriteTask {
+    def execute(iterator: Iterator[InternalRow]): Unit
+    def releaseResources(): Unit
+  }
+
+  /** Writes data to a single directory (used for non-dynamic-partition 
writes). */
+  private class SingleDirectoryWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      stagingPath: String) extends ExecuteWriteTask {
+
+    private[this] var outputWriter: OutputWriter = {
+      val outputWriter = description.outputWriterFactory.newInstance(
+        path = stagingPath,
+        bucketId = None,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      outputWriter.initConverter(dataSchema = 
description.nonPartitionColumns.toStructType)
+      outputWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Unit = {
+      while (iter.hasNext) {
+        val internalRow = iter.next()
+        outputWriter.writeInternal(internalRow)
+      }
+    }
+
+    override def releaseResources(): Unit = {
+      if (outputWriter != null) {
+        outputWriter.close()
+        outputWriter = null
+      }
+    }
+  }
+
+  /**
+   * Writes data to using dynamic partition writes, meaning this single 
function can write to
+   * multiple directories (partitions) or files (bucketing).
+   */
+  private class DynamicPartitionWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      stagingPath: String) extends ExecuteWriteTask {
+
+    // currentWriter is initialized whenever we see a new key
+    private var currentWriter: OutputWriter = _
+
+    private val bucketColumns: Seq[Attribute] = 
description.bucketSpec.toSeq.flatMap {
+      spec => spec.bucketColumnNames.map(c => 
description.allColumns.find(_.name == c).get)
+    }
+
+    private val sortColumns: Seq[Attribute] = 
description.bucketSpec.toSeq.flatMap {
+      spec => spec.sortColumnNames.map(c => description.allColumns.find(_.name 
== c).get)
+    }
+
+    private def bucketIdExpression: Option[Expression] = 
description.bucketSpec.map { spec =>
+      // Use `HashPartitioning.partitionIdExpression` as our bucket id 
expression, so that we can
+      // guarantee the data distribution is same between shuffle and bucketed 
data source, which
+      // enables us to only shuffle one side when join a bucketed table and a 
normal one.
+      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
+    }
+
+    /** Expressions that given a partition key build a string like: 
col1=val/col2=val/... */
+    private def partitionStringExpression: Seq[Expression] = {
+      description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        val escaped = ScalaUDF(
+          PartitioningUtils.escapePathName _,
+          StringType,
+          Seq(Cast(c, StringType)),
+          Seq(StringType))
+        val str = If(IsNull(c), 
Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+        val partitionName = Literal(c.name + "=") :: str :: Nil
+        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+      }
+    }
+
+    private def getBucketIdFromKey(key: InternalRow): Option[Int] =
+      description.bucketSpec.map { _ => 
key.getInt(description.partitionColumns.length) }
+
+    /**
+     * Open and returns a new OutputWriter given a partition key and optional 
bucket id.
+     * If bucket id is specified, we will append it to the end of the file 
name, but before the
+     * file extension, e.g. 
part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
+     */
+    private def newOutputWriter(
+        key: InternalRow,
+        getPartitionString: UnsafeProjection): OutputWriter = {
+      val path =
+        if (description.partitionColumns.nonEmpty) {
+          val partitionPath = getPartitionString(key).getString(0)
+          new Path(stagingPath, partitionPath).toString
+        } else {
+          stagingPath
+        }
+      val bucketId = getBucketIdFromKey(key)
+
+      val newWriter = description.outputWriterFactory.newInstance(
+        path = path,
+        bucketId = bucketId,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      newWriter.initConverter(description.nonPartitionColumns.toStructType)
+      newWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Unit = {
+      // We should first sort by partition columns, then bucket id, and 
finally sorting columns.
+      val sortingExpressions: Seq[Expression] =
+      description.partitionColumns ++ bucketIdExpression ++ sortColumns
+      val getSortingKey = UnsafeProjection.create(sortingExpressions, 
description.allColumns)
+
+      val sortingKeySchema = StructType(sortingExpressions.map {
+        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
+        // The sorting expressions are all `Attribute` except bucket id.
+        case _ => StructField("bucketId", IntegerType, nullable = false)
+      })
+
+      // Returns the data columns to be written given an input row
+      val getOutputRow = UnsafeProjection.create(
+        description.nonPartitionColumns, description.allColumns)
+
+      // Returns the partition path given a partition key.
+      val getPartitionString =
+      UnsafeProjection.create(Seq(Concat(partitionStringExpression)), 
description.partitionColumns)
+
+      // Sorts the data before write, so that we only need one writer at the 
same time.
+      val sorter = new UnsafeKVExternalSorter(
+        sortingKeySchema,
+        StructType.fromAttributes(description.nonPartitionColumns),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+
+      while (iter.hasNext) {
+        val currentRow = iter.next()
+        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+      }
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+      val getBucketingKey: InternalRow => InternalRow = if 
(sortColumns.isEmpty) {
+        identity
+      } else {
+        
UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map
 {
+          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, 
expr.nullable)
+        })
+      }
+
+      val sortedIterator = sorter.sortedIterator()
+
+      // If anything below fails, we should abort the task.
+      var currentKey: UnsafeRow = null
+      while (sortedIterator.next()) {
+        val nextKey = 
getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+          currentKey = nextKey.copy()
+          logDebug(s"Writing partition: $currentKey")
+
+          currentWriter = newOutputWriter(currentKey, getPartitionString)
+        }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+    }
+
+    override def releaseResources(): Unit = {
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+    }
+  }
+
+  private def setupDriverCommitter(job: Job, path: String, isAppend: Boolean): 
OutputCommitter = {
+    // Setup IDs
+    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+    // Set up the configuration object
+    job.getConfiguration.set("mapred.job.id", jobId.toString)
+    job.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    job.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
+    job.getConfiguration.setBoolean("mapred.task.is.map", true)
+    job.getConfiguration.setInt("mapred.task.partition", 0)
+
+    // This UUID is sent to executor side together with the serialized 
`Configuration` object within
+    // the `Job` instance.  `OutputWriters` on the executor side should use 
this UUID to generate
+    // unique task output files.
+    // This UUID is used to avoid output file name collision between different 
appending write jobs.
+    // These jobs may belong to different SparkContext instances. Concrete 
data source
+    // implementations may use this UUID to generate unique file names (e.g.,
+    // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used 
to identify a job
+    // rather than a single task output file is that, speculative tasks must 
generate the same
+    // output file name as the original task.
+    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, 
UUID.randomUUID().toString)
+
+    val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, 
taskAttemptId)
+    val outputCommitter = newOutputCommitter(
+      job.getOutputFormatClass, taskAttemptContext, path, isAppend)
+    outputCommitter.setupJob(job)
+    outputCommitter
+  }
+
+  private def newOutputCommitter(
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      context: TaskAttemptContext,
+      path: String,
+      isAppend: Boolean): OutputCommitter = {
+    val defaultOutputCommitter = 
outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the 
output committer
+      // associated with the file output format since it is not safe to use a 
custom
+      // committer for appending. For example, in S3, direct parquet output 
committer may
+      // leave partial data in the destination dir when the appending job 
fails.
+      // See SPARK-8578 for more details
+      logInfo(
+        s"Using default output committer 
${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val configuration = context.getConfiguration
+      val clazz =
+        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, 
classOf[OutputCommitter])
+
+      if (clazz != null) {
+        logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
+
+        // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output 
committer,
+        // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs 
to set the
+        // output committer in prepareForWrite method.
+        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(path), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      } else {
+        // If output committer class is not set, we will use the one 
associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class 
${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      }
+    }
+  }
+}
+
+object WriterContainer {
+  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
deleted file mode 100644
index 253aa44..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => 
MapReduceFileOutputCommitter}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-
-/** A container for all the details required when writing to a table. */
-private[datasources] case class WriteRelation(
-    sparkSession: SparkSession,
-    dataSchema: StructType,
-    path: String,
-    prepareJobForWrite: Job => OutputWriterFactory,
-    bucketSpec: Option[BucketSpec])
-
-object WriterContainer {
-  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}
-
-private[datasources] abstract class BaseWriterContainer(
-    @transient val relation: WriteRelation,
-    @transient private val job: Job,
-    isAppend: Boolean)
-  extends Logging with Serializable {
-
-  protected val dataSchema = relation.dataSchema
-
-  protected val serializableConf =
-    new SerializableConfiguration(job.getConfiguration)
-
-  // This UUID is used to avoid output file name collision between different 
appending write jobs.
-  // These jobs may belong to different SparkContext instances. Concrete data 
source implementations
-  // may use this UUID to generate unique file names (e.g., 
`part-r-<task-id>-<job-uuid>.parquet`).
-  //  The reason why this ID is used to identify a job rather than a single 
task output file is
-  // that, speculative tasks must generate the same output file name as the 
original task.
-  private val uniqueWriteJobId = UUID.randomUUID()
-
-  // This is only used on driver side.
-  @transient private val jobContext: JobContext = job
-
-  // The following fields are initialized and used on both driver and executor 
side.
-  @transient protected var outputCommitter: OutputCommitter = _
-  @transient private var jobId: JobID = _
-  @transient private var taskId: TaskID = _
-  @transient private var taskAttemptId: TaskAttemptID = _
-  @transient protected var taskAttemptContext: TaskAttemptContext = _
-
-  protected val outputPath: String = relation.path
-
-  protected var outputWriterFactory: OutputWriterFactory = _
-
-  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit
-
-  def driverSideSetup(): Unit = {
-    setupIDs(0, 0, 0)
-    setupConf()
-
-    // This UUID is sent to executor side together with the serialized 
`Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use 
this UUID to generate
-    // unique task output files.
-    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, 
uniqueWriteJobId.toString)
-
-    // Order of the following two lines is important.  For Hadoop 1, 
TaskAttemptContext constructor
-    // clones the Configuration object passed in.  If we initialize the 
TaskAttemptContext first,
-    // configurations made in prepareJobForWrite(job) are not populated into 
the TaskAttemptContext.
-    //
-    // Also, the `prepareJobForWrite` call must happen before initializing 
output format and output
-    // committer, since their initialization involve the job configuration, 
which can be potentially
-    // decorated in `prepareJobForWrite`.
-    outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, 
taskAttemptId)
-
-    outputFormatClass = job.getOutputFormatClass
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupJob(jobContext)
-  }
-
-  def executorSideSetup(taskContext: TaskContext): Unit = {
-    setupIDs(taskContext.stageId(), taskContext.partitionId(), 
taskContext.attemptNumber())
-    setupConf()
-    taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, 
taskAttemptId)
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupTask(taskAttemptContext)
-  }
-
-  protected def getWorkPath: String = {
-    outputCommitter match {
-      // FileOutputCommitter writes to a temporary location returned by 
`getWorkPath`.
-      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
-      case _ => outputPath
-    }
-  }
-
-  protected def newOutputWriter(path: String, bucketId: Option[Int] = None): 
OutputWriter = {
-    try {
-      outputWriterFactory.newInstance(path, bucketId, dataSchema, 
taskAttemptContext)
-    } catch {
-      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.getClass.getName.contains("Direct")) {
-          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, 
meaning on retry
-          // attempts, the task will fail because the output file is created 
from a prior attempt.
-          // This often means the most visible error to the user is 
misleading. Augment the error
-          // to tell the user to look for the actual error.
-          throw new SparkException("The output file already exists but this 
could be due to a " +
-            "failure from an earlier attempt. Look through the earlier logs or 
stage page for " +
-            "the first error.\n  File exists error: " + e, e)
-        } else {
-          throw e
-        }
-    }
-  }
-
-  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
-    val defaultOutputCommitter = 
outputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (isAppend) {
-      // If we are appending data to an existing dir, we will only use the 
output committer
-      // associated with the file output format since it is not safe to use a 
custom
-      // committer for appending. For example, in S3, direct parquet output 
committer may
-      // leave partial data in the destination dir when the appending job 
fails.
-      //
-      // See SPARK-8578 for more details
-      logInfo(
-        s"Using default output committer 
${defaultOutputCommitter.getClass.getCanonicalName} " +
-          "for appending.")
-      defaultOutputCommitter
-    } else {
-      val configuration = context.getConfiguration
-      val committerClass = configuration.getClass(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      Option(committerClass).map { clazz =>
-        logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
-
-        // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output 
committer,
-        // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs 
to set the
-        // output committer in prepareForWrite method.
-        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
-          ctor.newInstance(new Path(outputPath), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          ctor.newInstance()
-        }
-      }.getOrElse {
-        // If output committer class is not set, we will use the one 
associated with the
-        // file output format.
-        logInfo(
-          s"Using output committer class 
${defaultOutputCommitter.getClass.getCanonicalName}")
-        defaultOutputCommitter
-      }
-    }
-  }
-
-  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
-    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId)
-    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
-  }
-
-  private def setupConf(): Unit = {
-    serializableConf.value.set("mapred.job.id", jobId.toString)
-    serializableConf.value.set("mapred.tip.id", 
taskAttemptId.getTaskID.toString)
-    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableConf.value.setBoolean("mapred.task.is.map", true)
-    serializableConf.value.setInt("mapred.task.partition", 0)
-  }
-
-  def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, 
jobId.getId, taskId.getId)
-  }
-
-  def abortTask(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortTask(taskAttemptContext)
-    }
-    logError(s"Task attempt $taskAttemptId aborted.")
-  }
-
-  def commitJob(): Unit = {
-    outputCommitter.commitJob(jobContext)
-    logInfo(s"Job $jobId committed.")
-  }
-
-  def abortJob(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
-    }
-    logError(s"Job $jobId aborted.")
-  }
-}
-
-/**
- * A writer that writes all of the rows in a partition to a single file.
- */
-private[datasources] class DefaultWriterContainer(
-    relation: WriteRelation,
-    job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
-    executorSideSetup(taskContext)
-    var writer = newOutputWriter(getWorkPath)
-    writer.initConverter(dataSchema)
-
-    // If anything below fails, we should abort the task.
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writer.writeInternal(internalRow)
-        }
-        commitTask()
-      }(catchBlock = abortTask())
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
-
-    def commitTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
-          writer = null
-        }
-        super.commitTask()
-      } catch {
-        case cause: Throwable =>
-          // This exception will be handled in 
`InsertIntoHadoopFsRelation.insert$writeRows`, and
-          // will cause `abortTask()` to be invoked.
-          throw new RuntimeException("Failed to commit task", cause)
-      }
-    }
-
-    def abortTask(): Unit = {
-      try {
-        if (writer != null) {
-          writer.close()
-        }
-      } finally {
-        super.abortTask()
-      }
-    }
-  }
-}
-
-/**
- * A writer that dynamically opens files based on the given partition columns. 
 Internally this is
- * done by maintaining a HashMap of open files until `maxFiles` is reached.  
If this occurs, the
- * writer externally sorts the remaining rows and then writes out them out one 
file at a time.
- */
-private[datasources] class DynamicPartitionWriterContainer(
-    relation: WriteRelation,
-    job: Job,
-    partitionColumns: Seq[Attribute],
-    dataColumns: Seq[Attribute],
-    inputSchema: Seq[Attribute],
-    defaultPartitionName: String,
-    maxOpenFiles: Int,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  private val bucketSpec = relation.bucketSpec
-
-  private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
-    spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
-  }
-
-  private val sortColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
-    spec => spec.sortColumnNames.map(c => inputSchema.find(_.name == c).get)
-  }
-
-  private def bucketIdExpression: Option[Expression] = bucketSpec.map { spec =>
-    // Use `HashPartitioning.partitionIdExpression` as our bucket id 
expression, so that we can
-    // guarantee the data distribution is same between shuffle and bucketed 
data source, which
-    // enables us to only shuffle one side when join a bucketed table and a 
normal one.
-    HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-  }
-
-  // Expressions that given a partition key build a string like: 
col1=val/col2=val/...
-  private def partitionStringExpression: Seq[Expression] = {
-    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-      val escaped =
-        ScalaUDF(
-          PartitioningUtils.escapePathName _,
-          StringType,
-          Seq(Cast(c, StringType)),
-          Seq(StringType))
-      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
-      val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
-    }
-  }
-
-  private def getBucketIdFromKey(key: InternalRow): Option[Int] = 
bucketSpec.map { _ =>
-    key.getInt(partitionColumns.length)
-  }
-
-  /**
-   * Open and returns a new OutputWriter given a partition key and optional 
bucket id.
-   * If bucket id is specified, we will append it to the end of the file name, 
but before the
-   * file extension, e.g. 
part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
-   */
-  private def newOutputWriter(
-      key: InternalRow,
-      getPartitionString: UnsafeProjection): OutputWriter = {
-    val path = if (partitionColumns.nonEmpty) {
-      val partitionPath = getPartitionString(key).getString(0)
-      new Path(getWorkPath, partitionPath).toString
-    } else {
-      getWorkPath
-    }
-    val bucketId = getBucketIdFromKey(key)
-    val newWriter = super.newOutputWriter(path, bucketId)
-    newWriter.initConverter(dataSchema)
-    newWriter
-  }
-
-  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
-    executorSideSetup(taskContext)
-
-    // We should first sort by partition columns, then bucket id, and finally 
sorting columns.
-    val sortingExpressions: Seq[Expression] = partitionColumns ++ 
bucketIdExpression ++ sortColumns
-    val getSortingKey = UnsafeProjection.create(sortingExpressions, 
inputSchema)
-
-    val sortingKeySchema = StructType(sortingExpressions.map {
-      case a: Attribute => StructField(a.name, a.dataType, a.nullable)
-      // The sorting expressions are all `Attribute` except bucket id.
-      case _ => StructField("bucketId", IntegerType, nullable = false)
-    })
-
-    // Returns the data columns to be written given an input row
-    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
-
-    // Returns the partition path given a partition key.
-    val getPartitionString =
-      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, 
partitionColumns)
-
-    // Sorts the data before write, so that we only need one writer at the 
same time.
-    // TODO: inject a local sort operator in planning.
-    val sorter = new UnsafeKVExternalSorter(
-      sortingKeySchema,
-      StructType.fromAttributes(dataColumns),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes,
-      
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
-    while (iterator.hasNext) {
-      val currentRow = iterator.next()
-      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-    }
-    logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-    val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) 
{
-      identity
-    } else {
-      
UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map
 {
-        case (expr, ordinal) => BoundReference(ordinal, expr.dataType, 
expr.nullable)
-      })
-    }
-
-    val sortedIterator = sorter.sortedIterator()
-
-    // If anything below fails, we should abort the task.
-    var currentWriter: OutputWriter = null
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks {
-        var currentKey: UnsafeRow = null
-        while (sortedIterator.next()) {
-          val nextKey = 
getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-          if (currentKey != nextKey) {
-            if (currentWriter != null) {
-              currentWriter.close()
-              currentWriter = null
-            }
-            currentKey = nextKey.copy()
-            logDebug(s"Writing partition: $currentKey")
-
-            currentWriter = newOutputWriter(currentKey, getPartitionString)
-          }
-          currentWriter.writeInternal(sortedIterator.getValue)
-        }
-        if (currentWriter != null) {
-          currentWriter.close()
-          currentWriter = null
-        }
-
-        commitTask()
-      }(catchBlock = {
-        if (currentWriter != null) {
-          currentWriter.close()
-        }
-        abortTask()
-      })
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f313117b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8afd39d..9061b1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -339,13 +339,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val PARTITION_MAX_FILES =
-    SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
-      .doc("The maximum number of concurrent files to open before falling back 
on sorting when " +
-            "writing out files using dynamic partitioning.")
-      .intConf
-      .createWithDefault(1)
-
   val BUCKETING_ENABLED = 
SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
     .doc("When false, we will treat bucketed table as normal table")
     .booleanConf
@@ -733,8 +726,6 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
   def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
-  def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
-
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to