[ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529133#comment-16529133 ]
jin xing (jinxing6...@126.com) edited comment on KUDU-2483 at 7/1/18 3:37 PM:
--------------------------------------------------------

Thanks [~tlipcon] for the comment. I think the benefit is straightforward.

I created two Kudu tables from the Spark shell:
{code:sql}
CREATE TABLE smallTable (
  idA string NOT NULL,
  dt string NOT NULL,
  PRIMARY KEY (idA, dt))
PARTITION BY HASH (idA) PARTITIONS 2,
RANGE (dt) (
  PARTITION "20180630" <= VALUES < "20180701",
  PARTITION "20180701" <= VALUES < "20180702",
  PARTITION "20180702" <= VALUES < "20180703"
);

CREATE TABLE bigTable (
  idB string NOT NULL,
  idC string NOT NULL,
  dt string NOT NULL,
  PRIMARY KEY (idB, idC, dt))
PARTITION BY HASH (idB, idC) PARTITIONS 10,
RANGE (dt) (
  PARTITION "20180630" <= VALUES < "20180701",
  PARTITION "20180701" <= VALUES < "20180702",
  PARTITION "20180702" <= VALUES < "20180703"
);
{code}
I inserted 6 rows into smallTable and 323075 rows into bigTable, then ran this query:
{code:sql}
select * from smallTable inner join bigTable on smallTable.idA = bigTable.idB;
{code}
I added a boolean config, `spark.sql.kudu.pushDownKuduBloomFilters`, to control whether this feature is enabled. The bloom filters were created with size=32KB and fp_rate=0.01.

With `spark.sql.kudu.pushDownKuduBloomFilters` enabled, Spark reported:
number of output rows from Kudu: 114
query duration: 2s

With `spark.sql.kudu.pushDownKuduBloomFilters` disabled, Spark reported:
number of output rows from Kudu: 323075
query duration: 16s

Screenshot for comparison:
[^image-2018-07-01-23-29-05-517.png]
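As a rough sanity check on the size=32KB and fp_rate=0.01 setting, the sketch below uses the textbook formulas for a classic Bloom filter (optimal bits m = -n ln(p) / (ln 2)^2, solved for n, and k = -log2(p) hash functions) to estimate how many keys such a filter can hold. This is an assumption: the filter built by the patch (and Kudu's own bloom filter code) may use a different layout, so treat the numbers as approximate. The function names are hypothetical.

```python
import math


def bloom_capacity(size_bytes: int, fp_rate: float) -> tuple[int, int]:
    """Estimate (max_keys, num_hashes) for a classic Bloom filter.

    Hypothetical helper. Solves m = -n * ln(p) / (ln 2)^2 for n and
    uses k = ceil(-log2(p)); assumes a standard (non-blocked) filter.
    """
    m = size_bytes * 8                              # filter size in bits
    n = m * math.log(2) ** 2 / -math.log(fp_rate)   # keys at target rate
    k = math.ceil(-math.log2(fp_rate))              # number of hash functions
    return int(n), k


def expected_fp_rate(size_bytes: int, num_keys: int, num_hashes: int) -> float:
    """Approximate false-positive rate (1 - e^(-k*n/m))^k."""
    m = size_bytes * 8
    return (1 - math.exp(-num_hashes * num_keys / m)) ** num_hashes


capacity, k = bloom_capacity(32 * 1024, 0.01)
print(capacity, k)  # 27349 7

# smallTable holds only 6 keys, far below capacity.
print(expected_fp_rate(32 * 1024, 6, k))
```

Under these assumptions a 32KB filter reaches 1% false positives only at around 27,000 keys, so with the 6 keys of smallTable its effective false-positive rate is far below the configured 1%.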
> Scan tablets with bloom filter
> ------------------------------
>
> Key: KUDU-2483
> URL: https://issues.apache.org/jira/browse/KUDU-2483
> Project: Kudu
> Issue Type: New Feature
> Components: client
> Reporter: jin xing
> Priority: Major
> Attachments: KUDU-2483, image-2018-07-01-23-29-05-517.png
>
>
> Joins are very common in Spark SQL. In this JIRA I take broadcast join as an
> example and describe how Kudu's bloom filter can help accelerate distributed
> computing.
> Spark runs a broadcast join in the following steps:
> 1. A broadcast join involves a small table and a big table. Spark reads all
> the data of the small table onto a single node and builds a hash table;
> 2. The hash table from step 1 is broadcast to all the workers that read
> splits of the big table;
> 3. The workers fetch and iterate over all the splits of the big table and
> check whether each row's join key exists in the hash table; only rows with
> matching join keys are retained.
> Step 3 is the heaviest, especially when a worker and the storage of its split
> are not on the same host and bandwidth is limited. However, the full cost of
> step 3 is not always necessary. Consider the following scenario:
> {code:none}
> Small table A
> id  name
> 1   Jin
> 6   Xing
>
> Big table B
> id  age
> 1   10
> 2   21
> 3   33
> 4   65
> 5   32
> 6   23
> 7   18
> 8   20
> 9   22
> {code}
> Run the query: {{select * from A inner join B on A.id=B.id}}
> Clearly we don't need to fetch all the data from table B, because only a few
> keys match (ids 1 and 6 out of 9).
> I propose building a bloom filter (BF) from the small table and using the
> generated BF as a predicate/filter when fetching data from the big table, so
> that:
> 1. Much network traffic/bandwidth is saved.
> 2. The workers have less data to process.
> Broadcast join is just one example; other join types would also benefit from
> scanning with a BF.
> In a nutshell, I think Kudu could provide an interface through which users can
> scan data with bloom filters.
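The proposal can be illustrated with a single-process Python sketch using tables A and B from the scenario above. It is a minimal illustration under stated assumptions: `BloomFilter` and `bloom_filtered_join` are hypothetical names, the filter is a textbook Bloom filter with double hashing (not Kudu's bloom filter implementation or client API), and the list comprehension over `big` stands in for a Kudu scan with the filter pushed down to the tablet servers.

```python
import hashlib
import math


class BloomFilter:
    """Minimal classic Bloom filter (hypothetical, illustrative only)."""

    def __init__(self, expected_keys: int, fp_rate: float):
        # Textbook sizing: m = -n ln(p) / (ln 2)^2 bits, k = (m / n) ln 2 hashes.
        self.num_bits = max(8, math.ceil(-expected_keys * math.log(fp_rate) / math.log(2) ** 2))
        self.num_hashes = max(1, round(self.num_bits / expected_keys * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, key):
        # Double hashing: derive k bit positions from two 64-bit slices of one digest.
        digest = hashlib.sha256(repr(key).encode()).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key) -> bool:
        # False positives are possible; false negatives are not.
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


def bloom_filtered_join(small, big, key):
    """Broadcast-style inner join whose big-side scan is pre-filtered by a BF."""
    # Build the hash table and the bloom filter from the small table.
    hash_table = {}
    bf = BloomFilter(expected_keys=max(1, len(small)), fp_rate=0.01)
    for row in small:
        hash_table.setdefault(row[key], []).append(row)
        bf.add(row[key])
    # Stand-in for a pushed-down scan: rows rejected by the BF are never fetched.
    scanned = [row for row in big if bf.might_contain(row[key])]
    # Probe the hash table to drop any false positives and emit joined rows.
    joined = [{**s, **b} for b in scanned for s in hash_table.get(b[key], [])]
    return scanned, joined


A = [{"id": 1, "name": "Jin"}, {"id": 6, "name": "Xing"}]
B = [{"id": i, "age": a} for i, a in zip(range(1, 10), [10, 21, 33, 65, 32, 23, 18, 20, 22])]
scanned, joined = bloom_filtered_join(A, B, "id")
print(len(scanned))  # rows "fetched" from B: ids 1 and 6, plus any false positives
print(joined)  # [{'id': 1, 'name': 'Jin', 'age': 10}, {'id': 6, 'name': 'Xing', 'age': 23}]
```

Because a bloom filter can return false positives but never false negatives, the final probe against the hash table is still required; the saving comes from the rows the filter rejects (most of table B here), which would not need to be sent from the tablet servers to the workers.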