Repository: spark Updated Branches: refs/heads/master 3b0e44490 -> 5eb89f67e
[SPARK-9577][SQL] Surface concrete iterator types in various sort classes. We often return abstract iterator types in various sort-related classes (e.g. UnsafeKVExternalSorter). It is actually better to return a more concrete type, so the callsite uses that type and JIT can inline the iterator calls. Author: Reynold Xin <r...@databricks.com> Closes #7911 from rxin/surface-concrete-type and squashes the following commits: 0422add [Reynold Xin] [SPARK-9577][SQL] Surface concrete iterator types in various sort classes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5eb89f67 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5eb89f67 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5eb89f67 Branch: refs/heads/master Commit: 5eb89f67e323dcf9fa3d5b30f9b5cb8f10ca1e8c Parents: 3b0e444 Author: Reynold Xin <r...@databricks.com> Authored: Mon Aug 3 18:47:02 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Aug 3 18:47:02 2015 -0700 ---------------------------------------------------------------------- .../unsafe/sort/UnsafeExternalSorter.java | 2 +- .../unsafe/sort/UnsafeInMemorySorter.java | 6 +- .../sql/execution/UnsafeKVExternalSorter.java | 112 ++++++++++--------- .../UnsafeHybridAggregationIterator.scala | 30 +---- 4 files changed, 65 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5eb89f67/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index bf5f965..dec7fcf 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -428,7 +428,7 @@ public final class UnsafeExternalSorter { public UnsafeSorterIterator getSortedIterator() throws IOException { assert(inMemSorter != null); - final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator(); + final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator(); int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0); if (spillWriters.isEmpty()) { return inMemoryIterator; http://git-wip-us.apache.org/repos/asf/spark/blob/5eb89f67/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 3131465..1e4b8a1 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -133,7 +133,7 @@ public final class UnsafeInMemorySorter { pointerArrayInsertPosition++; } - private static final class SortedIterator extends UnsafeSorterIterator { + public static final class SortedIterator extends UnsafeSorterIterator { private final TaskMemoryManager memoryManager; private final int sortBufferInsertPosition; @@ -144,7 +144,7 @@ public final class UnsafeInMemorySorter { private long keyPrefix; private int recordLength; - SortedIterator( + private SortedIterator( TaskMemoryManager memoryManager, int sortBufferInsertPosition, long[] sortBuffer) { @@ -186,7 +186,7 @@ public final class UnsafeInMemorySorter { * Return an iterator over record pointers in sorted order. For efficiency, all calls to * {@code next()} will return the same mutable object. */ - public UnsafeSorterIterator getSortedIterator() { + public SortedIterator getSortedIterator() { sorter.sort(pointerArray, 0, pointerArrayInsertPosition / 2, sortComparator); return new SortedIterator(memoryManager, pointerArrayInsertPosition, pointerArray); } http://git-wip-us.apache.org/repos/asf/spark/blob/5eb89f67/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index f6b0176..312ec8e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -134,7 +134,7 @@ public final class UnsafeKVExternalSorter { value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(), prefix); } - public KVIterator<UnsafeRow, UnsafeRow> sortedIterator() throws IOException { + public KVSorterIterator sortedIterator() throws IOException { try { final UnsafeSorterIterator underlying = sorter.getSortedIterator(); if (!underlying.hasNext()) { @@ -142,58 +142,7 @@ public final class UnsafeKVExternalSorter { // here in order to prevent memory leaks. cleanupResources(); } - - return new KVIterator<UnsafeRow, UnsafeRow>() { - private UnsafeRow key = new UnsafeRow(); - private UnsafeRow value = new UnsafeRow(); - private int numKeyFields = keySchema.size(); - private int numValueFields = valueSchema.size(); - - @Override - public boolean next() throws IOException { - try { - if (underlying.hasNext()) { - underlying.loadNext(); - - Object baseObj = underlying.getBaseObject(); - long recordOffset = underlying.getBaseOffset(); - int recordLen = underlying.getRecordLength(); - - // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself) - int keyLen = PlatformDependent.UNSAFE.getInt(baseObj, recordOffset); - int valueLen = recordLen - keyLen - 4; - - key.pointTo(baseObj, recordOffset + 4, numKeyFields, keyLen); - value.pointTo(baseObj, recordOffset + 4 + keyLen, numValueFields, valueLen); - - return true; - } else { - key = null; - value = null; - cleanupResources(); - return false; - } - } catch (IOException e) { - cleanupResources(); - throw e; - } - } - - @Override - public UnsafeRow getKey() { - return key; - } - - @Override - public UnsafeRow getValue() { - return value; - } - - @Override - public void close() { - cleanupResources(); - } - }; + return new KVSorterIterator(underlying); } catch (IOException e) { cleanupResources(); throw e; @@ -233,4 +182,61 @@ public final class UnsafeKVExternalSorter { return ordering.compare(row1, row2); } } + + public class KVSorterIterator extends KVIterator<UnsafeRow, UnsafeRow> { + private UnsafeRow key = new UnsafeRow(); + private UnsafeRow value = new UnsafeRow(); + private final int numKeyFields = keySchema.size(); + private final int numValueFields = valueSchema.size(); + private final UnsafeSorterIterator underlying; + + private KVSorterIterator(UnsafeSorterIterator underlying) { + this.underlying = underlying; + } + + @Override + public boolean next() throws IOException { + try { + if (underlying.hasNext()) { + underlying.loadNext(); + + Object baseObj = underlying.getBaseObject(); + long recordOffset = underlying.getBaseOffset(); + int recordLen = underlying.getRecordLength(); + + // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself) + int keyLen = PlatformDependent.UNSAFE.getInt(baseObj, recordOffset); + int valueLen = recordLen - keyLen - 4; + + key.pointTo(baseObj, recordOffset + 4, numKeyFields, keyLen); + value.pointTo(baseObj, recordOffset + 4 + keyLen, numValueFields, valueLen); + + return true; + } else { + key = null; + value = null; + cleanupResources(); + return false; + } + } catch (IOException e) { + cleanupResources(); + throw e; + } + } + + @Override + public UnsafeRow getKey() { + return key; + } + + @Override + public UnsafeRow getValue() { + return value; + } + + @Override + public void close() { + cleanupResources(); + } + }; } http://git-wip-us.apache.org/repos/asf/spark/blob/5eb89f67/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala index 37d34eb..b465787 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UnsafeHybridAggregationIterator.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.execution.aggregate -import org.apache.spark.sql.execution.{UnsafeKeyValueSorter, UnsafeFixedWidthAggregationMap} import org.apache.spark.unsafe.KVIterator import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.execution.{UnsafeKVExternalSorter, UnsafeFixedWidthAggregationMap} import org.apache.spark.sql.types.StructType /** @@ -230,7 +230,7 @@ class UnsafeHybridAggregationIterator( } // Step 5: Get the sorted iterator from the externalSorter. - val sortedKVIterator: KVIterator[UnsafeRow, UnsafeRow] = externalSorter.sortedIterator() + val sortedKVIterator: UnsafeKVExternalSorter#KVSorterIterator = externalSorter.sortedIterator() // Step 6: We now create a SortBasedAggregationIterator based on sortedKVIterator. // For a aggregate function with mode Partial, its mode in the SortBasedAggregationIterator @@ -368,31 +368,5 @@ object UnsafeHybridAggregationIterator { newMutableProjection, outputsUnsafeRows) } - - def createFromKVIterator( - groupingKeyAttributes: Seq[Attribute], - valueAttributes: Seq[Attribute], - inputKVIterator: KVIterator[UnsafeRow, InternalRow], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], - nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], - completeAggregateAttributes: Seq[Attribute], - initialInputBufferOffset: Int, - resultExpressions: Seq[NamedExpression], - newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection), - outputsUnsafeRows: Boolean): UnsafeHybridAggregationIterator = { - new UnsafeHybridAggregationIterator( - groupingKeyAttributes, - valueAttributes, - inputKVIterator, - nonCompleteAggregateExpressions, - nonCompleteAggregateAttributes, - completeAggregateExpressions, - completeAggregateAttributes, - initialInputBufferOffset, - resultExpressions, - newMutableProjection, - outputsUnsafeRows) - } // scalastyle:on } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org