[ https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weijie.tong updated DRILL-6385:
-------------------------------
    Affects Version/s:     (was: 1.14.0)
                       1.15.0

> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
>                 Key: DRILL-6385
>                 URL: https://issues.apache.org/jira/browse/DRILL-6385
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Server, Execution - Flow
>    Affects Versions: 1.15.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>              Labels: ready-to-commit
>
> This feature adds support for JPPD (Join Predicate Push Down). It will
> benefit HashJoin and Broadcast HashJoin performance by reducing the number
> of rows sent across the network and the memory consumed. Impala already
> supports this feature and calls it RuntimeFilter
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
> The first PR will try to push down a bloom filter from the HashJoin node to
> Parquet’s scan node. The proposed basic procedure is as follows:
> 1. The HashJoin build side accumulates the equality join condition values of
> its rows to construct a bloom filter. It then sends the bloom filter to the
> foreman node.
> 2. The foreman node passively accepts the bloom filters from all the
> fragments that have the HashJoin operator. It then aggregates the bloom
> filters to form a global bloom filter.
> 3. The foreman node broadcasts the global bloom filter to all the probe side
> scan nodes, which may already have sent partial data to the hash join nodes
> (currently the hash join node prefetches one batch from both sides).
> 4. The scan node accepts the global bloom filter from the foreman node and
> uses it to filter the remaining rows, keeping only those that satisfy the
> bloom filter.
>
> To implement the above execution flow, the main new notions are described
> below:
> 1. RuntimeFilter
> It is a filter container which may contain a BloomFilter or a MinMaxFilter.
> 2. RuntimeFilterReporter
> It wraps the logic to send the hash join’s bloom filter to the foreman. The
> serialized bloom filter will be sent through the data tunnel. This object
> will be instantiated by the FragmentExecutor and passed to the
> FragmentContext, so the HashJoin operator can obtain it through the
> FragmentContext.
> 3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and
> extracting the actual BloomFilter from the network. It then passes this
> filter to the WorkerBee’s new interface registerRuntimeFilter.
> Another RPC type is BroadcastRuntimeFilterRequest. It will register the
> accepted global bloom filter with the WorkerBee through the
> registerRuntimeFilter method and then propagate it to the FragmentContext,
> through which the probe side scan node can fetch the aggregated bloom filter.
> 4. RuntimeFilterManager
> The foreman will instantiate a RuntimeFilterManager. It will indirectly get
> every RuntimeFilter through the WorkerBee. Once all the BloomFilters have
> been accepted and aggregated, it will broadcast the aggregated bloom filter
> to all the probe side scan nodes through the data tunnel by a
> BroadcastRuntimeFilterRequest RPC.
> 5. RuntimeFilterEnableOption
> A global option will be added to decide whether to enable this new feature.
>
> Suggestions and advice are welcome. The related PR will be presented as
> soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
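
For illustration only, the four-step procedure in the description (build-side filter, aggregation on the foreman, broadcast, probe-side filtering) could be sketched in plain Java roughly as below. This is a minimal sketch under stated assumptions, not the design of the planned PR: the names JppdSketch, SimpleBloomFilter, buildSide, aggregate and probeScan, the two-hash scheme, and the use of long join keys are all hypothetical and are not Drill APIs. RPC transport and serialization are left out entirely.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Hypothetical sketch of the JPPD flow; none of these names are Drill APIs. */
final class JppdSketch {

  /** A tiny bloom filter over long join keys, using two hash probes per key. */
  static final class SimpleBloomFilter {
    private final int numBits;
    private final BitSet bits;

    SimpleBloomFilter(int numBits) {
      this.numBits = numBits;
      this.bits = new BitSet(numBits);
    }

    /** Maps a key and a seed to a bit position in [0, numBits). */
    private int index(long key, int seed) {
      long h = key * 0x9E3779B97F4A7C15L + seed * 0xC2B2AE3D27D4EB4FL;
      h ^= (h >>> 32);
      return (int) Math.floorMod(h, (long) numBits);
    }

    void add(long key) {
      bits.set(index(key, 1));
      bits.set(index(key, 2));
    }

    /** May return true for a key never added, but never false for an added key. */
    boolean mightContain(long key) {
      return bits.get(index(key, 1)) && bits.get(index(key, 2));
    }

    /** Bitwise OR: the merged filter accepts every key that either input accepted. */
    void merge(SimpleBloomFilter other) {
      if (other.numBits != numBits) {
        throw new IllegalArgumentException("bloom filter size mismatch");
      }
      bits.or(other.bits);
    }
  }

  /** Step 1: a build-side fragment summarizes its join keys in a partial filter. */
  static SimpleBloomFilter buildSide(long[] joinKeys, int numBits) {
    SimpleBloomFilter filter = new SimpleBloomFilter(numBits);
    for (long key : joinKeys) {
      filter.add(key);
    }
    return filter;
  }

  /** Step 2: the foreman merges the partial filters into one global filter. */
  static SimpleBloomFilter aggregate(List<SimpleBloomFilter> partials, int numBits) {
    SimpleBloomFilter global = new SimpleBloomFilter(numBits);
    for (SimpleBloomFilter partial : partials) {
      global.merge(partial);
    }
    return global;
  }

  /**
   * Steps 3 and 4: the scan keeps rows that may match. A null filter models
   * batches scanned before the broadcast arrives, which pass through unfiltered.
   */
  static List<Long> probeScan(long[] probeKeys, SimpleBloomFilter global) {
    List<Long> kept = new ArrayList<>();
    for (long key : probeKeys) {
      if (global == null || global.mightContain(key)) {
        kept.add(key);
      }
    }
    return kept;
  }
}
```

Because a bloom filter has no false negatives, rows dropped by the scan can never have matched the join, so the hash join still sees every matching row; false positives only mean that some non-matching rows are sent anyway. Merging with bitwise OR requires all fragments to agree on the filter size and hash functions, which this sketch enforces only through the size check.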