[ https://issues.apache.org/jira/browse/HUDI-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ethan Guo updated HUDI-7309:
----------------------------
    Fix Version/s: 0.15.0

> Disable filter pushing down when the parquet type corresponding to its field
> logical type is not comparable
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7309
>                 URL: https://issues.apache.org/jira/browse/HUDI-7309
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.14.0, 0.14.1
>         Environment: Hudi 0.14.0
>                      Hudi 0.14.1rc1
>                      Flink 1.17.1
>            Reporter: Yao Zhang
>            Assignee: Yao Zhang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.15.0, 1.0.0
>
>
> Given the table web_sales from TPC-DS:
> {code:sql}
> CREATE TABLE web_sales (
>   ws_sold_date_sk int,
>   ws_sold_time_sk int,
>   ws_ship_date_sk int,
>   ws_item_sk int,
>   ws_bill_customer_sk int,
>   ws_bill_cdemo_sk int,
>   ws_bill_hdemo_sk int,
>   ws_bill_addr_sk int,
>   ws_ship_customer_sk int,
>   ws_ship_cdemo_sk int,
>   ws_ship_hdemo_sk int,
>   ws_ship_addr_sk int,
>   ws_web_page_sk int,
>   ws_web_site_sk int,
>   ws_ship_mode_sk int,
>   ws_warehouse_sk int,
>   ws_promo_sk int,
>   ws_order_number int,
>   ws_quantity int,
>   ws_wholesale_cost decimal(7,2),
>   ws_list_price decimal(7,2),
>   ws_sales_price decimal(7,2),
>   ws_ext_discount_amt decimal(7,2),
>   ws_ext_sales_price decimal(7,2),
>   ws_ext_wholesale_cost decimal(7,2),
>   ws_ext_list_price decimal(7,2),
>   ws_ext_tax decimal(7,2),
>   ws_coupon_amt decimal(7,2),
>   ws_ext_ship_cost decimal(7,2),
>   ws_net_paid decimal(7,2),
>   ws_net_paid_inc_tax decimal(7,2),
>   ws_net_paid_inc_ship decimal(7,2),
>   ws_net_paid_inc_ship_tax decimal(7,2),
>   ws_net_profit decimal(7,2)
> ) with (
>   'connector' = 'hudi',
>   'path' = 'hdfs://path/to/web_sales',
>   'table.type' = 'COPY_ON_WRITE',
>   'hoodie.datasource.write.recordkey.field' = 'ws_item_sk,ws_order_number'
> );
> {code}
> And execute:
> {code:sql}
> select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
> {code}
> An exception will occur:
> {code:java}
> Caused by: java.lang.NullPointerException: left cannot be null
>     at java.util.Objects.requireNonNull(Objects.java:228)
>     at org.apache.parquet.filter2.predicate.Operators$BinaryLogicalFilterPredicate.<init>(Operators.java:257)
>     at org.apache.parquet.filter2.predicate.Operators$And.<init>(Operators.java:301)
>     at org.apache.parquet.filter2.predicate.FilterApi.and(FilterApi.java:249)
>     at org.apache.hudi.source.ExpressionPredicates$And.filter(ExpressionPredicates.java:551)
>     at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
>     at org.apache.hudi.source.ExpressionPredicates$Or.filter(ExpressionPredicates.java:589)
>     at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:68)
>     at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:130)
>     at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.open(CopyOnWriteInputFormat.java:66)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
> {code}
> After further investigation: the decimal type is not comparable in the form in which it is stored in Parquet (a fixed-length byte array), and pushing such a filter down to Parquet predicates is not supported (ExpressionPredicates::toParquetPredicate provides no decimal conversion). When the AND filter is then constructed, both of its operand filters are null. That is how this issue reproduces.
> If we execute this SQL:
> {code:sql}
> select * from web_sales where ws_sold_date_sk = 2451268 and ws_sales_price between 100.00 and 150.00
> {code}
> it works without any problem, because the predicates generated by the pushdown process are null, so the Flink engine filters the data instead of Parquet.
> To solve this, I plan to add null checks to the construction of both the AND and OR filter predicates. If pushdown is not supported for a field's type, the generated filter will be null; pushdown can then be disabled by not constructing the AND or OR filter if either of its operands is null.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
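The failure mode and the proposed null-check guard can be sketched in a few lines of plain Java. This is a minimal, self-contained illustration only: the `FilterPredicate` and `And` types below are simplified stand-ins (not Hudi's `ExpressionPredicates` or Parquet's `Operators` classes), and `guardedAnd` is a hypothetical name for the guard described in the issue. Only the null-handling logic is the point.

```java
import java.util.Objects;

public class PushDownSketch {

    // Simplified stand-in for a parquet filter predicate.
    interface FilterPredicate {}

    // Mirrors the constraint in parquet's Operators$BinaryLogicalFilterPredicate:
    // both operands of a logical filter must be non-null.
    static final class And implements FilterPredicate {
        final FilterPredicate left;
        final FilterPredicate right;

        And(FilterPredicate left, FilterPredicate right) {
            this.left = Objects.requireNonNull(left, "left cannot be null");
            this.right = Objects.requireNonNull(right, "right cannot be null");
        }
    }

    // Current behavior: an unsupported logical type (e.g. DECIMAL stored as a
    // fixed-length byte array) converts to a null predicate, and constructing
    // AND over a null operand throws the NPE seen in the stack trace.
    static FilterPredicate unguardedAnd(FilterPredicate l, FilterPredicate r) {
        return new And(l, r);
    }

    // Proposed guard: if either operand failed to convert, return null,
    // i.e. skip pushdown entirely and let the Flink engine filter the rows.
    static FilterPredicate guardedAnd(FilterPredicate l, FilterPredicate r) {
        if (l == null || r == null) {
            return null;
        }
        return new And(l, r);
    }

    public static void main(String[] args) {
        FilterPredicate decimalPredicate = null; // conversion unsupported
        FilterPredicate intPredicate = new FilterPredicate() {};

        // Guarded construction disables pushdown instead of throwing.
        System.out.println(guardedAnd(decimalPredicate, intPredicate) == null); // true

        // Unguarded construction reproduces the reported NPE.
        boolean threw = false;
        try {
            unguardedAnd(decimalPredicate, intPredicate);
        } catch (NullPointerException e) {
            threw = true; // "left cannot be null"
        }
        System.out.println(threw); // true
    }
}
```

The same guard applies symmetrically to the OR predicate: a null on either side means the combined filter cannot be pushed down, and returning null propagates that decision upward so the engine-side filter takes over.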