Yao Zhang created HUDI-7309:
-------------------------------

             Summary: Disable filter pushing down when the parquet type 
corresponding to its field logical type is not comparable
                 Key: HUDI-7309
                 URL: https://issues.apache.org/jira/browse/HUDI-7309
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink
    Affects Versions: 0.14.1, 0.14.0
         Environment: Hudi 0.14.0
Hudi 0.14.1rc1
Flink 1.17.1
            Reporter: Yao Zhang
            Assignee: Yao Zhang


Given the table web_sales from TPC-DS:
{code:sql}
CREATE TABLE web_sales (
                           ws_sold_date_sk int,
                           ws_sold_time_sk int,
                           ws_ship_date_sk int,
                           ws_item_sk int,
                           ws_bill_customer_sk int,
                           ws_bill_cdemo_sk int,
                           ws_bill_hdemo_sk int,
                           ws_bill_addr_sk int,
                           ws_ship_customer_sk int,
                           ws_ship_cdemo_sk int,
                           ws_ship_hdemo_sk int,
                           ws_ship_addr_sk int,
                           ws_web_page_sk int,
                           ws_web_site_sk int,
                           ws_ship_mode_sk int,
                           ws_warehouse_sk int,
                           ws_promo_sk int,
                           ws_order_number int,
                           ws_quantity int,
                           ws_wholesale_cost decimal(7,2),
                           ws_list_price decimal(7,2),
                           ws_sales_price decimal(7,2),
                           ws_ext_discount_amt decimal(7,2),
                           ws_ext_sales_price decimal(7,2),
                           ws_ext_wholesale_cost decimal(7,2),
                           ws_ext_list_price decimal(7,2),
                           ws_ext_tax decimal(7,2),
                           ws_coupon_amt decimal(7,2),
                           ws_ext_ship_cost decimal(7,2),
                           ws_net_paid decimal(7,2),
                           ws_net_paid_inc_tax decimal(7,2),
                           ws_net_paid_inc_ship decimal(7,2),
                           ws_net_paid_inc_ship_tax decimal(7,2),
                           ws_net_profit decimal(7,2)
) with (
        'connector' = 'hudi',
        'path' = 'hdfs://path/to/web_sales',
        'table.type' = 'COPY_ON_WRITE',
        'hoodie.datasource.write.recordkey.field' = 'ws_item_sk,ws_order_number'
      );
{code}
And execute:
{code:sql}
select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
{code}
An exception will occur:
{code:java}
Caused by: java.lang.NullPointerException: left cannot be null
        at java.util.Objects.requireNonNull(Objects.java:228)
        at org.apache.parquet.filter2.predicate.Operators$BinaryLogicalFilterPredicate.<init>(Operators.java:257)
        at org.apache.parquet.filter2.predicate.Operators$And.<init>(Operators.java:301)
        at org.apache.parquet.filter2.predicate.FilterApi.and(FilterApi.java:249)
        at org.apache.hudi.source.ExpressionPredicates$And.filter(ExpressionPredicates.java:551)
        at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
        at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
        at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:68)
        at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:130)
        at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66)
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
{code}
After further investigation: the decimal type is not comparable in the form in which it is stored in Parquet (a fixed-length byte array), so pushing this filter down to a Parquet predicate is not supported (ExpressionPredicates::toParquetPredicate does not provide a decimal conversion). As a result, when the AND filter is constructed, both of its operand filters are null. That is how this issue reproduces.
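The failure mode can be sketched without the Parquet dependency. This is a minimal, illustrative stand-in (not actual Hudi/Parquet code) for Operators$BinaryLogicalFilterPredicate: its constructor rejects null operands, so calling the equivalent of FilterApi.and(null, null) raises exactly the NPE seen in the stack trace:
{code:java}
import java.util.Objects;

public class BinaryLogicalSketch {

    // Stand-in for org.apache.parquet.filter2.predicate.FilterPredicate.
    interface FilterPredicate {}

    // Mimics Operators$BinaryLogicalFilterPredicate / Operators$And:
    // both operands must be non-null.
    static final class And implements FilterPredicate {
        final FilterPredicate left;
        final FilterPredicate right;

        And(FilterPredicate left, FilterPredicate right) {
            this.left = Objects.requireNonNull(left, "left cannot be null");
            this.right = Objects.requireNonNull(right, "right cannot be null");
        }
    }

    // Returns "ok" when construction succeeds, or the NPE message when
    // an operand is null (as happens for an unconvertible DECIMAL filter).
    static String tryAnd(FilterPredicate left, FilterPredicate right) {
        try {
            new And(left, right);
            return "ok";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }
}
{code}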

If we execute this SQL:
{code:sql}
select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
{code}
It works without any problem because the predicates generated by the pushdown process are null, so the Flink engine filters the data instead of Parquet.

To solve this, I plan to add null checks to the construction of both the AND and OR filter predicates. If pushdown is not supported for a field type, the generated filter will be null. Pushdown can then be disabled by not constructing the AND or OR filter if any of its operands is null.
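A minimal sketch of the proposed null checks (class and method names here are assumptions for illustration, not the actual Hudi patch): when an operand predicate is null because toParquetPredicate could not convert its field type, the guarded combinators return null instead of calling FilterApi.and/or, which disables pushdown for that predicate and leaves filtering to Flink:
{code:java}
public class GuardedFilters {

    // Stand-in for org.apache.parquet.filter2.predicate.FilterPredicate.
    interface FilterPredicate {}

    static final class And implements FilterPredicate {
        final FilterPredicate left, right;
        And(FilterPredicate left, FilterPredicate right) {
            this.left = left;
            this.right = right;
        }
    }

    static final class Or implements FilterPredicate {
        final FilterPredicate left, right;
        Or(FilterPredicate left, FilterPredicate right) {
            this.left = left;
            this.right = right;
        }
    }

    static FilterPredicate and(FilterPredicate left, FilterPredicate right) {
        if (left == null || right == null) {
            // An operand (e.g. a DECIMAL comparison) could not be pushed
            // down; drop the whole conjunction instead of throwing an NPE.
            return null;
        }
        return new And(left, right);
    }

    static FilterPredicate or(FilterPredicate left, FilterPredicate right) {
        if (left == null || right == null) {
            // An OR must not silently drop one side, so the whole
            // disjunction is dropped and evaluated by the engine instead.
            return null;
        }
        return new Or(left, right);
    }
}
{code}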



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
