ahshahid opened a new issue, #6039:
URL: https://github.com/apache/iceberg/issues/6039
Spark has a PartitionPruning rule (dynamic partition pruning, DPP) which, under the right conditions, can fetch all the join keys of one side of a join and pass them as an IN-clause filter to the other table.
For example, if the query is select * from Table1 join Table2 on Table1.A = Table2.B where ...
In this case, depending on the stats / cardinality, the DPP rule may execute a query like
select distinct A from Table1
so that it has all the unique keys of column A from Table1, which can then be passed as an IN filter to Table2.
Thus, on Table2, there can be a filter B IN (unique values of A ...).
Spark provides an interface, SupportsRuntimeFiltering, which, if implemented by the underlying data source, lets the data source report all the columns it can use to prune the row space.
Though the rule is named PartitionPruning, Spark is agnostic to whether the columns reported by the data source are partition columns or not.
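To make that concrete, here is a minimal sketch of what this looks like on the Spark DSv2 side, assuming Spark's SupportsRuntimeFiltering interface; the scan class and column names below are placeholders for illustration, not Iceberg's actual implementation:

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.types.StructType;

// Hypothetical scan: filterAttributes() is not restricted to partition columns,
// the data source decides which columns it is able to prune on.
class ExampleScan implements SupportsRuntimeFiltering {

  @Override
  public NamedReference[] filterAttributes() {
    return new NamedReference[] {
        Expressions.column("part_col"),  // a partition column
        Expressions.column("B")          // a non-partition column covered by min/max stats
    };
  }

  @Override
  public void filter(Filter[] filters) {
    for (Filter f : filters) {
      if (f instanceof In) {
        In in = (In) f;
        Object[] joinKeys = in.values();
        // Use in.attribute() and joinKeys to prune manifests, data files, and
        // (for Parquet) row groups before planning input partitions.
      }
    }
  }

  @Override
  public StructType readSchema() {
    return new StructType();
  }
}
```

The point is that filterAttributes() may name any column the source knows how to prune on; Spark can then push the runtime IN filter for whichever of those columns appear as join keys.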
In the case of Iceberg (at least when the table uses the Parquet format; I am not sure yet whether this applies to ORC or Avro, as I have not investigated those two formats), if the column type is comparable (at least for numerical types) we have min/max info. This can be used to prune the row space further by skipping row groups, even when the column is not a partition column.
Iceberg currently reports only partition columns through SupportsRuntimeFiltering. This can be augmented with non-partition columns too, since the Iceberg storage format (at least with Parquet; not sure about Avro or ORC yet) means that for some column types (numerical at least, and perhaps string), stats are available at the manifest level and the row-group level.
Implementation idea:
Let's assume col A and col B are int, and the unique values of column A obtained by Spark's DPP query are, say, (10, 17, 18, 1, 5, 4).
We can create a SortedSet of these values: 1, 4, 5, 10, 17, 18.
Iceberg and Parquet invoke the passed filters at multiple levels, e.g. at manifest-reading time, at row-group-reading time, etc. At those places the Iceberg and Parquet code bases invoke the filters (via the visitor pattern), passing data objects from which the min and max values of the column are available.
So, say the min passed for column B is 3 and the max is 6. We can take a tailSet starting from 3 on the SortedSet and open an iterator on it. If the iterator has a next element and its value is <= max, the file / row group (or whatever chunk the min/max describe) needs to be considered; otherwise it can be skipped. Here the iterator does have a next element, with value 4; since 4 <= 6, the block / chunk / row group will be read.
This is the general idea.
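Below is a self-contained sketch of that check; the class and method names are placeholders, not the actual Iceberg change, and NavigableSet.ceiling(min) is used in place of explicitly opening an iterator on tailSet(min), which is equivalent:

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch of the pruning check described above. In the actual change this logic
// would sit behind Iceberg's expression visitors rather than a standalone class.
public class RangeInChecker {

  private final NavigableSet<Integer> sortedKeys;

  public RangeInChecker(Iterable<Integer> joinKeys) {
    this.sortedKeys = new TreeSet<>();
    joinKeys.forEach(this.sortedKeys::add);
  }

  /** True if any join key falls within [min, max], i.e. the chunk must be read. */
  public boolean mightContain(int min, int max) {
    Integer candidate = sortedKeys.ceiling(min);  // smallest key >= min
    return candidate != null && candidate <= max;
  }

  public static void main(String[] args) {
    // Keys from the DPP subquery: (10, 17, 18, 1, 5, 4) -> sorted: 1, 4, 5, 10, 17, 18
    RangeInChecker checker = new RangeInChecker(Arrays.asList(10, 17, 18, 1, 5, 4));
    System.out.println(checker.mightContain(3, 6));   // true: ceiling(3) = 4 <= 6, read the row group
    System.out.println(checker.mightContain(11, 16)); // false: ceiling(11) = 17 > 16, skip it
  }
}
```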
For this I have extended BoundSetPredicate and created a new BoundRangeInPredicate. At some point I think one of them should suffice; for now I have kept them separate.
Reasons:
1) BoundSetPredicate does not keep sorted data; it keeps the values as a plain Set. Wherever it needs to check against a lower bound or an upper bound, it uses a comparator to sort on each call. This can be avoided if the set is sorted in the first place.
2) Parquet would convert the BoundSetPredicate into an In filter, which would also be evaluated on each row. But in our case, looking up the set for each row would be expensive, because it is a sorted set, and in any case the join will do a lookup in its hash set anyway, so there is no point in doing two lookups. With a user-defined filter, the per-row check can be skipped (a rough sketch follows after this list).
But this needs to be looked into in a little more detail, as it appears that Parquet does pruning at the page-store level too, and the filter receives no callback at that level. The filtering done at the page level is somewhat tied to Parquet's built-in filter implementations, while the RangeIn filter I created is user-defined.
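For illustration, here is a rough sketch of what such a user-defined range-in predicate could look like against Parquet's UserDefinedPredicate API; the class name, the build helper, and the hard-coded column name are placeholders, not the exact filter from my branch:

```java
import java.io.Serializable;
import java.util.NavigableSet;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;

// Hypothetical "range-in" predicate for an int column. canDrop() does the
// sorted-set overlap test against row-group statistics; keep() accepts every
// row so no per-row membership check is paid.
class RangeInUdp extends UserDefinedPredicate<Integer> implements Serializable {

  private final NavigableSet<Integer> sortedKeys;

  RangeInUdp(NavigableSet<Integer> sortedKeys) {
    this.sortedKeys = sortedKeys;
  }

  @Override
  public boolean keep(Integer value) {
    return true;  // skip the per-row check; the join's hash lookup filters rows anyway
  }

  @Override
  public boolean canDrop(Statistics<Integer> statistics) {
    // Drop the row group if no join key lies within [min, max].
    Integer candidate = sortedKeys.ceiling(statistics.getMin());
    return candidate == null || candidate > statistics.getMax();
  }

  @Override
  public boolean inverseCanDrop(Statistics<Integer> statistics) {
    return false;  // be conservative for NOT(predicate)
  }

  static FilterPredicate build(NavigableSet<Integer> keys) {
    return FilterApi.userDefined(FilterApi.intColumn("B"), new RangeInUdp(keys));
  }
}
```

Here canDrop() performs the same sorted-set overlap test at the row-group level, while keep() deliberately accepts every row so the membership test is left to the join itself.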
I will be opening a PR for this in some time, as I need to add tests and run TPC-DS benchmarks (though code-wise it is ready). But I would like to solicit your feedback on what you all think.
If this looks good, I also have in mind to look at the perf impact for general broadcast hash joins where the joining key is int, short, etc., so that the hash keys, as a sorted set, can be passed to the stream side before opening the iterator and used to skip row groups (and maybe pages too).