ahshahid opened a new issue, #6039:
URL: https://github.com/apache/iceberg/issues/6039

   Spark has a dynamic partition pruning (DPP) rule which, under the right conditions, can fetch all the join keys from one side of a join and pass them as an IN-clause filter to the other table.
   
   For example, if the query is select * from Table1 join Table2 on Table1.A = Table2.B where ...
   
   In this case, depending upon the stats/cardinality, the DPP rule may execute a query like
   
   select  distinct A from Table1  
   
   which collects all the unique values of column A from Table1; these can then be passed as an IN filter to Table2.
   
   Thus on Table2 there can be a filter B IN (unique values of A ...).
   
   Spark provides an interface, SupportsRuntimeFiltering, through which the underlying data source can report the columns it is able to use to prune the row space.
   Although the rule is named PartitionPruning, Spark is agnostic to whether the columns reported by the data source are partition columns or not.
   In the case of Iceberg (at least when the table uses the Parquet format; I have not yet investigated whether the same applies to ORC or Avro), we have min/max information if the column type is comparable (at least for numerical types).
   This can be used to prune the row space further by skipping row groups, even if the column is not a partition column.
   
   Iceberg currently reports only partition columns through SupportsRuntimeFiltering. This could be augmented with non-partition columns too.
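
   Roughly, a scan that reports both kinds of columns might look like the sketch below. Only the SupportsRuntimeFiltering interface itself is Spark's; the class and field names are purely illustrative, not the actual Iceberg Spark scan:

   ```java
   import java.util.Arrays;
   import java.util.stream.Stream;
   import org.apache.spark.sql.connector.expressions.Expressions;
   import org.apache.spark.sql.connector.expressions.NamedReference;
   import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
   import org.apache.spark.sql.sources.Filter;
   import org.apache.spark.sql.types.StructType;

   // Illustrative only: a scan that reports non-partition columns that carry
   // min/max stats in addition to partition columns, so Spark's runtime
   // filtering rule can push IN filters on either kind back into the scan.
   class StatsAwareScan implements SupportsRuntimeFiltering {
     private final StructType schema;
     private final String[] partitionColumns;
     private final String[] statsColumns; // non-partition columns with min/max stats
     private Filter[] runtimeFilters = new Filter[0];

     StatsAwareScan(StructType schema, String[] partitionColumns, String[] statsColumns) {
       this.schema = schema;
       this.partitionColumns = partitionColumns;
       this.statsColumns = statsColumns;
     }

     @Override
     public StructType readSchema() {
       return schema;
     }

     @Override
     public NamedReference[] filterAttributes() {
       // Today Iceberg reports only partition columns here; the idea is to also
       // report columns whose file/row-group stats can be used for pruning.
       return Stream.concat(Arrays.stream(partitionColumns), Arrays.stream(statsColumns))
           .map(Expressions::column)
           .toArray(NamedReference[]::new);
     }

     @Override
     public void filter(Filter[] filters) {
       // Spark calls back with the runtime IN filters; keep them so file and
       // row-group planning can evaluate them against min/max stats.
       this.runtimeFilters = filters;
     }
   }
   ```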
   
   The Iceberg storage format, at least for Parquet (not sure about Avro or ORC yet), means that for some column types (at least numerical ones, and possibly strings) stats are available at the manifest level and at the row-group level.
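
   For the manifest-level part, the per-file min/max is what DataFile already exposes as lower/upper bounds per column id. A hypothetical helper (not existing Iceberg code) to read them for an int column could look like this:

   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.types.Conversions;
   import org.apache.iceberg.types.Types;

   // Hypothetical helper showing where per-file min/max comes from:
   // manifests record lower/upper bounds keyed by column id.
   class FileBounds {
     // Returns [min, max] of an int column for one data file, or null if the
     // file has no bounds recorded for that column.
     static Integer[] intBounds(DataFile file, int columnId) {
       Map<Integer, ByteBuffer> lowers = file.lowerBounds();
       Map<Integer, ByteBuffer> uppers = file.upperBounds();
       if (lowers == null || uppers == null
           || !lowers.containsKey(columnId) || !uppers.containsKey(columnId)) {
         return null;
       }
       Integer min = Conversions.fromByteBuffer(Types.IntegerType.get(), lowers.get(columnId));
       Integer max = Conversions.fromByteBuffer(Types.IntegerType.get(), uppers.get(columnId));
       return new Integer[] {min, max};
     }
   }
   ```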
   
    
   Implementation idea:
   Let's assume column A and column B are of type Int,
   
   and that the unique values of column A obtained by Spark's DPP query are, say, (10, 17, 18, 1, 5, 4).
   
   We can create a SortedSet of these values: 1, 4, 5, 10, 17, 18.
   
   Iceberg and Parquet invoke the filters passed to them at multiple levels, e.g. at manifest-reading time, at row-group-reading time, etc.
   
   At these points, the Iceberg and Parquet code bases invoke the filters (via the visitor pattern), passing data objects from which the min and max values of the column are available.
   
   Say the min passed is 3 and the max passed is 6 for column B of a given chunk;
   
   we can obtain a tailSet starting from 3 on the SortedSet.
   
   Open an iterator on the tail set:
   
   if iter.next() exists and its value is <= max, the file/row group (or whatever chunk the min/max describe) needs to be considered; otherwise it can be skipped.
   
   So here iter.next() exists and its value is 4; since 4 <= 6, the block/chunk/row group will be read.
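
   Expressed in code, the check is essentially the following minimal sketch (names are illustrative; ceiling(min) is shorthand for opening an iterator on tailSet(min)):

   ```java
   import java.util.List;
   import java.util.NavigableSet;
   import java.util.TreeSet;

   // Minimal sketch of the min/max overlap check; names are illustrative and
   // not the actual Iceberg/Parquet classes.
   final class RangeInPruning {
     // True if the chunk (file/row group) described by [min, max] may contain a match.
     static boolean mightContain(NavigableSet<Integer> sortedValues, int min, int max) {
       Integer candidate = sortedValues.ceiling(min); // smallest filter value >= min
       return candidate != null && candidate <= max;  // keep the chunk only if it is <= max
     }

     public static void main(String[] args) {
       NavigableSet<Integer> values = new TreeSet<>(List.of(10, 17, 18, 1, 5, 4));
       System.out.println(mightContain(values, 3, 6));   // true: ceiling(3) = 4 and 4 <= 6
       System.out.println(mightContain(values, 11, 15)); // false: ceiling(11) = 17 > 15, skip
     }
   }
   ```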
   
    
   
   This is the general idea.
   
   For this I have extended BoundSetPredicate and created a new BoundRangeInPredicate.
   
   At some point, I think one of them should suffice.
   
   For now I have kept them separate.
   
   Reasons:
   
   1) BoundSetPredicate does not keep sorted data; it keeps the values as a plain Set.
   
   Wherever it needs to check the lower bound or the upper bound, it uses a comparator to sort on each call. This can be avoided if the set is sorted in the first place.
   
    
   
   2) Parquet would convert the BoundSetPredicate into an In filter, which would also be evaluated on each row.
   
   In our case, looking up the set for each row would be expensive because it is a sorted set, and the join will do a lookup in its hash set anyway, so there is no point in doing two lookups. With a user-defined filter, the per-row check can be skipped (see the sketch after this list).
   
   But this needs to be looked into in a little more detail, as it appears that Parquet also does pruning at the page-store level, and the filter receives no callback at that level. The filtering done at the page level is somewhat tied to the built-in filter implementations in Parquet, while the RangeIn filter I created is user defined.
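
   To make reason 2 concrete, here is a rough sketch of how the range check could be expressed through Parquet's UserDefinedPredicate extension point (the class below is illustrative, not the actual BoundRangeInPredicate): keep() always returns true so there is no per-row lookup, and canDrop() does the sorted-set range check against the row-group statistics.

   ```java
   import java.io.Serializable;
   import java.util.NavigableSet;
   import org.apache.parquet.filter2.predicate.Statistics;
   import org.apache.parquet.filter2.predicate.UserDefinedPredicate;

   // Illustrative sketch only, not the actual BoundRangeInPredicate.
   class RangeInPredicate extends UserDefinedPredicate<Integer> implements Serializable {
     private final NavigableSet<Integer> sortedValues;

     RangeInPredicate(NavigableSet<Integer> sortedValues) {
       this.sortedValues = sortedValues;
     }

     @Override
     public boolean keep(Integer value) {
       // No per-row lookup: the join's own hash lookup filters rows anyway,
       // so this predicate only prunes at the statistics (row group) level.
       return true;
     }

     @Override
     public boolean canDrop(Statistics<Integer> statistics) {
       // Drop the row group when no filter value falls inside [min, max].
       Integer candidate = sortedValues.ceiling(statistics.getMin());
       return candidate == null || candidate > statistics.getMax();
     }

     @Override
     public boolean inverseCanDrop(Statistics<Integer> statistics) {
       // Called when the predicate is negated; be conservative and never drop.
       return false;
     }
   }
   ```

   It could be attached to a column with something like FilterApi.userDefined(FilterApi.intColumn("B"), new RangeInPredicate(sortedValues)).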
   
    
   
   I will be opening a PR for this in some time, as I still need to add tests and run TPC-DS benchmarks (though code-wise it is ready).
   
   In the meantime I would like to solicit your feedback: what do you all think?
   
   If this looks good, I also have in mind to measure the perf impact for general broadcast hash joins where the join key is int, short, etc., so that the hash keys, as a sorted set, can be passed to the stream side before opening the iterator and used to skip row groups (and maybe pages too).

