[GitHub] weijietong commented on issue #1334: DRILL-6385: Support JPPD feature
weijietong commented on issue #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#issuecomment-408599354 Regarding the cleanup point, I currently think there is no problem. Once the query is canceled, the foreman or the drillbit will run out, and the in-flight runtime filter will fail due to the wire termination on one side. The off-heap memory occupied by the bloom filter is guaranteed to be released because `SendingAccountor` is used to send out the `RuntimeFilter` through DataTunnel. If we find any potential issues later, we can fix them in follow-up JIRAs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205939854 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.memory.BufferAllocator; + +public class BloomFilterCreator { + + public static BloomFilter spawnOne(int numBytes, BufferAllocator bufferAllocator) + { +int size = BloomFilter.adjustByteSize(numBytes); +DrillBuf drillBuf = bufferAllocator.buffer(size); +BloomFilter bloomFilter = new BloomFilter(drillBuf); +return bloomFilter; + } + + public static BloomFilter spawnOne(int ndv, double fpp, BufferAllocator bufferAllocator) + { +int numBytes = BloomFilter.optimalNumOfBits(ndv, fpp); Review comment: I will rename the optimalNumOfBits method to optimalNumOfBytes and divide its return value by 8. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205939823 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE) { + numBytes = MINIMUM_BLOOM_SIZE; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + +Arrays.fill(mask, 0); + +for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >> 27; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; +} + } + + /** + *Add an element's hash value to this bloom filter. + * @param hash hash result of element. 
+ */ + public void insert(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); +int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(initialStartIndex, tempBucket); +for (int i = 0; i < 8; i++) { + //every iterate batch,we set 32 bits + int bitsetIndex = i * 4; + tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); + tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); + tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); + tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); +} +byteBuf.setBytes(initialStartIndex, tempBucket); + } + + /** + * Determine whether an element is set or not. + * + * @param hash the hash value of element. + * @return false if the element is not set, true if the element is probably set. + */ + public boolean find(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); + +int startIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(startIndex, tempBucket); +for (int i = 0; i < 8; i++) { + byte set = 0; + int bitsetIndex = i * 4; + set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); + set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); + set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); + set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); + if (0 == set) { +return false; + } +} +return true; + } + + /** + * Merge this bloom filter with other one + * @param other + */ + public void or(BloomFilter other) { +int otherLength = other.byteBuf.capacity(); +int thisLength = this.byteBuf.capacity(); +assert otherLength == thisLength; +//to avoid checking times of Byte
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938739 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. + private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; Review comment: The reason to keep it as a member variable is to avoid heap memory GC while invoking the corresponding methods frequently. 
If it were moved into the methods as a local variable, it would hurt performance. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938434 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE) { + numBytes = MINIMUM_BLOOM_SIZE; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; + +Arrays.fill(mask, 0); + +for (int i = 0; i < 8; ++i) { + mask[i] = key * SALT[i]; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = mask[i] >> 27; +} + +for (int i = 0; i < 8; ++i) { + mask[i] = 0x1 << mask[i]; +} + } + + /** + *Add an element's hash value to this bloom filter. + * @param hash hash result of element. 
+ */ + public void insert(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); +int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(initialStartIndex, tempBucket); +for (int i = 0; i < 8; i++) { + //every iterate batch,we set 32 bits + int bitsetIndex = i * 4; + tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); + tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); + tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); + tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); +} +byteBuf.setBytes(initialStartIndex, tempBucket); + } + + /** + * Determine whether an element is set or not. + * + * @param hash the hash value of element. + * @return false if the element is not set, true if the element is probably set. + */ + public boolean find(long hash) { +int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); +int key = (int) hash; +setMask(key); + +int startIndex = bucketIndex * BYTES_PER_BUCKET; +byteBuf.getBytes(startIndex, tempBucket); +for (int i = 0; i < 8; i++) { + byte set = 0; + int bitsetIndex = i * 4; + set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); + set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); + set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); + set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); + if (0 == set) { +return false; + } +} +return true; + } + + /** + * Merge this bloom filter with other one + * @param other + */ + public void or(BloomFilter other) { +int otherLength = other.byteBuf.capacity(); +int thisLength = this.byteBuf.capacity(); +assert otherLength == thisLength; +//to avoid checking times of Byte
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938123 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. + private static final int MINIMUM_BLOOM_SIZE = 256; Review comment: BYTES is right. I will update the naming. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r205938105 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import io.netty.buffer.DrillBuf; +import java.util.Arrays; + + +/** + * According to Putze et al.'s "Cache-, Hash- and Space-Efficient BloomFilter + * Filters", the main theory is to construct tiny bucket bloom filters which + * benefit to the cpu cache and SIMD opcode. + */ + +public class BloomFilter { + // Bytes in a bucket. + private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. 
+ private static final int MINIMUM_BLOOM_SIZE = 256; + + private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; + + private DrillBuf byteBuf; + + private int numBytes; + + private int mask[] = new int[8]; + + private byte[] tempBucket = new byte[32]; + + + public BloomFilter(DrillBuf byteBuf) { +this.byteBuf = byteBuf; +this.numBytes = byteBuf.capacity(); +this.byteBuf.writerIndex(numBytes); + } + + + public static int adjustByteSize(int numBytes) { +if (numBytes < MINIMUM_BLOOM_SIZE) { + numBytes = MINIMUM_BLOOM_SIZE; +} + +if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) { + numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE; +} + +// 32 bytes alignment, one bucket. +numBytes = (numBytes + 0x1F) & (~0x1F); +return numBytes; + } + + private void setMask(int key) { +final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; Review comment: Those 8 odd numbers were picked from Impala's implementation (see bloom-filter.h). They act as salt values that participate in calculating the mask. I will add some comments to this method to make it clear. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services