This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 9966c3d298 HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
9966c3d298 is described below

commit 9966c3d2986aeb20fb3698f894c1ca32b390bfa8
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Tue Jun 28 09:45:50 2022 +0200

    HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
---
 data/conf/iceberg/tez/tez-site.xml                 |   8 +-
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  12 +-
 .../queries/positive/dynamic_semijoin_reduction.q  |  38 ++++
 .../positive/dynamic_semijoin_reduction.q.out      | 214 +++++++++++++++++++++
 4 files changed, 266 insertions(+), 6 deletions(-)

diff --git a/data/conf/iceberg/tez/tez-site.xml b/data/conf/iceberg/tez/tez-site.xml
index 7ad5ad4c66..467bfb4dad 100644
--- a/data/conf/iceberg/tez/tez-site.xml
+++ b/data/conf/iceberg/tez/tez-site.xml
@@ -1,8 +1,4 @@
 <configuration>
-  <property>
-    <name>tez.am.resource.memory.mb</name>
-    <value>128</value>
-  </property>
   <property>
     <name>tez.am.dag.scheduler.class</name>
     <value>org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled</value>
@@ -11,4 +7,8 @@
     <name>tez.am.resource.memory.mb</name>
     <value>256</value>
   </property>
+  <property>
+    <name>hive.tez.container.size</name>
+    <value>512</value>
+  </property>
 </configuration>
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 1559325c9a..6101ad159a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -107,8 +107,16 @@ public class HiveIcebergFilterFactory {
         return in(column, leafToLiteralList(leaf));
       case BETWEEN:
         List<Object> icebergLiterals = leafToLiteralList(leaf);
-        return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
-            lessThanOrEqual(column, icebergLiterals.get(1)));
+        if (icebergLiterals.size() == 2) {
+          return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
+              lessThanOrEqual(column, icebergLiterals.get(1)));
+        } else {
+          // In case semijoin reduction optimization was applied, there will be a BETWEEN( DynamicValue, DynamicValue)
+          // clause, where DynamicValue is not evaluable in Tez AM, where Hive filter is translated into Iceberg filter.
+          // Overwriting to constant true as the optimization will be utilized by Hive/Tez and no-op for Iceberg.
+          // (Also: the original filter and Iceberg filter are both part of JobConf on the execution side.)
+          return Expressions.alwaysTrue();
+        }
       case IS_NULL:
         return isNull(column);
       default:
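For readers skimming the patch, the decision above can be read in isolation. Below is a minimal, hedged sketch of the same BETWEEN handling using only Iceberg's public Expressions API; the class and method names (BetweenTranslationSketch, translateBetween) are illustrative and not part of the commit, which keeps this logic inside HiveIcebergFilterFactory as shown in the diff.

// Illustrative sketch only, not the committed code: how a Hive BETWEEN leaf maps to an
// Iceberg expression. With two concrete bounds we can build the equivalent range filter;
// when the bounds were left as DynamicValue placeholders by semijoin reduction (and are
// therefore not evaluable where the Iceberg filter is built), fall back to a no-op filter
// and let Hive/Tez apply the runtime filter on the execution side.
import java.util.List;

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class BetweenTranslationSketch {  // hypothetical class, for illustration
  static Expression translateBetween(String column, List<Object> literals) {
    if (literals.size() == 2) {
      // Both bounds are known constants: column >= lower AND column <= upper.
      return Expressions.and(
          Expressions.greaterThanOrEqual(column, literals.get(0)),
          Expressions.lessThanOrEqual(column, literals.get(1)));
    }
    // Bounds are not constants at planning time: keep every file/row on the Iceberg side.
    return Expressions.alwaysTrue();
  }
}

Returning Expressions.alwaysTrue() only widens the Iceberg scan; as the in-code comment notes, the original Hive filter (the min/max BETWEEN and the bloom filter over DynamicValue) is still carried in the JobConf and applied on the execution side, so correctness is preserved while the runtime-filtering benefit is delivered by Hive/Tez rather than by Iceberg.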
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
new file mode 100644
index 0000000000..035b5525f8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
@@ -0,0 +1,38 @@
+--! qt:dataset:srcpart
+--! qt:dataset:alltypesorc
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.stats.autogather=true;
+set hive.tez.bigtable.minsize.semijoin.reduction=1;
+set hive.tez.min.bloom.filter.entries=1;
+set hive.stats.fetch.column.stats=true;
+set hive.tez.bloom.filter.factor=1.0f;
+
+-- Create Tables
+create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg;
+CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg;
+
+-- Add Partitions
+--alter table srcpart_date_n7 add partition (ds = "2008-04-08");
+--alter table srcpart_date_n7 add partition (ds = "2008-04-09");
+
+--alter table srcpart_small_n3 add partition (ds = "2008-04-08");
+--alter table srcpart_small_n3 add partition (ds = "2008-04-09");
+
+-- Load
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08";
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09";
+insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20;
+
+EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+
+drop table srcpart_date_n7;
+drop table srcpart_small_n3;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
new file mode 100644
index 0000000000..ef5effcb09
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
@@ -0,0 +1,214 @@
+PREHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string ) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date_n7
+                  filterExpr: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized
+        Map 4
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_small_n3
+                  filterExpr: key1 is not null (type: boolean)
+                  Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key1 is not null (type: boolean)
+                    Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=20)
+                          minReductionHashAggr: 0.95
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            null sort order:
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: vectorized
+        Reducer 2
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                Statistics: Num rows: 126 Data size: 1008 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: count()
+                  minReductionHashAggr: 0.99
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    null sort order:
+                    sort order:
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=20)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  null sort order:
+                  sort order:
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+176
+PREHOOK: query: drop table srcpart_date_n7
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: drop table srcpart_date_n7
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: drop table srcpart_small_n3
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: drop table srcpart_small_n3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: default@srcpart_small_n3