This is an automated email from the ASF dual-hosted git repository. szita pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 62668e5 HIVE-25955: Partitioned tables migrated to Iceberg aren't cached in LLAP (#3026) (Adam Szita, reviewed by Peter Vary) 62668e5 is described below commit 62668e5b5b65e92f87e8f0188621d21fe1a98426 Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Tue Feb 15 08:49:58 2022 +0100 HIVE-25955: Partitioned tables migrated to Iceberg aren't cached in LLAP (#3026) (Adam Szita, reviewed by Peter Vary) --- .../apache/iceberg/orc/VectorizedReadUtils.java | 16 ++++++- .../test/queries/positive/llap_iceberg_read_orc.q | 18 +++++++- .../positive/llap/llap_iceberg_read_orc.q.out | 52 ++++++++++++++++++++++ .../hive/llap/io/api/impl/LlapRecordReader.java | 2 + 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java index 287dd04..2f6b3ab 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java @@ -21,6 +21,7 @@ package org.apache.iceberg.orc; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.conf.HiveConf; @@ -37,6 +38,7 @@ import org.apache.hive.iceberg.org.apache.orc.TypeDescription; import org.apache.hive.iceberg.org.apache.orc.impl.OrcTail; import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.Expression; @@ -134,15 +136,25 @@ public class VectorizedReadUtils { // We need to map with the current (i.e. current Hive table columns) full schema (without projections), // as OrcInputFormat will take care of the projections by the use of an include boolean array - Schema currentSchema = task.spec().schema(); + PartitionSpec spec = task.spec(); + Schema currentSchema = spec.schema(); TypeDescription readOrcSchema; if (ORCSchemaUtil.hasIds(fileSchema)) { readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, fileSchema); } else { + Schema readSchemaForOriginalFile = currentSchema; + // In case of migrated, originally partitioned tables, partition values are not present in the file + if (spec.isPartitioned()) { + readSchemaForOriginalFile = currentSchema.select(currentSchema.columns().stream() + .filter(c -> !spec.identitySourceIds().contains(c.fieldId())) + .map(c -> c.name()) + .collect(Collectors.toList())); + } + TypeDescription typeWithIds = ORCSchemaUtil.applyNameMapping(fileSchema, MappingUtil.create(currentSchema)); - readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, typeWithIds); + readOrcSchema = ORCSchemaUtil.buildOrcProjection(readSchemaForOriginalFile, typeWithIds); } job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString()); diff --git a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q index c692dd4..b171157 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q @@ -4,6 +4,7 @@ set hive.vectorized.execution.enabled=true; DROP TABLE IF EXISTS llap_orders PURGE; DROP TABLE IF EXISTS llap_items PURGE; +DROP TABLE IF EXISTS mig_source PURGE; CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC; @@ -103,4 +104,19 @@ SELECT state, max(city), avg(itemid) from llap_orders WHERE region = 'EU' GROUP --some more projections SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items i JOIN llap_orders o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name; -SELECT i.name, i.description, SUM(o.quantity) FROM llap_items i JOIN llap_orders o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description; \ No newline at end of file +SELECT i.name, i.description, SUM(o.quantity) FROM llap_items i JOIN llap_orders o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description; + +--------------------------------------------- +--Test migrated partitioned table gets cached + +CREATE EXTERNAL TABLE mig_source (id int) partitioned by (region string) stored as ORC; +INSERT INTO mig_source VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US'); +ALTER TABLE mig_source SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'); + +-- Should miss, but fill cache +SELECT region, SUM(id) from mig_source GROUP BY region; + +-- Should hit cache +set hive.llap.io.cache.only=true; +SELECT region, SUM(id) from mig_source GROUP BY region; +set hive.llap.io.cache.only=false; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out index 23392e3..585d8a0 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out @@ -6,6 +6,10 @@ PREHOOK: query: DROP TABLE IF EXISTS llap_items PURGE PREHOOK: type: DROPTABLE POSTHOOK: query: DROP TABLE IF EXISTS llap_items PURGE POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS mig_source PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS mig_source PURGE +POSTHOOK: type: DROPTABLE PREHOOK: query: CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC PREHOOK: type: CREATETABLE PREHOOK: Output: database:default @@ -411,3 +415,51 @@ Model 3 Performance 42 Model S Long range 389 Model S Plaid 221 Model Y Performance 163 +PREHOOK: query: CREATE EXTERNAL TABLE mig_source (id int) partitioned by (region string) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@mig_source +POSTHOOK: query: CREATE EXTERNAL TABLE mig_source (id int) partitioned by (region string) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@mig_source +PREHOOK: query: INSERT INTO mig_source VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@mig_source +POSTHOOK: query: INSERT INTO mig_source VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@mig_source +POSTHOOK: Output: default@mig_source@region=EU +POSTHOOK: Output: default@mig_source@region=US +POSTHOOK: Lineage: mig_source PARTITION(region=EU).id SCRIPT [] +POSTHOOK: Lineage: mig_source PARTITION(region=US).id SCRIPT [] +PREHOOK: query: ALTER TABLE mig_source SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@mig_source +PREHOOK: Output: default@mig_source +POSTHOOK: query: ALTER TABLE mig_source SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@mig_source +POSTHOOK: Output: default@mig_source +PREHOOK: query: SELECT region, SUM(id) from mig_source GROUP BY region +PREHOOK: type: QUERY +PREHOOK: Input: default@mig_source +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, SUM(id) from mig_source GROUP BY region +POSTHOOK: type: QUERY +POSTHOOK: Input: default@mig_source +#### A masked pattern was here #### +EU 6 +US 3 +PREHOOK: query: SELECT region, SUM(id) from mig_source GROUP BY region +PREHOOK: type: QUERY +PREHOOK: Input: default@mig_source +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, SUM(id) from mig_source GROUP BY region +POSTHOOK: type: QUERY +POSTHOOK: Input: default@mig_source +#### A masked pattern was here #### +EU 6 +US 3 diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index c2005d6..315752d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -84,6 +84,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead; class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> { @@ -135,6 +136,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf); if (!rr.checkOrcSchemaEvolution()) { rr.close(); + throwIfCacheOnlyRead(HiveConf.getBoolVar(job, ConfVars.LLAP_IO_CACHE_ONLY)); return null; } return rr;