Repository: spark
Updated Branches:
  refs/heads/master 701fb5052 -> 1b6a5d4af


[SPARK-11493] remove bitset from BytesToBytesMap

Since the first 4 bytes of every page hold the number of records, the
address of a record can never be zero, so we do not need the bitset.

Regarding performance, the bitset could help speed up false lookups when the
slot is empty (because the bitset is smaller than the longArray, its cache hit
rate is higher). In practice, the map is 35% - 70% full (use 50% as the
average), so only half of the false lookups can benefit from it; all others pay
the cost of loading the bitset (and still need to access the longArray anyway).

For aggregation, we always need to access the longArray (to insert a new key
after a false lookup); this was also confirmed by a benchmark.

For broadcast hash join, there could be a regression, but a simple benchmark
(in which most of the lookups are false) showed that there may not be one:

```
sqlContext.range(1<<20).write.parquet("small")
df = sqlContext.read.parquet('small')
for i in range(3):
    t = time.time()
    df2 = sqlContext.range(1<<26).selectExpr("id * 1111111111 % 987654321 as id2")
    df2.join(df, df.id == df2.id2).count()
    print time.time() -t
```

With the bitset (elapsed time in seconds):
```
17.5404241085
10.2758829594
10.5786800385
```
After removing the bitset (elapsed time in seconds):
```
21.8939979076
12.4132959843
9.97224712372
```

cc rxin nongli

Author: Davies Liu <dav...@databricks.com>

Closes #9452 from davies/remove_bitset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b6a5d4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b6a5d4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b6a5d4a

Branch: refs/heads/master
Commit: 1b6a5d4af9691c3f7f3ebee3146dc13d12a0e047
Parents: 701fb50
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Nov 4 14:45:02 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Wed Nov 4 14:45:02 2015 -0800

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       |  58 +++-------
 .../org/apache/spark/unsafe/bitset/BitSet.java  | 113 -------------------
 .../apache/spark/unsafe/bitset/BitSetSuite.java |  88 ---------------
 3 files changed, 15 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b6a5d4a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index e36709c..07241c8 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -35,7 +35,6 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -123,12 +122,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   private boolean canGrowArray = true;
 
-  /**
-   * A {@link BitSet} used to track location of the map where the key is set.
-   * Size of the bitset should be half of the size of the long array.
-   */
-  @Nullable private BitSet bitset;
-
   private final double loadFactor;
 
   /**
@@ -427,7 +420,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
    * This is a thread-safe version of `lookup`, could be used by multiple 
threads.
    */
   public void safeLookup(Object keyBase, long keyOffset, int keyLength, 
Location loc) {
-    assert(bitset != null);
     assert(longArray != null);
 
     if (enablePerfMetrics) {
@@ -440,7 +432,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
       if (enablePerfMetrics) {
         numProbes++;
       }
-      if (!bitset.isSet(pos)) {
+      if (longArray.get(pos * 2) == 0) {
         // This is a new key.
         loc.with(pos, hashcode, false);
         return;
@@ -644,7 +636,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
       assert (!isDefined) : "Can only set value once for a key";
       assert (keyLength % 8 == 0);
       assert (valueLength % 8 == 0);
-      assert(bitset != null);
       assert(longArray != null);
 
       if (numElements == MAX_CAPACITY || !canGrowArray) {
@@ -678,7 +669,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
       Platform.putInt(base, offset, Platform.getInt(base, offset) + 1);
       pageCursor += recordLength;
       numElements++;
-      bitset.set(pos);
       final long storedKeyAddress = 
taskMemoryManager.encodePageNumberAndOffset(
         currentPage, recordOffset);
       longArray.set(pos * 2, storedKeyAddress);
@@ -734,7 +724,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
     assert (capacity <= MAX_CAPACITY);
     acquireMemory(capacity * 16);
     longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 
2]));
-    bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
 
     this.growthThreshold = (int) (capacity * loadFactor);
     this.mask = capacity - 1;
@@ -749,7 +738,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
       long used = longArray.memoryBlock().size();
       longArray = null;
       releaseMemory(used);
-      bitset = null;
     }
   }
 
@@ -795,9 +783,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
     for (MemoryBlock dataPage : dataPages) {
       totalDataPagesSize += dataPage.size();
     }
-    return totalDataPagesSize +
-      ((bitset != null) ? bitset.memoryBlock().size() : 0L) +
-      ((longArray != null) ? longArray.memoryBlock().size() : 0L);
+    return totalDataPagesSize + ((longArray != null) ? 
longArray.memoryBlock().size() : 0L);
   }
 
   private void updatePeakMemoryUsed() {
@@ -852,7 +838,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
    */
   @VisibleForTesting
   void growAndRehash() {
-    assert(bitset != null);
     assert(longArray != null);
 
     long resizeStartTime = -1;
@@ -861,39 +846,26 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
     }
     // Store references to the old data structures to be used when we re-hash
     final LongArray oldLongArray = longArray;
-    final BitSet oldBitSet = bitset;
-    final int oldCapacity = (int) oldBitSet.capacity();
+    final int oldCapacity = (int) oldLongArray.size() / 2;
 
     // Allocate the new data structures
-    try {
-      allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), 
MAX_CAPACITY));
-    } catch (OutOfMemoryError oom) {
-      longArray = oldLongArray;
-      bitset = oldBitSet;
-      throw oom;
-    }
+    allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
 
     // Re-mask (we don't recompute the hashcode because we stored all 32 bits 
of it)
-    for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = 
oldBitSet.nextSetBit(pos + 1)) {
-      final long keyPointer = oldLongArray.get(pos * 2);
-      final int hashcode = (int) oldLongArray.get(pos * 2 + 1);
+    for (int i = 0; i < oldLongArray.size(); i += 2) {
+      final long keyPointer = oldLongArray.get(i);
+      if (keyPointer == 0) {
+        continue;
+      }
+      final int hashcode = (int) oldLongArray.get(i + 1);
       int newPos = hashcode & mask;
       int step = 1;
-      boolean keepGoing = true;
-
-      // No need to check for equality here when we insert so this has one 
less if branch than
-      // the similar code path in addWithoutResize.
-      while (keepGoing) {
-        if (!bitset.isSet(newPos)) {
-          bitset.set(newPos);
-          longArray.set(newPos * 2, keyPointer);
-          longArray.set(newPos * 2 + 1, hashcode);
-          keepGoing = false;
-        } else {
-          newPos = (newPos + step) & mask;
-          step++;
-        }
+      while (longArray.get(newPos * 2) != 0) {
+        newPos = (newPos + step) & mask;
+        step++;
       }
+      longArray.set(newPos * 2, keyPointer);
+      longArray.set(newPos * 2 + 1, hashcode);
     }
     releaseMemory(oldLongArray.memoryBlock().size());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1b6a5d4a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
deleted file mode 100644
index 7c12417..0000000
--- a/unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.bitset;
-
-import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-
-/**
- * A fixed size uncompressed bit set backed by a {@link LongArray}.
- *
- * Each bit occupies exactly one bit of storage.
- */
-public final class BitSet {
-
-  /** A long array for the bits. */
-  private final LongArray words;
-
-  /** Length of the long array. */
-  private final int numWords;
-
-  private final Object baseObject;
-  private final long baseOffset;
-
-  /**
-   * Creates a new {@link BitSet} using the specified memory block. Size of 
the memory block must be
-   * multiple of 8 bytes (i.e. 64 bits).
-   */
-  public BitSet(MemoryBlock memory) {
-    words = new LongArray(memory);
-    assert (words.size() <= Integer.MAX_VALUE);
-    numWords = (int) words.size();
-    baseObject = words.memoryBlock().getBaseObject();
-    baseOffset = words.memoryBlock().getBaseOffset();
-  }
-
-  public MemoryBlock memoryBlock() {
-    return words.memoryBlock();
-  }
-
-  /**
-   * Returns the number of bits in this {@code BitSet}.
-   */
-  public long capacity() {
-    return numWords * 64;
-  }
-
-  /**
-   * Sets the bit at the specified index to {@code true}.
-   */
-  public void set(int index) {
-    assert index < numWords * 64 : "index (" + index + ") should < length (" + 
numWords * 64 + ")";
-    BitSetMethods.set(baseObject, baseOffset, index);
-  }
-
-  /**
-   * Sets the bit at the specified index to {@code false}.
-   */
-  public void unset(int index) {
-    assert index < numWords * 64 : "index (" + index + ") should < length (" + 
numWords * 64 + ")";
-    BitSetMethods.unset(baseObject, baseOffset, index);
-  }
-
-  /**
-   * Returns {@code true} if the bit is set at the specified index.
-   */
-  public boolean isSet(int index) {
-    assert index < numWords * 64 : "index (" + index + ") should < length (" + 
numWords * 64 + ")";
-    return BitSetMethods.isSet(baseObject, baseOffset, index);
-  }
-
-  /**
-   * Returns the index of the first bit that is set to true that occurs on or 
after the
-   * specified starting index. If no such bit exists then {@code -1} is 
returned.
-   * <p>
-   * To iterate over the true bits in a BitSet, use the following loop:
-   * <pre>
-   * <code>
-   *  for (long i = bs.nextSetBit(0); i &gt;= 0; i = bs.nextSetBit(i + 1)) {
-   *    // operate on index i here
-   *  }
-   * </code>
-   * </pre>
-   *
-   * @param fromIndex the index to start checking from (inclusive)
-   * @return the index of the next set bit, or -1 if there is no such bit
-   */
-  public int nextSetBit(int fromIndex) {
-    return BitSetMethods.nextSetBit(baseObject, baseOffset, fromIndex, 
numWords);
-  }
-
-  /**
-   * Returns {@code true} if any bit is set.
-   */
-  public boolean anySet() {
-    return BitSetMethods.anySet(baseObject, baseOffset, numWords);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1b6a5d4a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
----------------------------------------------------------------------
diff --git 
a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java 
b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
deleted file mode 100644
index 14e3868..0000000
--- a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.bitset;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.unsafe.memory.MemoryBlock;
-
-public class BitSetSuite {
-
-  private static BitSet createBitSet(int capacity) {
-    Assert.assertEquals(0, capacity % 64);
-    return new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
-  }
-
-  @Test
-  public void basicOps() {
-    BitSet bs = createBitSet(64);
-    Assert.assertEquals(64, bs.capacity());
-
-    // Make sure the bit set starts empty.
-    for (int i = 0; i < bs.capacity(); i++) {
-      Assert.assertFalse(bs.isSet(i));
-    }
-    // another form of asserting that the bit set is empty
-    Assert.assertFalse(bs.anySet());
-
-    // Set every bit and check it.
-    for (int i = 0; i < bs.capacity(); i++) {
-      bs.set(i);
-      Assert.assertTrue(bs.isSet(i));
-    }
-
-    // Unset every bit and check it.
-    for (int i = 0; i < bs.capacity(); i++) {
-      Assert.assertTrue(bs.isSet(i));
-      bs.unset(i);
-      Assert.assertFalse(bs.isSet(i));
-    }
-
-    // Make sure anySet() can detect any set bit
-    bs = createBitSet(256);
-    bs.set(64);
-    Assert.assertTrue(bs.anySet());
-  }
-
-  @Test
-  public void traversal() {
-    BitSet bs = createBitSet(256);
-
-    Assert.assertEquals(-1, bs.nextSetBit(0));
-    Assert.assertEquals(-1, bs.nextSetBit(10));
-    Assert.assertEquals(-1, bs.nextSetBit(64));
-
-    bs.set(10);
-    Assert.assertEquals(10, bs.nextSetBit(0));
-    Assert.assertEquals(10, bs.nextSetBit(1));
-    Assert.assertEquals(10, bs.nextSetBit(10));
-    Assert.assertEquals(-1, bs.nextSetBit(11));
-
-    bs.set(11);
-    Assert.assertEquals(10, bs.nextSetBit(10));
-    Assert.assertEquals(11, bs.nextSetBit(11));
-
-    // Skip a whole word and find it
-    bs.set(190);
-    Assert.assertEquals(190, bs.nextSetBit(12));
-
-    Assert.assertEquals(-1, bs.nextSetBit(191));
-    Assert.assertEquals(-1, bs.nextSetBit(256));
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to