Repository: spark Updated Branches: refs/heads/master dbd90e544 -> bfc5569a5
[SPARK-26289][CORE] cleanup enablePerfMetrics parameter from BytesToBytesMap ## What changes were proposed in this pull request? `enablePerfMetrics `was originally designed in `BytesToBytesMap `to control `getNumHashCollisions getTimeSpentResizingNs getAverageProbesPerLookup`. However, as the Spark version gradual progress. this parameter is only used for `getAverageProbesPerLookup ` and always given to true when using `BytesToBytesMap`. it is also dangerous to determine whether `getAverageProbesPerLookup `opens and throws an `IllegalStateException `exception. So this pr will be remove `enablePerfMetrics `parameter from `BytesToBytesMap`. thanks. ## How was this patch tested? the existed test cases. Closes #23244 from heary-cao/enablePerfMetrics. Authored-by: caoxuewen <cao.xue...@zte.com.cn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfc5569a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfc5569a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfc5569a Branch: refs/heads/master Commit: bfc5569a53510bc75c15384084ff89b418592875 Parents: dbd90e5 Author: caoxuewen <cao.xue...@zte.com.cn> Authored: Fri Dec 7 09:57:35 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Dec 7 09:57:35 2018 +0800 ---------------------------------------------------------------------- .../spark/unsafe/map/BytesToBytesMap.java | 33 +++++--------------- .../map/AbstractBytesToBytesMapSuite.java | 4 +-- .../UnsafeFixedWidthAggregationMap.java | 2 +- .../sql/execution/joins/HashedRelation.scala | 6 ++-- 4 files changed, 12 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index a4e8859..405e529 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer { */ private final Location loc; - private final boolean enablePerfMetrics; + private long numProbes = 0L; - private long numProbes = 0; - - private long numKeyLookups = 0; + private long numKeyLookups = 0L; private long peakMemoryUsedBytes = 0L; @@ -180,8 +178,7 @@ public final class BytesToBytesMap extends MemoryConsumer { SerializerManager serializerManager, int initialCapacity, double loadFactor, - long pageSizeBytes, - boolean enablePerfMetrics) { + long pageSizeBytes) { super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode()); this.taskMemoryManager = taskMemoryManager; this.blockManager = blockManager; @@ -189,7 +186,6 @@ public final class BytesToBytesMap extends MemoryConsumer { this.loadFactor = loadFactor; this.loc = new Location(); this.pageSizeBytes = pageSizeBytes; - this.enablePerfMetrics = enablePerfMetrics; if (initialCapacity <= 0) { throw new IllegalArgumentException("Initial capacity must be greater than 0"); } @@ -209,14 +205,6 @@ public final class BytesToBytesMap extends MemoryConsumer { TaskMemoryManager taskMemoryManager, int initialCapacity, long pageSizeBytes) { - this(taskMemoryManager, initialCapacity, pageSizeBytes, false); - } - - public BytesToBytesMap( - TaskMemoryManager taskMemoryManager, - int initialCapacity, - long pageSizeBytes, - boolean enablePerfMetrics) { this( taskMemoryManager, SparkEnv.get() != null ? SparkEnv.get().blockManager() : null, @@ -224,8 +212,7 @@ public final class BytesToBytesMap extends MemoryConsumer { initialCapacity, // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5. 0.5, - pageSizeBytes, - enablePerfMetrics); + pageSizeBytes); } /** @@ -462,15 +449,12 @@ public final class BytesToBytesMap extends MemoryConsumer { public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) { assert(longArray != null); - if (enablePerfMetrics) { - numKeyLookups++; - } + numKeyLookups++; + int pos = hash & mask; int step = 1; while (true) { - if (enablePerfMetrics) { - numProbes++; - } + numProbes++; if (longArray.get(pos * 2) == 0) { // This is a new key. loc.with(pos, hash, false); @@ -860,9 +844,6 @@ public final class BytesToBytesMap extends MemoryConsumer { * Returns the average number of probes per key lookup. */ public double getAverageProbesPerLookup() { - if (!enablePerfMetrics) { - throw new IllegalStateException(); - } return (1.0 * numProbes) / numKeyLookups; } http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 53a233f..aa29232 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -530,7 +530,7 @@ public abstract class AbstractBytesToBytesMapSuite { @Test public void spillInIterator() throws IOException { BytesToBytesMap map = new BytesToBytesMap( - taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false); + taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024); try { int i; for (i = 0; i < 1024; i++) { @@ -569,7 +569,7 @@ public abstract class AbstractBytesToBytesMapSuite { @Test public void multipleValuesForSameKey() { BytesToBytesMap map = - new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false); + new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024); try { int i; for (i = 0; i < 1024; i++) { http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java index c8cf44b..7e76a65 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java @@ -98,7 +98,7 @@ public final class UnsafeFixedWidthAggregationMap { this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema); this.groupingKeySchema = groupingKeySchema; this.map = new BytesToBytesMap( - taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true); + taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes); // Initialize the buffer for aggregation value final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema); http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index e8c01d4..b1ff6e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -248,8 +248,7 @@ private[joins] class UnsafeHashedRelation( binaryMap = new BytesToBytesMap( taskMemoryManager, (nKeys * 1.5 + 1).toInt, // reduce hash collision - pageSizeBytes, - true) + pageSizeBytes) var i = 0 var keyBuffer = new Array[Byte](1024) @@ -299,8 +298,7 @@ private[joins] object UnsafeHashedRelation { taskMemoryManager, // Only 70% of the slots can be used before growing, more capacity help to reduce collision (sizeEstimate * 1.5 + 1).toInt, - pageSizeBytes, - true) + pageSizeBytes) // Create a mapping of buildKeys -> rows val keyGenerator = UnsafeProjection.create(key) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org