[ 
https://issues.apache.org/jira/browse/HUDI-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7309:
----------------------------
    Fix Version/s: 0.15.0

> Disable filter pushing down when the parquet type corresponding to its field 
> logical type is not comparable
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7309
>                 URL: https://issues.apache.org/jira/browse/HUDI-7309
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.14.0, 0.14.1
>         Environment: Hudi 0.14.0
> Hudi 0.14.1rc1
> Flink 1.17.1
>            Reporter: Yao Zhang
>            Assignee: Yao Zhang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.15.0, 1.0.0
>
>
> Given the table web_sales from TPC-DS:
> {code:sql}
> CREATE TABLE web_sales (
>                            ws_sold_date_sk int,
>                            ws_sold_time_sk int,
>                            ws_ship_date_sk int,
>                            ws_item_sk int,
>                            ws_bill_customer_sk int,
>                            ws_bill_cdemo_sk int,
>                            ws_bill_hdemo_sk int,
>                            ws_bill_addr_sk int,
>                            ws_ship_customer_sk int,
>                            ws_ship_cdemo_sk int,
>                            ws_ship_hdemo_sk int,
>                            ws_ship_addr_sk int,
>                            ws_web_page_sk int,
>                            ws_web_site_sk int,
>                            ws_ship_mode_sk int,
>                            ws_warehouse_sk int,
>                            ws_promo_sk int,
>                            ws_order_number int,
>                            ws_quantity int,
>                            ws_wholesale_cost decimal(7,2),
>                            ws_list_price decimal(7,2),
>                            ws_sales_price decimal(7,2),
>                            ws_ext_discount_amt decimal(7,2),
>                            ws_ext_sales_price decimal(7,2),
>                            ws_ext_wholesale_cost decimal(7,2),
>                            ws_ext_list_price decimal(7,2),
>                            ws_ext_tax decimal(7,2),
>                            ws_coupon_amt decimal(7,2),
>                            ws_ext_ship_cost decimal(7,2),
>                            ws_net_paid decimal(7,2),
>                            ws_net_paid_inc_tax decimal(7,2),
>                            ws_net_paid_inc_ship decimal(7,2),
>                            ws_net_paid_inc_ship_tax decimal(7,2),
>                            ws_net_profit decimal(7,2)
> ) with (
>         'connector' = 'hudi',
>         'path' = 'hdfs://path/to/web_sales',
>         'table.type' = 'COPY_ON_WRITE',
>         'hoodie.datasource.write.recordkey.field' = 'ws_item_sk,ws_order_number'
>       );
> {code}
> And execute:
> {code:sql}
> select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
> {code}
> An exception will occur:
> {code:java}
> Caused by: java.lang.NullPointerException: left cannot be null
>         at java.util.Objects.requireNonNull(Objects.java:228)
>         at org.apache.parquet.filter2.predicate.Operators$BinaryLogicalFilterPredicate.<init>(Operators.java:257)
>         at org.apache.parquet.filter2.predicate.Operators$And.<init>(Operators.java:301)
>         at org.apache.parquet.filter2.predicate.FilterApi.and(FilterApi.java:249)
>         at org.apache.hudi.source.ExpressionPredicates$And.filter(ExpressionPredicates.java:551)
>         at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
>         at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
>         at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:68)
>         at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:130)
>         at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66)
>         at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
> {code}
> After further investigation: the decimal type is not comparable in the form in 
> which it is stored in the Parquet format (a fixed-length byte array). Pushing 
> such a filter down to Parquet predicates is not supported 
> (ExpressionPredicates::toParquetPredicate does not provide a decimal 
> conversion), so when the AND filter is constructed, both of its operand 
> filters are null. That is how this issue reproduces.
> If we execute this SQL:
> {code:sql}
> select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
> {code}
> It works without any problems, as the predicates generated by the push-down 
> process are null; the Flink engine then filters the data instead of Parquet.
> To solve this, I plan to add null checks to the construction of both the AND 
> and the OR filter predicates. If push-down is not supported for a field's 
> type, the generated filter will be null, so push-down can be disabled by not 
> constructing the AND or OR filter when either of its operands is null.
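> A minimal standalone sketch of that null check. The FilterPredicate, And, and 
> Or types below are hypothetical stand-ins (the real code uses 
> org.apache.parquet.filter2.predicate.FilterPredicate and FilterApi); only the 
> null-check behavior is the point:
> {code:java}
// Sketch of the proposed fix: skip AND/OR push-down when an operand
// predicate is null (i.e. its field type has no Parquet conversion).
public class PredicateNullCheckSketch {
  // Hypothetical stand-in for Parquet's FilterPredicate.
  interface FilterPredicate {}
  static class And implements FilterPredicate {
    final FilterPredicate left, right;
    And(FilterPredicate l, FilterPredicate r) { left = l; right = r; }
  }
  static class Or implements FilterPredicate {
    final FilterPredicate left, right;
    Or(FilterPredicate l, FilterPredicate r) { left = l; right = r; }
  }

  // Return null (no push-down) instead of letting Parquet's FilterApi
  // throw "NullPointerException: left cannot be null" on a null operand.
  static FilterPredicate and(FilterPredicate l, FilterPredicate r) {
    return (l == null || r == null) ? null : new And(l, r);
  }

  static FilterPredicate or(FilterPredicate l, FilterPredicate r) {
    return (l == null || r == null) ? null : new Or(l, r);
  }

  public static void main(String[] args) {
    FilterPredicate intEq = new FilterPredicate() {};  // e.g. ws_sold_date_sk = 2451268
    FilterPredicate decimalRange = null;               // decimal push-down unsupported
    // The combined filter is dropped, and Flink filters the rows itself.
    System.out.println(and(intEq, decimalRange) == null ? "push-down disabled" : "push-down");
  }
}
> {code}
> With this check, an unsupported operand degrades the whole conjunction or 
> disjunction to "no push-down" rather than an NPE, which is safe because Flink 
> still applies the full filter on the rows it reads.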



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
