[ 
https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

weijie.tong updated DRILL-6385:
-------------------------------
    Affects Version/s:     (was: 1.14.0)
                       1.15.0

> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
>                 Key: DRILL-6385
>                 URL: https://issues.apache.org/jira/browse/DRILL-6385
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - Flow
>    Affects Versions: 1.15.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>              Labels: ready-to-commit
>
> This feature is to support the JPPD (Join Predicate Push Down). It will 
> benefit the HashJoin ,Broadcast HashJoin performance by reducing the number 
> of rows to send across the network ,the memory consumed. This feature is 
> already supported by Impala which calls it RuntimeFilter 
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
>  The first PR will try to push down a bloom filter of HashJoin node to 
> Parquet’s scan node.   The propose basic procedure is described as follow:
>  # The HashJoin build side accumulate the equal join condition rows to 
> construct a bloom filter. Then it sends out the bloom filter to the foreman 
> node.
>  # The foreman node accept the bloom filters passively from all the fragments 
> that has the HashJoin operator. It then aggregates the bloom filters to form 
> a global bloom filter.
>  # The foreman node broadcasts the global bloom filter to all the probe side 
> scan nodes which maybe already have send out partial data to the hash join 
> nodes(currently the hash join node will prefetch one batch from both sides ).
>       4.  The scan node accepts a global bloom filter from the foreman node. 
> It will filter the rest rows satisfying the bloom filter.
>  
> To implement above execution flow, some main new notion described as below:
>       1. RuntimeFilter
> It’s a filter container which may contain BloomFilter or MinMaxFilter.
>       2. RuntimeFilterReporter
> It wraps the logic to send hash join’s bloom filter to the foreman.The 
> serialized bloom filter will be sent out through the data tunnel.This object 
> will be instanced by the FragmentExecutor and passed to the 
> FragmentContext.So the HashJoin operator can obtain it through the 
> FragmentContext.
>      3. RuntimeFilterRequestHandler
> It is responsible to accept a SendRuntimeFilterRequest RPC to strip the 
> actual BloomFilter from the network. It then translates this filter to the 
> WorkerBee’s new interface registerRuntimeFilter.
> Another RPC type is BroadcastRuntimeFilterRequest. It will register the 
> accepted global bloom filter to the WorkerBee by the registerRuntimeFilter 
> method and then propagate to the FragmentContext through which the probe side 
> scan node can fetch the aggregated bloom filter.
>       4.RuntimeFilterManager
> The foreman will instance a RuntimeFilterManager .It will indirectly get 
> every RuntimeFilter by the WorkerBee. Once all the BloomFilters have been 
> accepted and aggregated . It will broadcast the aggregated bloom filter to 
> all the probe side scan nodes through the data tunnel by a 
> BroadcastRuntimeFilterRequest RPC.
>      5. RuntimeFilterEnableOption 
>  A global option will be added to decide whether to enable this new feature.
>  
> Welcome suggestion and advice from you.The related PR will be presented as 
> soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to