Repository: spark
Updated Branches:
  refs/heads/master dbd90e544 -> bfc5569a5


[SPARK-26289][CORE] cleanup enablePerfMetrics parameter from BytesToBytesMap

## What changes were proposed in this pull request?

`enablePerfMetrics` was originally added to `BytesToBytesMap` to gate the perf-metric accessors `getNumHashCollisions`, `getTimeSpentResizingNs`, and `getAverageProbesPerLookup`.

However, as Spark has evolved, the parameter now only gates `getAverageProbesPerLookup`, and it is always set to `true` at every call site of `BytesToBytesMap`, as the call sites in the diff below show.
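
For example, the aggregation-map call site (excerpted from the `UnsafeFixedWidthAggregationMap` hunk below) always passed `true` as the trailing argument; after this change the argument simply disappears:

```java
// Before: the trailing boolean was always true in practice.
this.map = new BytesToBytesMap(
  taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);

// After: the flag is gone and the constructor takes one fewer argument.
this.map = new BytesToBytesMap(
  taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes);
```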

Gating `getAverageProbesPerLookup` on this flag is also dangerous: when the flag is off, the method throws an `IllegalStateException`, and callers have no way to check the flag beforehand.
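
To illustrate the hazard, here is a minimal standalone sketch of the pattern being removed (illustrative names only, not Spark code):

```java
// Illustrative sketch of the removed pattern: a perf flag gates both the
// counting and the getter, and the getter throws at runtime if the flag is off.
class GuardedProbeMetric {
  private final boolean enablePerfMetrics;
  private long numProbes = 0L;
  private long numKeyLookups = 0L;

  GuardedProbeMetric(boolean enablePerfMetrics) {
    this.enablePerfMetrics = enablePerfMetrics;
  }

  void recordLookup(int probes) {
    if (enablePerfMetrics) {  // counters only move when the flag is on
      numKeyLookups++;
      numProbes += probes;
    }
  }

  double getAverageProbesPerLookup() {
    if (!enablePerfMetrics) {
      // The dangerous part: callers cannot query the flag, so this failure
      // is only discoverable at runtime.
      throw new IllegalStateException();
    }
    return (1.0 * numProbes) / numKeyLookups;
  }
}
```
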
So this PR removes the `enablePerfMetrics` parameter from `BytesToBytesMap`. Thanks.
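
After the removal, the counters are plain `long` increments that are always maintained, so the getter can never throw. A standalone sketch of the resulting shape (again illustrative, mirroring the new code in the diff below):

```java
// Illustrative sketch of the post-change shape: counting is unconditional and
// cheap (two long increments per lookup), so no flag is needed.
class UnguardedProbeMetric {
  private long numProbes = 0L;
  private long numKeyLookups = 0L;

  void recordLookup(int probes) {
    numKeyLookups++;
    numProbes += probes;
  }

  double getAverageProbesPerLookup() {
    return (1.0 * numProbes) / numKeyLookups;
  }
}
```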

## How was this patch tested?

Existing test cases.

Closes #23244 from heary-cao/enablePerfMetrics.

Authored-by: caoxuewen <cao.xue...@zte.com.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfc5569a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfc5569a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfc5569a

Branch: refs/heads/master
Commit: bfc5569a53510bc75c15384084ff89b418592875
Parents: dbd90e5
Author: caoxuewen <cao.xue...@zte.com.cn>
Authored: Fri Dec 7 09:57:35 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 7 09:57:35 2018 +0800

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       | 33 +++++---------------
 .../map/AbstractBytesToBytesMapSuite.java       |  4 +--
 .../UnsafeFixedWidthAggregationMap.java         |  2 +-
 .../sql/execution/joins/HashedRelation.scala    |  6 ++--
 4 files changed, 12 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index a4e8859..405e529 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   private final Location loc;
 
-  private final boolean enablePerfMetrics;
+  private long numProbes = 0L;
 
-  private long numProbes = 0;
-
-  private long numKeyLookups = 0;
+  private long numKeyLookups = 0L;
 
   private long peakMemoryUsedBytes = 0L;
 
@@ -180,8 +178,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       SerializerManager serializerManager,
       int initialCapacity,
       double loadFactor,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
+      long pageSizeBytes) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = taskMemoryManager;
     this.blockManager = blockManager;
@@ -189,7 +186,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
     this.loadFactor = loadFactor;
     this.loc = new Location();
     this.pageSizeBytes = pageSizeBytes;
-    this.enablePerfMetrics = enablePerfMetrics;
     if (initialCapacity <= 0) {
       throw new IllegalArgumentException("Initial capacity must be greater than 0");
     }
@@ -209,14 +205,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
       TaskMemoryManager taskMemoryManager,
       int initialCapacity,
       long pageSizeBytes) {
-    this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
-  }
-
-  public BytesToBytesMap(
-      TaskMemoryManager taskMemoryManager,
-      int initialCapacity,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
     this(
       taskMemoryManager,
      SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
@@ -224,8 +212,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       initialCapacity,
       // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
       0.5,
-      pageSizeBytes,
-      enablePerfMetrics);
+      pageSizeBytes);
   }
 
   /**
@@ -462,15 +449,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
   public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
     assert(longArray != null);
 
-    if (enablePerfMetrics) {
-      numKeyLookups++;
-    }
+    numKeyLookups++;
+
     int pos = hash & mask;
     int step = 1;
     while (true) {
-      if (enablePerfMetrics) {
-        numProbes++;
-      }
+      numProbes++;
       if (longArray.get(pos * 2) == 0) {
         // This is a new key.
         loc.with(pos, hash, false);
@@ -860,9 +844,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
    * Returns the average number of probes per key lookup.
    */
   public double getAverageProbesPerLookup() {
-    if (!enablePerfMetrics) {
-      throw new IllegalStateException();
-    }
     return (1.0 * numProbes) / numKeyLookups;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f..aa29232 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -530,7 +530,7 @@ public abstract class AbstractBytesToBytesMapSuite {
   @Test
   public void spillInIterator() throws IOException {
     BytesToBytesMap map = new BytesToBytesMap(
-      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
+      taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {
@@ -569,7 +569,7 @@ public abstract class AbstractBytesToBytesMapSuite {
   @Test
   public void multipleValuesForSameKey() {
     BytesToBytesMap map =
-      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
     try {
       int i;
       for (i = 0; i < 1024; i++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index c8cf44b..7e76a65 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -98,7 +98,7 @@ public final class UnsafeFixedWidthAggregationMap {
     this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
     this.groupingKeySchema = groupingKeySchema;
     this.map = new BytesToBytesMap(
-      taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);
+      taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes);
 
     // Initialize the buffer for aggregation value
     final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);

http://git-wip-us.apache.org/repos/asf/spark/blob/bfc5569a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index e8c01d4..b1ff6e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -248,8 +248,7 @@ private[joins] class UnsafeHashedRelation(
     binaryMap = new BytesToBytesMap(
       taskMemoryManager,
       (nKeys * 1.5 + 1).toInt, // reduce hash collision
-      pageSizeBytes,
-      true)
+      pageSizeBytes)
 
     var i = 0
     var keyBuffer = new Array[Byte](1024)
@@ -299,8 +298,7 @@ private[joins] object UnsafeHashedRelation {
       taskMemoryManager,
       // Only 70% of the slots can be used before growing, more capacity help to reduce collision
       (sizeEstimate * 1.5 + 1).toInt,
-      pageSizeBytes,
-      true)
+      pageSizeBytes)
 
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)

