amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204293791
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##########
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter
+ * Filters", the main theory is to construct tiny bucket bloom filters which
+ * benefit to the cpu cache and SIMD opcode.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 
1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+    this.byteBuf = byteBuf;
+    this.numBytes = byteBuf.capacity();
+    this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+    if (numBytes < MINIMUM_BLOOM_SIZE) {
+      numBytes = MINIMUM_BLOOM_SIZE;
+    }
+
+    if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+      numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+    }
+
+    // 32 bytes alignment, one bucket.
+    numBytes = (numBytes + 0x1F) & (~0x1F);
+    return numBytes;
+  }
+
+  private void setMask(int key) {
+    final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 
0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+    Arrays.fill(mask, 0);
+
+    for (int i = 0; i < 8; ++i) {
+      mask[i] = key * SALT[i];
+    }
+
+    for (int i = 0; i < 8; ++i) {
+      mask[i] = mask[i] >> 27;
+    }
+
+    for (int i = 0; i < 8; ++i) {
+      mask[i] = 0x1 << mask[i];
+    }
+  }
+
+  /**
+   *Add an element's hash value to this bloom filter.
+   * @param hash hash result of element.
+   */
+  public void insert(long hash) {
+    int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+    int key = (int) hash;
+    setMask(key);
+    int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
+    byteBuf.getBytes(initialStartIndex, tempBucket);
+    for (int i = 0; i < 8; i++) {
+      //every iterate batch,we set 32 bits
+      int bitsetIndex = i * 4;
+      tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) 
(mask[i] >>> 24));
+      tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | 
(byte) (mask[i] >>> 16));
+      tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | 
(byte) (mask[i] >>> 8));
+      tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | 
(byte) (mask[i]));
+    }
+    byteBuf.setBytes(initialStartIndex, tempBucket);
+  }
+
+  /**
+   * Determine whether an element is set or not.
+   *
+   * @param hash the hash value of element.
+   * @return false if the element is not set, true if the element is probably 
set.
+   */
+  public boolean find(long hash) {
+    int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
+    int key = (int) hash;
+    setMask(key);
+
+    int startIndex = bucketIndex * BYTES_PER_BUCKET;
+    byteBuf.getBytes(startIndex, tempBucket);
+    for (int i = 0; i < 8; i++) {
+      byte set = 0;
+      int bitsetIndex = i * 4;
+      set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
+      set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
+      set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
+      set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
+      if (0 == set) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Merge this bloom filter with other one
+   * @param other
+   */
+  public void or(BloomFilter other) {
+    int otherLength = other.byteBuf.capacity();
+    int thisLength = this.byteBuf.capacity();
+    assert otherLength == thisLength;
+    //to avoid checking times of ByteBuf.getBytes, here we get the bytes in 
batch
 
 Review comment:
   not sure what 'checking times of ByteBuf.getBytes' means in this context..

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to