Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10498#discussion_r48889469
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 ---
    @@ -451,3 +457,147 @@ private[sql] class DynamicPartitionWriterContainer(
         }
       }
     }
    +
    +/**
    + * A writer that dynamically opens files based on the given partition 
columns.  Internally this is
    + * done by maintaining a HashMap of open files until `maxFiles` is 
reached.  If this occurs, the
    + * writer externally sorts the remaining rows and then writes them out 
one file at a time.
    + */
    +private[sql] class BucketedPartitionWriterContainer(
    +    relation: BucketedHadoopFsRelation,
    +    job: Job,
    +    partitionColumns: Seq[Attribute],
    +    bucketSpec: BucketSpec,
    +    dataColumns: Seq[Attribute],
    +    inputSchema: Seq[Attribute],
    +    defaultPartitionName: String,
    +    isAppend: Boolean)
    +  extends BaseWriterContainer(relation, job, isAppend) {
    +
    +  def writeRows(taskContext: TaskContext, iterator: 
Iterator[InternalRow]): Unit = {
    +    executorSideSetup(taskContext)
    +
    +    val bucketColumns = bucketSpec.bucketColumnNames.map(c => 
inputSchema.find(_.name == c).get)
    +    val numBuckets = bucketSpec.numBuckets
    +    val sortColumns = bucketSpec.sortColumnNames.map(c => 
inputSchema.find(_.name == c).get)
    +    val bucketIdExpr = Pmod(new Murmur3Hash(bucketColumns), 
Literal(numBuckets))
    +
    +    val getSortingKey =
    +      UnsafeProjection.create((partitionColumns :+ bucketIdExpr) ++ 
sortColumns, inputSchema)
    +
    +    val sortingKeySchema = {
    +      val fields = StructType.fromAttributes(partitionColumns)
    +        .add("bucketId", IntegerType, nullable = false) ++
    +        StructType.fromAttributes(sortColumns)
    +      StructType(fields)
    +    }
    +
    +    // Returns the data columns to be written given an input row
    +    val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
    +
    +    // Expressions that given a partition key build a string like: 
col1=val/col2=val/...
    +    val partitionStringExpression = partitionColumns.zipWithIndex.flatMap 
{ case (c, i) =>
    +      val escaped =
    +        ScalaUDF(
    +          PartitioningUtils.escapePathName _,
    +          StringType,
    +          Seq(Cast(c, StringType)),
    +          Seq(StringType))
    +      val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
    +      val partitionName = Literal(c.name + "=") :: str :: Nil
    +      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: 
partitionName
    +    }
    +
    +    // Returns the partition path given a partition key.
    +    val getPartitionString =
    +      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, 
partitionColumns)
    +
    +    // If anything below fails, we should abort the task.
    +    try {
    +      // TODO: remove duplicated code.
    +      // TODO: if sorting columns are empty, we can keep all writers in a 
hash map and avoid sorting
    +      // here.
    +      val sorter = new UnsafeKVExternalSorter(
    +        sortingKeySchema,
    +        StructType.fromAttributes(dataColumns),
    +        SparkEnv.get.blockManager,
    +        TaskContext.get().taskMemoryManager().pageSizeBytes)
    +
    +      while (iterator.hasNext) {
    +        val currentRow = iterator.next()
    +        sorter.insertKV(getSortingKey(currentRow), 
getOutputRow(currentRow))
    +      }
    +
    +      logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
    +
    +      def sameBucket(row1: InternalRow, row2: InternalRow): Boolean = {
    +        if (row1.getInt(partitionColumns.length) != 
row2.getInt(partitionColumns.length)) {
    +          false
    +        } else {
    +          var i = partitionColumns.length - 1
    +          while (i >= 0) {
    +            val dt = partitionColumns(i).dataType
    +            if (row1.get(i, dt) != row2.get(i, dt)) return false
    +            i -= 1
    +          }
    +          true
    +        }
    +      }
    +      val sortedIterator = sorter.sortedIterator()
    +      var currentKey: InternalRow = null
    +      var currentWriter: OutputWriter = null
    +      try {
    +        while (sortedIterator.next()) {
    +          if (currentKey == null || !sameBucket(currentKey, 
sortedIterator.getKey)) {
    +            if (currentWriter != null) {
    +              currentWriter.close()
    +            }
    +            currentKey = sortedIterator.getKey.copy()
    +            logDebug(s"Writing partition: $currentKey")
    +            currentWriter = newOutputWriter(currentKey)
    +          }
    +          currentWriter.writeInternal(sortedIterator.getValue)
    +        }
    +      } finally {
    +        if (currentWriter != null) { currentWriter.close() }
    +      }
    +
    +      commitTask()
    +    } catch {
    +      case cause: Throwable =>
    +        logError("Aborting task.", cause)
    +        abortTask()
    +        throw new SparkException("Task failed while writing rows.", cause)
    +    }
    +
    +    /** Opens and returns a new OutputWriter given a partition key. */
    --- End diff --
    
    This comment is not accurate. Do you explain the on-disk file naming 
anywhere? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to