This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9966c3d298 HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
9966c3d298 is described below

commit 9966c3d2986aeb20fb3698f894c1ca32b390bfa8
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Tue Jun 28 09:45:50 2022 +0200

    HIVE-26128: Enabling dynamic runtime filtering in Iceberg tables throws exception at runtime (#3404) (Adam Szita, reviewed by Peter Vary)
---
 data/conf/iceberg/tez/tez-site.xml                 |   8 +-
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  12 +-
 .../queries/positive/dynamic_semijoin_reduction.q  |  38 ++++
 .../positive/dynamic_semijoin_reduction.q.out      | 214 +++++++++++++++++++++
 4 files changed, 266 insertions(+), 6 deletions(-)

diff --git a/data/conf/iceberg/tez/tez-site.xml b/data/conf/iceberg/tez/tez-site.xml
index 7ad5ad4c66..467bfb4dad 100644
--- a/data/conf/iceberg/tez/tez-site.xml
+++ b/data/conf/iceberg/tez/tez-site.xml
@@ -1,8 +1,4 @@
 <configuration>
-  <property>
-    <name>tez.am.resource.memory.mb</name>
-    <value>128</value>
-  </property>
   <property>
    <name>tez.am.dag.scheduler.class</name>
    <value>org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled</value>
@@ -11,4 +7,8 @@
     <name>tez.am.resource.memory.mb</name>
     <value>256</value>
   </property>
+  <property>
+    <name>hive.tez.container.size</name>
+    <value>512</value>
+  </property>
 </configuration>
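
For reference, this is the test tez-site.xml as it stands after the two hunks above (reconstructed from this diff, not copied from the repository). The old file declared tez.am.resource.memory.mb twice (128 and 256); the change drops the stale 128 MB duplicate and adds hive.tez.container.size so task containers get 512 MB, presumably to give the new semijoin-reduction test below enough memory:

    <configuration>
      <property>
        <name>tez.am.dag.scheduler.class</name>
        <value>org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled</value>
      </property>
      <property>
        <name>tez.am.resource.memory.mb</name>
        <value>256</value>
      </property>
      <property>
        <name>hive.tez.container.size</name>
        <value>512</value>
      </property>
    </configuration>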
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 1559325c9a..6101ad159a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -107,8 +107,16 @@ public class HiveIcebergFilterFactory {
         return in(column, leafToLiteralList(leaf));
       case BETWEEN:
         List<Object> icebergLiterals = leafToLiteralList(leaf);
-        return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
-                lessThanOrEqual(column, icebergLiterals.get(1)));
+        if (icebergLiterals.size() == 2) {
+          return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
+              lessThanOrEqual(column, icebergLiterals.get(1)));
+        } else {
+          // In case the semijoin reduction optimization was applied, there will be a BETWEEN(DynamicValue, DynamicValue)
+          // clause, where DynamicValue cannot be evaluated in the Tez AM, which is where the Hive filter is translated into an Iceberg filter.
+          // We overwrite it with constant true: the optimization will be utilized by Hive/Tez, and it is a no-op for Iceberg.
+          // (Also: the original filter and the Iceberg filter are both part of the JobConf on the execution side.)
+          return Expressions.alwaysTrue();
+        }
       case IS_NULL:
         return isNull(column);
       default:
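
To make the two code paths concrete, here is a minimal, hypothetical sketch (not part of the patch: the class name, column name, and bounds are invented for illustration, and it goes through generateFilterExpression, the factory's public entry point in iceberg-mr):

    // BetweenTranslationSketch.java - illustration only, not committed code.
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.mr.hive.HiveIcebergFilterFactory;

    public class BetweenTranslationSketch {
      public static void main(String[] args) {
        // Ordinary BETWEEN with constant bounds: leafToLiteralList() yields two
        // evaluable literals, so the factory still produces
        // and(greaterThanOrEqual(key, "0"), lessThanOrEqual(key, "9")).
        SearchArgument constantBetween = SearchArgumentFactory.newBuilder()
            .startAnd()
            .between("key", PredicateLeaf.Type.STRING, "0", "9")
            .end()
            .build();
        Expression range = HiveIcebergFilterFactory.generateFilterExpression(constantBetween);
        System.out.println(range);

        // When semijoin reduction rewrites the predicate to
        // BETWEEN(DynamicValue, DynamicValue), the bounds cannot be evaluated in
        // the Tez AM, fewer than two literals come back, and the factory now
        // returns Expressions.alwaysTrue() instead of hitting an out-of-bounds
        // literal lookup. Hive/Tez still applies the runtime filter on the
        // execution side, where the original filter is carried in the JobConf.
      }
    }

The fallback is safe because the Iceberg filter is only a scan-planning optimization; correctness is preserved by the Hive-side filter evaluated at execution time.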
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
new file mode 100644
index 0000000000..035b5525f8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_semijoin_reduction.q
@@ -0,0 +1,38 @@
+--! qt:dataset:srcpart
+--! qt:dataset:alltypesorc
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.stats.autogather=true;
+set hive.tez.bigtable.minsize.semijoin.reduction=1;
+set hive.tez.min.bloom.filter.entries=1;
+set hive.stats.fetch.column.stats=true;
+set hive.tez.bloom.filter.factor=1.0f;
+
+-- Create Tables
+create table srcpart_date_n7 (key string, value string) partitioned by (ds string) stored by iceberg;
+CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg;
+
+-- Add Partitions
+--alter table srcpart_date_n7 add partition (ds = "2008-04-08");
+--alter table srcpart_date_n7 add partition (ds = "2008-04-09");
+
+--alter table srcpart_small_n3 add partition (ds = "2008-04-08");
+--alter table srcpart_small_n3 add partition (ds = "2008-04-09");
+
+-- Load
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08";
+insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09";
+insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20;
+
+EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1);
+
+drop table srcpart_date_n7;
+drop table srcpart_small_n3;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
new file mode 100644
index 0000000000..ef5effcb09
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_semijoin_reduction.q.out
@@ -0,0 +1,214 @@
+PREHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: create table srcpart_date_n7 (key string, value string) partitioned by (ds string) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: CREATE TABLE srcpart_small_n3(key1 STRING, value1 STRING) partitioned by (ds string) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-08"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: insert overwrite table srcpart_date_n7 select key, value, ds from srcpart where ds = "2008-04-09"
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: insert overwrite table srcpart_small_n3 select key, value, ds from srcpart where ds = "2008-04-09" limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+POSTHOOK: Output: default@srcpart_small_n3
+PREHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_date_n7
+                  filterExpr: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                  Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and key BETWEEN DynamicValue(RS_7_srcpart_small_n3_key1_min) AND DynamicValue(RS_7_srcpart_small_n3_key1_max) and in_bloom_filter(key, DynamicValue(RS_7_srcpart_small_n3_key1_bloom_filter))) (type: boolean)
+                    Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart_small_n3
+                  filterExpr: key1 is not null (type: boolean)
+                  Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key1 is not null (type: boolean)
+                    Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key1 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 20 Data size: 1740 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=20)
+                          minReductionHashAggr: 0.95
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            null sort order: 
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+            Execution mode: vectorized
+        Reducer 2 
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                Statistics: Num rows: 126 Data size: 1008 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  aggregations: count()
+                  minReductionHashAggr: 0.99
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    null sort order: 
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, expectedEntries=20)
+                mode: final
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  null sort order: 
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: binary)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from srcpart_date_n7 join srcpart_small_n3 on (srcpart_date_n7.key = srcpart_small_n3.key1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+176
+PREHOOK: query: drop table srcpart_date_n7
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_date_n7
+PREHOOK: Output: default@srcpart_date_n7
+POSTHOOK: query: drop table srcpart_date_n7
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_date_n7
+POSTHOOK: Output: default@srcpart_date_n7
+PREHOOK: query: drop table srcpart_small_n3
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@srcpart_small_n3
+PREHOOK: Output: default@srcpart_small_n3
+POSTHOOK: query: drop table srcpart_small_n3
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@srcpart_small_n3
+POSTHOOK: Output: default@srcpart_small_n3
