This is an automated email from the ASF dual-hosted git repository. szita pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new f39f518 HIVE-23729: LLAP text cache fails when using multiple tables/schemas on the same files (Adam Szita, reviewed by Oliver Draese, Peter Vary) f39f518 is described below commit f39f518a14a5d782e6e47fc693311ffac7a0abea Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Tue Jun 23 14:12:45 2020 +0200 HIVE-23729: LLAP text cache fails when using multiple tables/schemas on the same files (Adam Szita, reviewed by Oliver Draese, Peter Vary) --- data/files/csv/00000.csv | 32 +++++++++++ .../hive/llap/cache/SerDeLowLevelCacheImpl.java | 4 +- .../hive/llap/io/encoded/OrcEncodedDataReader.java | 3 +- .../llap/io/encoded/SerDeEncodedDataReader.java | 11 ++-- .../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 38 +++++++++++-- .../hadoop/hive/llap/SchemaAwareCacheKey.java | 65 ++++++++++++++++++++++ .../vector/VectorizedParquetRecordReader.java | 3 +- ql/src/test/queries/clientpositive/csv_llap.q | 21 ++++++- .../results/clientpositive/llap/csv_llap.q.out | 41 ++++++++++++-- 9 files changed, 195 insertions(+), 23 deletions(-) diff --git a/data/files/csv/00000.csv b/data/files/csv/00000.csv new file mode 100644 index 0000000..ba424c0 --- /dev/null +++ b/data/files/csv/00000.csv @@ -0,0 +1,32 @@ +00117,b95f91b8-a2a0-4d28-85ab-54ec0a80535d,FALSE,TRUE,FALSE,FALSE,FALSE +194483,ced84e01-c225-4a4f-b666-75ad4f62fe20,FALSE,FALSE,TRUE,TRUE,TRUE +269414,98451f7e-c82d-4fab-964d-ea49d4c112fe,TRUE,FALSE,FALSE,TRUE,TRUE +591165,a4124e14-cb38-42bb-a01c-41c67dbf96cd,FALSE,FALSE,TRUE,FALSE,FALSE +239413,5d61b10e-ff2a-42f9-b53d-633624de361f,TRUE,TRUE,FALSE,FALSE,TRUE +210078,7264397c-68e9-4a58-9f65-88132bdf6e19,FALSE,TRUE,FALSE,TRUE,FALSE +453386,a8451345-1d3e-4119-9bdd-29a53ac3c6e9,TRUE,FALSE,FALSE,FALSE,FALSE +252028,0e1b95c7-8b1b-40ce-a0a3-ca7264c85c36,TRUE,FALSE,FALSE,FALSE,TRUE +34488,40b8125c-9281-46c4-9d5b-cf224602e518,FALSE,FALSE,FALSE,TRUE,TRUE 
+634211,92a674cc-a56d-4653-b63f-a054cc2fd357,FALSE,FALSE,FALSE,TRUE,TRUE +300632,39dad068-50f8-49cd-8ee9-fbfa08440212,TRUE,TRUE,TRUE,TRUE,FALSE +807477,4c736a2c-efc9-4bb2-be67-4ef661cfa5bc,TRUE,FALSE,FALSE,FALSE,TRUE +750354,9e6a04c4-2432-4321-903c-b6ac5355b8cc,TRUE,FALSE,FALSE,TRUE,FALSE +244589,85054796-31c5-4f8c-a921-c216be9c6c4f,FALSE,TRUE,FALSE,FALSE,TRUE +474061,8521a204-3288-48ba-8c12-c8399acf05b6,TRUE,TRUE,TRUE,FALSE,FALSE +537646,35241560-d282-4807-9ecb-d1e6e6d74b61,TRUE,FALSE,TRUE,FALSE,FALSE +715496,53fbfa25-0571-4bf4-a4cc-721bdca030f1,TRUE,TRUE,FALSE,FALSE,TRUE +301813,98e711fa-80c7-44b4-9140-684cae60e79f,TRUE,FALSE,TRUE,FALSE,TRUE +438598,6124cb91-2bf8-4d18-bc1a-aadbf6b8d543,TRUE,FALSE,TRUE,TRUE,TRUE +409611,6cc3ee6f-53e0-4867-8ebc-f846241c813d,TRUE,TRUE,FALSE,TRUE,TRUE +575004,646fcfc5-72d4-41ab-8595-74cfaee7eaae,TRUE,TRUE,TRUE,TRUE,TRUE +613055,e775fd20-67e3-40a6-ac0b-b7182a89acd0,FALSE,FALSE,FALSE,TRUE,TRUE +95304,951e4ac8-ac20-4d12-ac31-d3a1b205e06a,FALSE,TRUE,TRUE,TRUE,FALSE +440611,a29e1e6f-a419-46e1-b85f-af8b607c77e1,FALSE,FALSE,FALSE,TRUE,FALSE +198916,b8a955a1-3c75-4428-9af2-b081e1209f3a,FALSE,TRUE,TRUE,FALSE,TRUE +676296,702ad9a0-7aa4-4c32-968b-c9c61623d71a,TRUE,TRUE,TRUE,TRUE,FALSE +117344,ed9bf09e-b3dd-44ad-8d54-b8f9372fe002,FALSE,TRUE,TRUE,FALSE,FALSE +486264,ffff7b94-0e5d-4f01-a008-ddb21674bd03,FALSE,FALSE,FALSE,TRUE,FALSE +216051,da7bdb1c-688f-49e5-9a49-0ec65c2f2d08,TRUE,FALSE,TRUE,TRUE,TRUE +804209,bea55182-b650-4c4f-9983-568745f7e96a,TRUE,TRUE,FALSE,TRUE,FALSE +31651,a33cca26-60e6-4744-a1c6-589e4aeb012b,TRUE,TRUE,TRUE,FALSE,TRUE +324048,c645867d-6e80-4ac4-a375-9103340a327d,FALSE,TRUE,TRUE,TRUE,FALSE diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java index 7930fd9..1478aeb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java +++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java @@ -770,7 +770,6 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu try { FileData fd = e.getValue().getCache(); int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0; - sb.append(fd.colCount).append(" columns, ").append(fd.stripes.size()).append(" stripes; "); for (StripeData stripe : fd.stripes) { if (stripe.data == null) continue; for (int i = 0; i < stripe.data.length; ++i) { @@ -807,7 +806,8 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu allEvicted += fileEvicted; allMoving += fileMoving; sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked - + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved"); + + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved; "); + sb.append(fd.colCount).append(" columns, ").append(fd.stripes.size()).append(" stripes"); } finally { e.getValue().decRef(); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 35d066a..479656f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -219,8 +219,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. orcReader = null; + PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null; + ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null; // 1. 
Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that fsSupplier = getFsSupplier(split.getPath(), jobConf); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 6d3443a..9d00508 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.LlapHiveUtils; +import org.apache.hadoop.hive.llap.SchemaAwareCacheKey; import org.apache.hadoop.hive.llap.cache.BufferUsageManager; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl; @@ -220,12 +221,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } fs = split.getPath().getFileSystem(daemonConf); - fileKey = determineFileId(fs, split, + PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); + fileKey = determineCacheKey(fs, split, partitionDesc, HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID), !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH)); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null; + ? 
LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null; this.sourceInputFormat = sourceInputFormat; this.sourceSerDe = sourceSerDe; this.reporter = reporter; @@ -1720,13 +1722,14 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> return true; } - private static Object determineFileId(FileSystem fs, FileSplit split, + private static Object determineCacheKey(FileSystem fs, FileSplit split, PartitionDesc partitionDesc, boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException { /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split. Object fileKey = ((OrcSplit)split).getFileKey(); if (fileKey != null) return fileKey; */ LlapIoImpl.LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); - return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic); + Object fileId = HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic); + return SchemaAwareCacheKey.buildCacheKey(fileId, LlapHiveUtils.getSchemaHash(partitionDesc)); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java index e998fa1..2779969 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java @@ -37,6 +37,8 @@ import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES; + /** * Covers utility functions that are used by LLAP code and depend on Hive constructs e.g. ql code. 
*/ @@ -48,18 +50,29 @@ public final class LlapHiveUtils { // Not to be used; } - public static CacheTag getDbAndTableNameForMetrics(Path path, boolean includeParts, - Map<Path, PartitionDesc> parts) { - - assert(parts != null); + /** + * Takes a Path and looks up the PartitionDesc instance associated with it in a map of Path->PartitionDesc entries. + * If it is not found (e.g. Path denotes a partition path, but map contains table level instances only) we will try + * to do the same with the parent of this path, traversing up until there's a match, if any. + * @param path the absolute path used for the look up + * @param partitionDescMap the map + * @return the PartitionDesc instance if found, null if not found + */ + public static PartitionDesc partitionDescForPath(Path path, Map<Path, PartitionDesc> partitionDescMap) { + assert(partitionDescMap != null); // Look for PartitionDesc instance matching our Path Path parentPath = path; - PartitionDesc part = parts.get(parentPath); + PartitionDesc part = partitionDescMap.get(parentPath); while (!parentPath.isRoot() && part == null) { parentPath = parentPath.getParent(); - part = parts.get(parentPath); + part = partitionDescMap.get(parentPath); } + return part; + } + + public static CacheTag getDbAndTableNameForMetrics(Path path, boolean includeParts, + PartitionDesc part) { // Fallback to legacy cache tag creation logic. if (part == null) { @@ -73,6 +86,19 @@ public final class LlapHiveUtils { } } + public static int getSchemaHash(PartitionDesc part) { + if (part == null) { + return SchemaAwareCacheKey.NO_SCHEMA_HASH; + } else { + Object columnTypes = part.getProperties().get(META_TABLE_COLUMN_TYPES); + if (columnTypes != null) { + return columnTypes.toString().hashCode(); + } else { + return SchemaAwareCacheKey.NO_SCHEMA_HASH; + } + } + } + /** * Returns MapWork based what is serialized in the JobConf instance provided. 
* @param job diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SchemaAwareCacheKey.java b/ql/src/java/org/apache/hadoop/hive/llap/SchemaAwareCacheKey.java new file mode 100644 index 0000000..7078ee5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/SchemaAwareCacheKey.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.util.Objects; + +/** + * Cache key that stores schema hash. Useful for caching serdes that do not have a self-describing schema built in.
+ */ +public class SchemaAwareCacheKey { + + public static final int NO_SCHEMA_HASH = -1; + + private final Object fileId; + private final int schemaHash; + + private SchemaAwareCacheKey(Object fileId, int schemaHash) { + this.fileId = fileId; + this.schemaHash = schemaHash; + } + + public static Object buildCacheKey(Object fileId, int schemaHash) { + if (schemaHash == NO_SCHEMA_HASH) { + return fileId; + } else { + return new SchemaAwareCacheKey(fileId, schemaHash); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SchemaAwareCacheKey that = (SchemaAwareCacheKey) o; + return schemaHash == that.schemaHash && Objects.equals(fileId, that.fileId); + } + + @Override + public int hashCode() { + return Objects.hash(fileId, schemaHash); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{fileId=").append(fileId).append(", schemaHash=").append(schemaHash).append('}'); + return sb.toString(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index a3bbb7b..5cd25d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -215,7 +215,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase } if (cacheKey != null) { if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) { - cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, parts); + PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); + cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, partitionDesc); } // If we are going to use cache, change the path to depend on file ID for 
extra consistency. FileSystem fs = file.getFileSystem(configuration); diff --git a/ql/src/test/queries/clientpositive/csv_llap.q b/ql/src/test/queries/clientpositive/csv_llap.q index c262c92..c268911 100644 --- a/ql/src/test/queries/clientpositive/csv_llap.q +++ b/ql/src/test/queries/clientpositive/csv_llap.q @@ -1,17 +1,32 @@ +--SETUP---------------------------------------------------------------------------------------------------------------- CREATE EXTERNAL TABLE csv_llap_test (ts int, id string, b1 boolean, b2 boolean, b3 boolean, b4 boolean) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT - 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' +LOCATION '../../data/files/csv'; + +CREATE EXTERNAL TABLE csv_llap_test_differentschema (ts int, id string, b1 boolean) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.mapred.TextInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' +LOCATION '../../data/files/csv'; -LOAD DATA LOCAL INPATH '../../data/files/small_csv.csv' INTO TABLE csv_llap_test; ---location '../../data/files/small_csv.csv'; +--TEST that cache lookup results in a hit, even for OpenCSVSerde------------------------------------------------------- SELECT MIN(ts) FROM csv_llap_test; set hive.llap.io.cache.only=true; --an exception would be thrown from here on for cache miss SELECT MIN(ts) FROM csv_llap_test; + +set hive.llap.io.cache.only=false; + +--TEST that another table with different schema defined on the same text files works without cache problems------------ +SELECT MIN(ts) FROM csv_llap_test_differentschema; diff --git a/ql/src/test/results/clientpositive/llap/csv_llap.q.out b/ql/src/test/results/clientpositive/llap/csv_llap.q.out index 1bf6d09..65e35ec 100644 --- 
a/ql/src/test/results/clientpositive/llap/csv_llap.q.out +++ b/ql/src/test/results/clientpositive/llap/csv_llap.q.out @@ -5,7 +5,9 @@ STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' +#### A masked pattern was here #### PREHOOK: type: CREATETABLE +#### A masked pattern was here #### PREHOOK: Output: database:default PREHOOK: Output: default@csv_llap_test POSTHOOK: query: CREATE EXTERNAL TABLE csv_llap_test (ts int, id string, b1 boolean, b2 boolean, b3 boolean, b4 boolean) @@ -15,17 +17,35 @@ STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' +#### A masked pattern was here #### POSTHOOK: type: CREATETABLE +#### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@csv_llap_test -PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/small_csv.csv' INTO TABLE csv_llap_test -PREHOOK: type: LOAD +PREHOOK: query: CREATE EXTERNAL TABLE csv_llap_test_differentschema (ts int, id string, b1 boolean) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.mapred.TextInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' #### A masked pattern was here #### -PREHOOK: Output: default@csv_llap_test -POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/small_csv.csv' INTO TABLE csv_llap_test -POSTHOOK: type: LOAD +PREHOOK: type: CREATETABLE #### A masked pattern was here #### -POSTHOOK: Output: default@csv_llap_test +PREHOOK: Output: database:default +PREHOOK: Output: default@csv_llap_test_differentschema +POSTHOOK: query: CREATE EXTERNAL TABLE csv_llap_test_differentschema (ts int, id string, b1 boolean) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.OpenCSVSerde' +STORED AS INPUTFORMAT + 'org.apache.hadoop.mapred.TextInputFormat' +OUTPUTFORMAT + 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@csv_llap_test_differentschema PREHOOK: query: SELECT MIN(ts) FROM csv_llap_test PREHOOK: type: QUERY PREHOOK: Input: default@csv_llap_test @@ -44,3 +64,12 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@csv_llap_test #### A masked pattern was here #### 00117 +PREHOOK: query: SELECT MIN(ts) FROM csv_llap_test_differentschema +PREHOOK: type: QUERY +PREHOOK: Input: default@csv_llap_test_differentschema +#### A masked pattern was here #### +POSTHOOK: query: SELECT MIN(ts) FROM csv_llap_test_differentschema +POSTHOOK: type: QUERY +POSTHOOK: Input: default@csv_llap_test_differentschema +#### A masked pattern was here #### +00117