This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new fa111f15008 HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter) fa111f15008 is described below commit fa111f150086f0069e3a6a5b8e6f34a4afae3b1b Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Mon Aug 8 14:42:16 2022 +0200 HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter) --- .../mr/hive/vector/HiveVectorizedReader.java | 22 +- .../parquet}/ParquetFooterInputFromCache.java | 122 +++--- .../queries/positive/llap_iceberg_read_parquet.q | 122 ++++++ .../positive/llap/llap_iceberg_read_parquet.q.out | 465 +++++++++++++++++++++ .../test/resources/testconfiguration.properties | 4 +- .../org/apache/hadoop/hive/llap/io/api/LlapIo.java | 17 + .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 58 +++ .../hive/llap/io/encoded/LlapOrcCacheLoader.java | 4 +- .../hive/llap/io/encoded/OrcEncodedDataReader.java | 18 +- .../llap/io/encoded/SerDeEncodedDataReader.java | 11 +- .../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 30 +- .../apache/hadoop/hive/ql/io/SyntheticFileId.java | 29 ++ .../vector/ParquetFooterInputFromCache.java | 4 +- .../vector/VectorizedParquetRecordReader.java | 57 +-- 14 files changed, 839 insertions(+), 124 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java index 00b9b3c73f0..6e23dff92c6 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -42,7 +43,9 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hive.iceberg.org.apache.orc.OrcConf; +import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; @@ -53,6 +56,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; import org.apache.iceberg.orc.VectorizedReadUtils; +import org.apache.iceberg.parquet.ParquetFooterInputFromCache; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -125,6 +129,7 @@ public class HiveVectorizedReader { // TODO: Iceberg currently does not track the last modification time of a file. Until that's added, // we need to set Long.MIN_VALUE as last modification time in the fileId triplet. 
SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE); + fileId.toJobConf(job); RecordReader<NullWritable, VectorizedRowBatch> recordReader = null; switch (format) { @@ -133,7 +138,7 @@ public class HiveVectorizedReader { break; case PARQUET: - recordReader = parquetRecordReader(job, reporter, task, path, start, length); + recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId); break; default: throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format); @@ -182,11 +187,22 @@ public class HiveVectorizedReader { } private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter, - FileScanTask task, Path path, long start, long length) throws IOException { + FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException { InputSplit split = new FileSplit(path, start, length, job); VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat(); - MessageType fileSchema = ParquetFileReader.readFooter(job, path).getFileMetaData().getSchema(); + MemoryBufferOrBuffers footerData = null; + if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) && + LlapProxy.getIo() != null) { + LlapProxy.getIo().initCacheOnlyInputFormat(inputFormat); + footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(path, job, fileId); + } + + ParquetMetadata parquetMetadata = footerData != null ? + ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) : + ParquetFileReader.readFooter(job, path); + + MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema(); MessageType typeWithIds = null; Schema expectedSchema = task.spec().schema(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java similarity index 59% copy from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java index e2e60670cae..bd4bd74106f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java @@ -1,42 +1,52 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hadoop.hive.ql.io.parquet.vector; +package org.apache.iceberg.parquet; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; - import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.io.InputFile; -import org.apache.parquet.io.SeekableInputStream; +import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.hive.iceberg.org.apache.parquet.io.InputFile; +import org.apache.hive.iceberg.org.apache.parquet.io.SeekableInputStream; /** + * Copy of ParquetFooterInputFromCache from hive-exec module to switch dependent Parquet packages + * to the shaded version (org.apache.hive.iceberg.org.apache.parquet.io...) + * * The Parquet InputFile implementation that allows the reader to * read the footer from cache without being aware of the latter. * This implements both InputFile and the InputStream that the reader gets from InputFile. */ -final class ParquetFooterInputFromCache +public final class ParquetFooterInputFromCache extends SeekableInputStream implements InputFile { - final static int FOOTER_LENGTH_SIZE = 4; // For the file size check. + public static final int FOOTER_LENGTH_SIZE = 4; // For the file size check. private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE; private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length; - private final int length, footerLength; - private int position = 0, bufferIx = 0, bufferPos = 0; + private final int length; + private final int footerLength; + private int position = 0; + private int bufferIx = 0; + private int bufferPos = 0; private final MemoryBuffer[] cacheData; private final int[] positions; @@ -50,18 +60,18 @@ final class ParquetFooterInputFromCache cacheData = new MemoryBuffer[bufs.length + 1]; System.arraycopy(bufs, 0, cacheData, 0, bufs.length); } - int footerLength = 0; + int footerLen = 0; positions = new int[cacheData.length]; for (int i = 0; i < cacheData.length - 1; ++i) { - positions[i] = footerLength; + positions[i] = footerLen; int dataLen = cacheData[i].getByteBufferRaw().remaining(); assert dataLen > 0; - footerLength += dataLen; + footerLen += dataLen; } - positions[cacheData.length - 1] = footerLength; - cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength); - this.footerLength = footerLength; - this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH; + positions[cacheData.length - 1] = footerLen; + cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLen); + this.footerLength = footerLen; + this.length = footerLen + FAKE_PREFIX_LENGTH + TAIL_LENGTH; } @Override @@ -71,7 +81,7 @@ final class ParquetFooterInputFromCache @Override public SeekableInputStream newStream() throws IOException { - // Note: this doesn't maintain proper newStream semantics (if any). + // Note: this doesn't maintain proper newStream semantics (if any). 
// We could either clone this instead or enforce that this is only called once. return this; } @@ -82,9 +92,9 @@ final class ParquetFooterInputFromCache } @Override - public void seek(long targetPos) throws IOException { - this.position = (int)targetPos; - targetPos -= FAKE_PREFIX_LENGTH; + public void seek(long pos) throws IOException { + this.position = (int) pos; + long targetPos = pos - FAKE_PREFIX_LENGTH; // Not efficient, but we don't expect this to be called frequently. for (int i = 1; i <= positions.length; ++i) { int endPos = (i == positions.length) ? (length - FAKE_PREFIX_LENGTH) : positions[i]; @@ -94,25 +104,32 @@ final class ParquetFooterInputFromCache return; } } - throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength - + Arrays.toString(positions)); + throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength + + Arrays.toString(positions)); } @Override public void readFully(byte[] b, int offset, int len) throws IOException { - if (readInternal(b, offset, len) == len) return; + if (readInternal(b, offset, len) == len) { + return; + } throw new EOFException(); } - public int readInternal(byte[] b, int offset, int len) { - if (position >= length) return -1; - int argPos = offset, argEnd = offset + len; + public int readInternal(byte[] bytes, int offset, int len) { + if (position >= length) { + return -1; + } + int argPos = offset; + int argEnd = offset + len; while (argPos < argEnd) { - if (bufferIx == cacheData.length) return (argPos - offset); + if (bufferIx == cacheData.length) { + return argPos - offset; + } ByteBuffer data = cacheData[bufferIx].getByteBufferDup(); int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos); data.position(data.position() + bufferPos); - data.get(b, argPos, toConsume); + data.get(bytes, argPos, toConsume); if (data.remaining() == 0) { ++bufferIx; bufferPos = 0; @@ -126,7 +143,9 @@ final class ParquetFooterInputFromCache @Override public int read() throws IOException { - if (position >= length) return -1; + if (position >= length) { + return -1; + } ++position; ByteBuffer data = cacheData[bufferIx].getByteBufferRaw(); int bp = bufferPos; @@ -148,9 +167,9 @@ final class ParquetFooterInputFromCache bb.position(bb.position() + result); } } else { - byte[] b = new byte[bb.remaining()]; - result = readInternal(b, 0, bb.remaining()); - bb.put(b, 0, result); + byte[] bytes = new byte[bb.remaining()]; + result = readInternal(bytes, 0, bb.remaining()); + bb.put(bytes, 0, result); } return result; } @@ -169,18 +188,19 @@ final class ParquetFooterInputFromCache * The fake buffer that emulates end of file, with footer length and magic. Given that these * can be generated based on the footer buffer itself, we don't cache them. 
*/ - private final static class FooterEndBuffer implements MemoryBuffer { + private static final class FooterEndBuffer implements MemoryBuffer { private final ByteBuffer bb; - public FooterEndBuffer(int footerLength) { - byte[] b = new byte[8]; - b[0] = (byte) ((footerLength >>> 0) & 0xFF); - b[1] = (byte) ((footerLength >>> 8) & 0xFF); - b[2] = (byte) ((footerLength >>> 16) & 0xFF); - b[3] = (byte) ((footerLength >>> 24) & 0xFF); + + FooterEndBuffer(int footerLength) { + byte[] bytes = new byte[8]; + bytes[0] = (byte) ((footerLength >>> 0) & 0xFF); + bytes[1] = (byte) ((footerLength >>> 8) & 0xFF); + bytes[2] = (byte) ((footerLength >>> 16) & 0xFF); + bytes[3] = (byte) ((footerLength >>> 24) & 0xFF); for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) { - b[4 + i] = ParquetFileWriter.MAGIC[i]; + bytes[4 + i] = ParquetFileWriter.MAGIC[i]; } - bb = ByteBuffer.wrap(b); + bb = ByteBuffer.wrap(bytes); } @Override @@ -193,4 +213,4 @@ final class ParquetFooterInputFromCache return bb.duplicate(); } } -} \ No newline at end of file +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q new file mode 100644 index 00000000000..d961dc74d6b --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q @@ -0,0 +1,122 @@ +--test against vectorized LLAP execution mode +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; + +DROP TABLE IF EXISTS llap_orders_parquet PURGE; +DROP TABLE IF EXISTS llap_items_parquet PURGE; +DROP TABLE IF EXISTS mig_source_parquet PURGE; + + +CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET; +INSERT INTO llap_items_parquet VALUES +(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'), +(1, 45000, 'Sedan', 'Model 3', 'Long range'), +(2, 50000, 'Sedan', 'Model 3', 'Performance'), +(3, 48000, 'Crossover', 'Model Y', 'Long range'), +(4, 55000, 'Crossover', 'Model Y', 'Performance'), +(5, 83000, 'Sports', 'Model S', 'Long range'), +(6, 123000, 'Sports', 'Model S', 'Plaid'); + +CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET; +INSERT INTO llap_orders_parquet VALUES +(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'), +(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'), +(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'), +(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'), +(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'), +(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'), +(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'), +(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'), +(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'), +(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'), +(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'), +(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'), +(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'), +(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'), +(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'), +(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'), +(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'), +(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 
'FR'), +(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'), +(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'), +(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA'); + +--select query without any schema change yet +SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description; + + +--schema evolution on unpartitioned table +--renames and reorders +ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description; +ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name; +SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description; + +--adding a column +ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float); +INSERT INTO llap_items_parquet VALUES +(7, 'Model X', 93000, 'Long range', 'SUV', 3.8), +(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5); +SELECT cat, min(to60) from llap_items_parquet group by cat; + +--removing a column +ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float); +INSERT INTO llap_items_parquet VALUES +(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5), +(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5); +SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name; + + +--schema evolution on partitioned table (including partition changes) +--renames and reorders +ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2; +ALTER TABLE llap_orders_parquet CHANGE p1 region string; +INSERT INTO llap_orders_parquet VALUES +(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129')); +SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region; + +ALTER TABLE llap_orders_parquet CHANGE p2 state string; +SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state; + +--adding new column +ALTER TABLE llap_orders_parquet ADD COLUMNS (city string); +INSERT INTO llap_orders_parquet VALUES +(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München'); +SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state; + +--making it a partition column +ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city); +INSERT INTO llap_orders_parquet VALUES +(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia'); +SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state; + +--de-partitioning a column +ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city); +INSERT INTO llap_orders_parquet VALUES +(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London'); +SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state; + +--removing a column from schema +ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string); +INSERT INTO llap_orders_parquet VALUES +(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris'); +SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state; + + +--some more projections +SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN 
llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name; +SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description; + +--------------------------------------------- +--Test migrated partitioned table gets cached + +CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET; +INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US'); +ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'); + +-- Should miss, but fill cache +SELECT region, SUM(id) from mig_source_parquet GROUP BY region; + +-- Should hit cache +set hive.llap.io.cache.only=true; +SELECT region, SUM(id) from mig_source_parquet GROUP BY region; +set hive.llap.io.cache.only=false; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out new file mode 100644 index 00000000000..a7ae1a2c34d --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out @@ -0,0 +1,465 @@ +PREHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: INSERT INTO llap_items_parquet VALUES +(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'), +(1, 45000, 'Sedan', 'Model 3', 'Long range'), +(2, 50000, 'Sedan', 'Model 3', 'Performance'), +(3, 48000, 'Crossover', 'Model Y', 'Long range'), +(4, 55000, 'Crossover', 'Model Y', 'Performance'), +(5, 83000, 'Sports', 'Model S', 'Long range'), +(6, 123000, 'Sports', 'Model S', 'Plaid') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: INSERT INTO llap_items_parquet VALUES +(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'), +(1, 45000, 'Sedan', 'Model 3', 'Long range'), +(2, 50000, 'Sedan', 'Model 3', 'Performance'), +(3, 48000, 'Crossover', 'Model Y', 'Long range'), +(4, 55000, 'Crossover', 'Model Y', 'Performance'), +(5, 83000, 'Sports', 'Model S', 'Long range'), +(6, 123000, 'Sports', 'Model S', 'Plaid') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: CREATE EXTERNAL TABLE 
llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'), +(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'), +(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'), +(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'), +(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'), +(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'), +(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'), +(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'), +(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'), +(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'), +(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'), +(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'), +(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'), +(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'), +(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'), +(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'), +(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'), +(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'), +(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'), +(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'), +(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'), +(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'), +(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'), +(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'), +(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'), +(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'), +(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'), +(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'), +(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'), +(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'), +(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'), +(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'), +(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'), +(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'), +(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'), +(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'), +(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'), +(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'), +(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'), +(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'), +(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: 
default@llap_orders_parquet +PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +Model 3 Performance 42 +Model S Long range 213 +Model S Plaid 132 +Model Y Performance 163 +PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +Model S Plaid 132 +PREHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float) +PREHOOK: type: ALTERTABLE_ADDCOLS +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float) +POSTHOOK: type: ALTERTABLE_ADDCOLS +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: INSERT INTO llap_items_parquet VALUES +(7, 'Model X', 93000, 'Long range', 'SUV', 3.8), +(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: INSERT INTO llap_items_parquet VALUES +(7, 'Model X', 93000, 'Long range', 'SUV', 3.8), +(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: SELECT cat, min(to60) from llap_items_parquet group by cat +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +#### A masked pattern was here #### +POSTHOOK: query: 
SELECT cat, min(to60) from llap_items_parquet group by cat +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +#### A masked pattern was here #### +Crossover NULL +SUV 2.5 +Sedan NULL +Sports NULL +PREHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float) +PREHOOK: type: ALTERTABLE_REPLACECOLS +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float) +POSTHOOK: type: ALTERTABLE_REPLACECOLS +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: INSERT INTO llap_items_parquet VALUES +(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5), +(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_items_parquet +POSTHOOK: query: INSERT INTO llap_items_parquet VALUES +(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5), +(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_items_parquet +PREHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +#### A masked pattern was here #### +Cybertruck 4.5 50000 +Model S NULL 123000 +Model X 2.5 113000 +Model Y NULL 55000 +PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2 +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@llap_orders_parquet +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2 +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@llap_orders_parquet +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129')) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129')) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here 
#### +EU 2000-01-04 19:55:46.129 153 +US 2007-06-24 19:23:22.829 12 +PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string +PREHOOK: type: ALTERTABLE_RENAMECOL +PREHOOK: Input: default@llap_orders_parquet +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string +POSTHOOK: type: ALTERTABLE_RENAMECOL +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +EU ES 2007-12-02 22:30:39.302 18 +EU HU 2000-01-04 19:55:46.129 75 +EU IT 2012-08-28 01:35:54.283 60 +US TX 2007-06-24 19:23:22.829 12 +PREHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string) +PREHOOK: type: ALTERTABLE_ADDCOLS +PREHOOK: Input: default@llap_orders_parquet +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string) +POSTHOOK: type: ALTERTABLE_ADDCOLS +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +DE München +ES NULL +FR NULL +HU NULL +IT NULL +UK NULL +PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was 
here #### +POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +DE München 5.75 +ES NULL 5.0 +FR NULL 2.75 +HU NULL 7.0 +IT Venezia 6.0 +UK NULL 1.0 +PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +DE München 5.75 +ES NULL 5.0 +FR NULL 2.75 +HU NULL 7.0 +IT Venezia 6.0 +UK London 2.0 +PREHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string) +PREHOOK: type: ALTERTABLE_REPLACECOLS +PREHOOK: Input: default@llap_orders_parquet +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string) +POSTHOOK: type: ALTERTABLE_REPLACECOLS +POSTHOOK: Input: default@llap_orders_parquet +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: INSERT INTO llap_orders_parquet VALUES +(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@llap_orders_parquet +POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES +(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@llap_orders_parquet +PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +DE München 5.75 +ES NULL 5.0 +FR Paris 3.2 +HU NULL 7.0 +IT Venezia 6.0 +UK London 2.0 +PREHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, 
i.name +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +München Cybertruck 50000 4.5 99 +NULL Model 3 50000 NULL 42 +Venezia Model S 123000 NULL 89 +NULL Model S 83000 NULL 185 +NULL Model Y 55000 NULL 76 +PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description +PREHOOK: type: QUERY +PREHOOK: Input: default@llap_items_parquet +PREHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description +POSTHOOK: type: QUERY +POSTHOOK: Input: default@llap_items_parquet +POSTHOOK: Input: default@llap_orders_parquet +#### A masked pattern was here #### +Cybertruck Dual Motor AWD 99 +Model 3 Performance 42 +Model S Long range 389 +Model S Plaid 221 +Model Y Performance 163 +PREHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@mig_source_parquet +POSTHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@mig_source_parquet +PREHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@mig_source_parquet +POSTHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@mig_source_parquet +POSTHOOK: Output: default@mig_source_parquet@region=EU +POSTHOOK: Output: default@mig_source_parquet@region=US +POSTHOOK: Lineage: mig_source_parquet PARTITION(region=EU).id SCRIPT [] +POSTHOOK: Lineage: mig_source_parquet PARTITION(region=US).id SCRIPT [] +PREHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@mig_source_parquet +PREHOOK: Output: default@mig_source_parquet +POSTHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@mig_source_parquet +POSTHOOK: Output: default@mig_source_parquet +PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region +PREHOOK: type: QUERY +PREHOOK: Input: default@mig_source_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region +POSTHOOK: type: QUERY +POSTHOOK: 
Input: default@mig_source_parquet +#### A masked pattern was here #### +EU 6 +US 3 +PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region +PREHOOK: type: QUERY +PREHOOK: Input: default@mig_source_parquet +#### A masked pattern was here #### +POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region +POSTHOOK: type: QUERY +POSTHOOK: Input: default@mig_source_parquet +#### A masked pattern was here #### +EU 6 +US 3 diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index b32ad20d577..0b589fe4d36 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -404,9 +404,11 @@ erasurecoding.only.query.files=\ iceberg.llap.query.files=\ llap_iceberg_read_orc.q,\ + llap_iceberg_read_parquet.q,\ vectorized_iceberg_read_mixed.q,\ vectorized_iceberg_read_orc.q,\ vectorized_iceberg_read_parquet.q iceberg.llap.only.query.files=\ - llap_iceberg_read_orc.q + llap_iceberg_read_orc.q,\ + llap_iceberg_read_parquet.q diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java index a4fc13a0ee0..c650288a93a 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.Deserializer; @@ -62,6 +64,21 @@ public interface LlapIo<T> { */ OrcTail getOrcTailFromCache(Path path, Configuration conf, CacheTag tag, @Nullable Object fileKey) throws IOException; + + /** + * Returns the metadata buffers associated with the Parquet file on the given path. + * Content is either obtained from cache, or from disk if there is a cache miss. + * @param path Parquet file path + * @param conf jobConf + * @param fileKey fileId of the Parquet file (either the Long fileId of HDFS or the SyntheticFileId). + * Optional, if it is not provided, it will be generated, see: + * org.apache.hadoop.hive.ql.io.HdfsUtils#getFileId() + * @return + * @throws IOException + */ + MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey) + throws IOException; + /** * Handles request to evict entities specified in the request object. * @param protoRequest lists Hive entities (DB, table, etc..) whose LLAP buffers should be evicted. 
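The LlapIoImpl implementation of getParquetFooterBuffersFromCache (next file diff below) locates the footer using the standard Parquet tail layout: footer bytes, then a 4-byte little-endian footer length, then the 4-byte "PAR1" magic. A minimal standalone sketch of that same arithmetic, using plain java.io against a hypothetical local file rather than the HadoopStreams/BytesUtils helpers the patch itself uses:

import java.io.IOException;
import java.io.RandomAccessFile;

public class ParquetFooterLocator {
  // Mirrors ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE and ParquetFileWriter.MAGIC.
  private static final int FOOTER_LENGTH_SIZE = 4;
  private static final byte[] MAGIC = {'P', 'A', 'R', '1'};

  // Returns the offset of the first footer byte, i.e. the position LlapIoImpl seeks to
  // before handing the stream to fileMetadataCache.putFileMetadata().
  static long footerStart(RandomAccessFile file) throws IOException {
    long footerLengthIndex = file.length() - FOOTER_LENGTH_SIZE - MAGIC.length;
    file.seek(footerLengthIndex);
    // RandomAccessFile.readInt() is big-endian; Parquet stores the length little-endian.
    int footerLength = Integer.reverseBytes(file.readInt());
    return footerLengthIndex - footerLength;
  }

  public static void main(String[] args) throws IOException {
    try (RandomAccessFile f = new RandomAccessFile("/tmp/sample.parquet", "r")) {
      System.out.println("footer starts at byte " + footerStart(f));
    }
  }
}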
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index ecf8d2575c9..4634c4639f0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -27,10 +27,14 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import javax.annotation.Nullable; import javax.management.ObjectName; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; import org.apache.hadoop.hive.llap.ProactiveEviction; import org.apache.hadoop.hive.llap.cache.LlapCacheHydration; import org.apache.hadoop.hive.llap.cache.MemoryLimitedPathCache; @@ -82,6 +86,8 @@ import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.vector.ParquetFooterInputFromCache; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.io.NullWritable; @@ -93,12 +99,19 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hive.common.util.FixedSizedObjectPool; import org.apache.hive.common.util.HiveStringUtils; import org.apache.orc.impl.OrcTail; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.SeekableInputStream; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead; + public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump { public static final Logger LOG = LoggerFactory.getLogger("LlapIoImpl"); public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc"); @@ -455,6 +468,50 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump { } } + @Override + public MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey) + throws IOException { + + Preconditions.checkNotNull(fileMetadataCache, "Metadata cache must not be null"); + + boolean isReadCacheOnly = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_CACHE_ONLY); + CacheTag tag = VectorizedParquetRecordReader.cacheTagOfParquetFile(path, daemonConf, conf); + + MemoryBufferOrBuffers footerData = (fileKey == null ) ? null + : fileMetadataCache.getFileMetadata(fileKey); + if (footerData != null) { + LOG.info("Found the footer in cache for " + fileKey); + try { + return footerData; + } finally { + fileMetadataCache.decRefBuffer(footerData); + } + } else { + throwIfCacheOnlyRead(isReadCacheOnly); + } + + final FileSystem fs = path.getFileSystem(conf); + final FileStatus stat = fs.getFileStatus(path); + + // To avoid reading the footer twice, we will cache it first and then read from cache. 
+ // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact. + try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(path))) { + long footerLengthIndex = stat.getLen() + - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length; + stream.seek(footerLengthIndex); + int footerLength = BytesUtils.readIntLittleEndian(stream); + stream.seek(footerLengthIndex - footerLength); + LOG.info("Caching the footer of length " + footerLength + " for " + fileKey); + // Note: we don't pass in isStopped here - this is not on an IO thread. + footerData = fileMetadataCache.putFileMetadata(fileKey, footerLength, stream, tag, null); + try { + return footerData; + } finally { + fileMetadataCache.decRefBuffer(footerData); + } + } + } + @Override public LlapDaemonProtocolProtos.CacheEntryList fetchCachedContentInfo() { if (useLowLevelCache) { @@ -479,4 +536,5 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump { LOG.warn("Cannot load data into the cache. Low level cache is disabled."); } } + } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java index e4e38392b75..1fe56c08dd6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; +import org.apache.hadoop.hive.llap.LlapHiveUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile; import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader; @@ -42,7 +43,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.function.Supplier; -import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.determineFileId; import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.getFsSupplier; /** @@ -77,7 +77,7 @@ public class LlapOrcCacheLoader implements AutoCloseable { public void init() throws IOException { fsSupplier = getFsSupplier(path, daemonConf); - Object fileKey = determineFileId(fsSupplier, path, daemonConf); + Object fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf); if(!fileKey.equals(this.fileKey)) { throw new IOException("File key mismatch."); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index aa03bde34e5..7d840599c9a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -499,8 +499,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return true; } - private static Object determineFileId(Supplier<FileSystem> fsSupplier, - FileSplit split, Configuration daemonConf) throws IOException { + private static Object determineFileId(Supplier<FileSystem> fsSupplier, FileSplit split, Configuration daemonConf) + throws IOException { if (split instanceof OrcSplit) { Object fileKey = ((OrcSplit)split).getFileKey(); @@ -509,17 +509,7 @@ 
public class OrcEncodedDataReader extends CallableWithNdc<Void> } } LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); - return determineFileId(fsSupplier, split.getPath(), daemonConf); - } - - static Object determineFileId(Supplier<FileSystem> fsSupplier, Path path, Configuration daemonConf) - throws IOException { - - boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID); - boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID); - boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH); - - return HdfsUtils.getFileId(fsSupplier.get(), path, allowSynthetic, checkDefaultFs, forceSynthetic); + return LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), split.getPath(), daemonConf); } /** @@ -590,7 +580,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> Configuration daemonConf, MetadataCache metadataCache, Object fileKey) throws IOException { Supplier<FileSystem> fsSupplier = getFsSupplier(path, jobConf); if (fileKey == null) { - fileKey = determineFileId(fsSupplier, path, daemonConf); + fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf); } if(fileKey == null || metadataCache == null) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index f9058ef7956..40b745e09ff 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCa import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions; @@ -224,10 +223,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> fs = split.getPath().getFileSystem(daemonConf); PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); - fileKey = determineCacheKey(fs, split, partitionDesc, - HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), - HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), - !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)); + fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null; this.sourceInputFormat = sourceInputFormat; @@ -1733,12 +1729,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } private static Object determineCacheKey(FileSystem fs, FileSplit split, PartitionDesc partitionDesc, - boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException { + Configuration daemonConf) + throws IOException { /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split. 
Object fileKey = ((OrcSplit)split).getFileKey(); if (fileKey != null) return fileKey; */ LlapIoImpl.LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); - Object fileId = HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic); + Object fileId = LlapHiveUtils.createFileIdUsingFS(fs, split.getPath(), daemonConf); return SchemaAwareCacheKey.buildCacheKey(fileId, LlapHiveUtils.getSchemaHash(partitionDesc)); } diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java index 52126d971c1..ba62b8d89c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java @@ -18,16 +18,18 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; -import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -145,4 +147,30 @@ public final class LlapHiveUtils { return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); } + /** + * Determines the file ID for the given path using the provided FileSystem, honoring the daemon configuration. + * Invokes HdfsUtils.getFileId(); the resulting file ID can be a Long (inode) or a SyntheticFileId, depending + * on the FS type and the daemon configuration. + * Can be costly on cloud file systems. + * @param fs the FileSystem the path resides on + * @param path Path associated with this file + * @param daemonConf LLAP daemon configuration + * @return the generated file ID; can be null in special cases (e.g. the conf disallows synthetic IDs on a non-HDFS FS) + * @throws IOException + */ + public static Object createFileIdUsingFS(FileSystem fs, Path path, Configuration daemonConf) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Will invoke HdfsUtils.getFileId - this is costly on cloud file systems. 
" + + "Turn on TRACE level logging to show call trace."); + if (LOG.isTraceEnabled()) { + LOG.trace(Arrays.deepToString(Thread.currentThread().getStackTrace())); + } + } + boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID); + boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID); + boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH); + + return HdfsUtils.getFileId(fs, path, allowSynthetic, checkDefaultFs, forceSynthetic); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java index dc79e1076b0..56df1547910 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java @@ -21,12 +21,19 @@ package org.apache.hadoop.hive.ql.io; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; public final class SyntheticFileId implements Writable { + + private static final String JOBCONF_KEY = "SYNTHETIC_FILE_ID"; + private static final Pattern STRING_PATTERN = Pattern.compile("\\[(-?\\d+),\\s(-?\\d+),\\s(\\d+)\\]"); + private long pathHash; private long modTime; private long length; @@ -41,6 +48,16 @@ public final class SyntheticFileId implements Writable { this.length = len; } + private SyntheticFileId(String fileIdAsString) { + Matcher matcher = STRING_PATTERN.matcher(fileIdAsString); + if (!matcher.matches()) { + throw new IllegalArgumentException("Expected format " + STRING_PATTERN + " but got " + fileIdAsString); + } + this.pathHash = Long.parseLong(matcher.group(1)); + this.modTime = Long.parseLong(matcher.group(2)); + this.length = Long.parseLong(matcher.group(3)); + } + public SyntheticFileId(FileStatus file) { this(file.getPath(), file.getLen(), file.getModificationTime()); } @@ -109,4 +126,16 @@ public final class SyntheticFileId implements Writable { public long getLength() { return length; } + + public void toJobConf(JobConf job) { + job.set(JOBCONF_KEY, this.toString()); + } + + public static SyntheticFileId fromJobConf(JobConf job) { + String idAsString = job.get(JOBCONF_KEY); + if (idAsString == null) { + return null; + } + return new SyntheticFileId(idAsString); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java index e2e60670cae..49960caf7aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java @@ -30,9 +30,9 @@ import org.apache.parquet.io.SeekableInputStream; * read the footer from cache without being aware of the latter. * This implements both InputFile and the InputStream that the reader gets from InputFile. */ -final class ParquetFooterInputFromCache +public final class ParquetFooterInputFromCache extends SeekableInputStream implements InputFile { - final static int FOOTER_LENGTH_SIZE = 4; // For the file size check. + public final static int FOOTER_LENGTH_SIZE = 4; // For the file size check. 
private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE; private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length; private final int length, footerLength; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index e0e14863dfd..f7b13cb3d6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -30,12 +30,14 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.LlapCacheAwareFs; import org.apache.hadoop.hive.llap.LlapHiveUtils; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.BucketIdentifier; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.SyntheticFileId; import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -52,12 +54,10 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.parquet.ParquetRuntimeException; -import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -83,7 +83,6 @@ import java.util.List; import java.util.Set; import java.util.TreeMap; -import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; /** @@ -104,7 +103,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase private Object[] partitionValues; private Path cacheFsPath; private static final int MAP_DEFINITION_LEVEL_MAX = 3; - private final boolean isReadCacheOnly; /** * For each request column, the reader to read this column. 
This is NULL if this column @@ -150,11 +148,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase this.cacheConf = cacheConf; if (metadataCache != null) { - cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath, - HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), - HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), - !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)); - // HdfsUtils.getFileId might yield to null in certain configurations + cacheKey = SyntheticFileId.fromJobConf(conf); + if (cacheKey == null) { + cacheKey = LlapHiveUtils.createFileIdUsingFS(filePath.getFileSystem(conf), filePath, cacheConf); + } + // createFileIdUsingFS() might yield null in certain configurations if (cacheKey != null) { cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf); // If we are going to use cache, change the path to depend on file ID for extra consistency. @@ -169,7 +167,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); //initialize the rowbatchContext - isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY); rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf); if (parquetInputSplit != null) { @@ -285,40 +282,14 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase private ParquetMetadata readSplitFooter(JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException { - MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null - : metadataCache.getFileMetadata(cacheKey); - if (footerData != null) { - LOG.info("Found the footer in cache for " + cacheKey); - try { - return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); - } finally { - metadataCache.decRefBuffer(footerData); - } - } else { - throwIfCacheOnlyRead(isReadCacheOnly); - } - final FileSystem fs = file.getFileSystem(configuration); - final FileStatus stat = fs.getFileStatus(file); if (cacheKey == null || metadataCache == null) { + // Non-LLAP case + FileSystem fs = file.getFileSystem(configuration); + FileStatus stat = fs.getFileStatus(file); return readFooterFromFile(file, fs, stat, filter); - } - - // To avoid reading the footer twice, we will cache it first and then read from cache. - // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact. - try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) { - long footerLengthIndex = stat.getLen() - - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length; - stream.seek(footerLengthIndex); - int footerLength = BytesUtils.readIntLittleEndian(stream); - stream.seek(footerLengthIndex - footerLength); - LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey); - // Note: we don't pass in isStopped here - this is not on an IO thread. 
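// Reference sketch (not part of the patch): the seek arithmetic being removed here, and its new
// home in LlapIoImpl above, both rely on the fixed Parquet tail layout:
//   [footer bytes][4-byte little-endian footer length]["PAR1" magic]
// Assuming a well-formed Parquet file, locating the footer reduces to:
import java.io.IOException;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.io.SeekableInputStream;

public final class ParquetTailMathSketch {
  /** Positions the stream at the first footer byte and returns the footer length. */
  static int seekToFooter(SeekableInputStream stream, long fileLen) throws IOException {
    // FOOTER_LENGTH_SIZE (4) length bytes plus MAGIC.length trailing bytes sit after the footer.
    long footerLengthIndex = fileLen - 4 - ParquetFileWriter.MAGIC.length;
    stream.seek(footerLengthIndex);
    int footerLength = BytesUtils.readIntLittleEndian(stream);
    stream.seek(footerLengthIndex - footerLength); // the next footerLength bytes are the footer
    return footerLength;
  }
}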
- footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null); - try { - return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); - } finally { - metadataCache.decRefBuffer(footerData); - } + } else { + MemoryBufferOrBuffers footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(file, configuration, cacheKey); + return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter); } } @@ -337,7 +308,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase return ParquetFileReader.readFooter(inputFile, filter); } - private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) { + public static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) { MapWork mapWork = LlapHiveUtils.findMapWork(jobConf); if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) { return null;
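// Condensed view (not code from the patch): the new footer path spans the reader's constructor
// (cache-key resolution) and readSplitFooter() (cache-backed read). The hypothetical method below
// stitches the two steps together; parameter, field and helper names follow the patch.
private ParquetMetadata readFooterViaLlapSketch(JobConf conf, Path file, Configuration cacheConf,
    FileMetadataCache metadataCache, MetadataFilter filter) throws IOException {
  Object cacheKey = SyntheticFileId.fromJobConf(conf);          // producer-supplied ID, if any
  if (cacheKey == null) {
    // May still be null, e.g. when the conf disallows synthetic IDs on a non-HDFS FS.
    cacheKey = LlapHiveUtils.createFileIdUsingFS(file.getFileSystem(conf), file, cacheConf);
  }
  if (cacheKey == null || metadataCache == null) {
    // Non-LLAP path: read the footer straight from the file.
    FileSystem fs = file.getFileSystem(conf);
    return readFooterFromFile(file, fs, fs.getFileStatus(file), filter);
  }
  // LLAP path: the daemon populates the cache on a miss and hands back the footer buffers.
  MemoryBufferOrBuffers footerData =
      LlapProxy.getIo().getParquetFooterBuffersFromCache(file, conf, cacheKey);
  return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
}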