Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1499#discussion_r15556144
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -0,0 +1,667 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.collection
    +
    +import java.io._
    +import java.util.Comparator
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable
    +
    +import com.google.common.io.ByteStreams
    +
    +import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
    +import org.apache.spark.serializer.Serializer
    +import org.apache.spark.storage.BlockId
    +
    +/**
    + * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
    + * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
    + * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
    + * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
    + *
    + * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
    + *
    + * @param aggregator optional Aggregator with combine functions to use for 
merging data
    + * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
    + * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
    + * @param serializer serializer to use when spilling to disk
    + *
    + * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
    + * want the output keys to be sorted. In a map task without map-side 
combine for example, you
    + * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
    + * want to do combining, having an Ordering is more efficient than not 
having it.
    + *
    + * At a high level, this class works as follows:
    + *
    + * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
    + *   we want to combine by key, or an simple SizeTrackingBuffer if we 
don't. Inside these buffers,
    + *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
    + *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
    + *
    + * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
    + *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
    + *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
    + *   don't have to write out the partition ID for every element.
    + *
    + * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
    + *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
    + *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
    + *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
    + *   for equality to merge values.
    + *
    + * - Users are expected to call stop() at the end to delete all the 
intermediate files.
    + */
    +private[spark] class ExternalSorter[K, V, C](
    +    aggregator: Option[Aggregator[K, V, C]] = None,
    +    partitioner: Option[Partitioner] = None,
    +    ordering: Option[Ordering[K]] = None,
    +    serializer: Option[Serializer] = None) extends Logging {
    +
    +  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
    +  private val shouldPartition = numPartitions > 1
    +
    +  private val blockManager = SparkEnv.get.blockManager
    +  private val diskBlockManager = blockManager.diskBlockManager
    +  private val ser = Serializer.getSerializer(serializer)
    +  private val serInstance = ser.newInstance()
    +
    +  private val conf = SparkEnv.get.conf
    +  private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", 
true)
    +  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 
100) * 1024
    +  private val serializerBatchSize = 
conf.getLong("spark.shuffle.spill.batchSize", 10000)
    --- End diff --
    
    would be great to comment on why we are doing this batching (to avoid hash 
map growing too large right?)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to