Repository: hive Updated Branches: refs/heads/master 66c522676 -> 3acaa2a76
HIVE-16722 Converting bucketed non-acid table to acid should perform validation (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3acaa2a7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3acaa2a7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3acaa2a7 Branch: refs/heads/master Commit: 3acaa2a76c66fdfd4e75197f74c7aa17788fb592 Parents: 66c5226 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Wed Oct 25 15:36:33 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Wed Oct 25 15:36:54 2017 -0700 ---------------------------------------------------------------------- .../TransactionalValidationListener.java | 61 +++++++++++++++++++- .../hive/ql/io/orc/OrcRawRecordMerger.java | 10 ++-- 2 files changed, 66 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3acaa2a7/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 29d8da8..49c8cbb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -17,11 +17,17 @@ */ package org.apache.hadoop.hive.metastore; +import java.io.IOException; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -125,9 +131,10 @@ public final class TransactionalValidationListener extends MetaStorePreEventList } if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { - throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() + + throw new MetaException(getTableName(newTable) + " cannot be declared transactional because it's an external table"); } + validateTableStructure(context.getHandler(), newTable); hasValidTransactionalValue = true; } @@ -297,4 +304,56 @@ public final class TransactionalValidationListener extends MetaStorePreEventList } return null; // All checks passed, return null. } + private final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); + /** + * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD + */ + private static final Pattern ORIGINAL_PATTERN_COPY = + Pattern.compile("[0-9]+_[0-9]+" + "_copy_" + "[0-9]+"); + + /** + * It's assumed everywhere that original data files are named according to + * {@link #ORIGINAL_PATTERN} or {@link #ORIGINAL_PATTERN_COPY} + * This checks that when transactional=true is set and throws if it finds any files that don't + * follow convention. 
+ */ + private void validateTableStructure(HiveMetaStore.HMSHandler hmsHandler, Table table) + throws MetaException { + Path tablePath; + try { + Warehouse wh = hmsHandler.getWh(); + if (table.getSd().getLocation() == null || table.getSd().getLocation().isEmpty()) { + tablePath = wh.getDefaultTablePath(hmsHandler.getMS().getDatabase(table.getDbName()), + table.getTableName()); + } else { + tablePath = wh.getDnsPath(new Path(table.getSd().getLocation())); + } + FileSystem fs = wh.getFs(tablePath); + //FileSystem fs = FileSystem.get(getConf()); + RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(tablePath, true); + while (iterator.hasNext()) { + LocatedFileStatus fileStatus = iterator.next(); + if (!fileStatus.isFile()) { + continue; + } + boolean validFile = + (ORIGINAL_PATTERN.matcher(fileStatus.getPath().getName()).matches() || + ORIGINAL_PATTERN_COPY.matcher(fileStatus.getPath().getName()).matches() + ); + if (!validFile) { + throw new IllegalStateException("Unexpected data file name format. Cannot convert " + + getTableName(table) + " to transactional table. File: " + fileStatus.getPath()); + } + } + } catch (IOException|NoSuchObjectException e) { + String msg = "Unable to list files for " + getTableName(table); + LOG.error(msg, e); + MetaException e1 = new MetaException(msg); + e1.initCause(e); + throw e1; + } + } + private static String getTableName(Table table) { + return table.getDbName() + "." 
+ table.getTableName(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3acaa2a7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index cbbb4c4..eed6d22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -24,6 +24,7 @@ import java.util.TreeMap; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.BucketCodec; @@ -31,7 +32,6 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -296,9 +296,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * of these files as part of a single logical bucket file. * * Also, for unbucketed (non acid) tables, there are no guarantees where data files may be placed. - * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc for each leg of the Union. Thus the - * data file need not be an immediate child of partition dir. All files for a given writerId are - * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently. 
+ * For example, CTAS+Tez+Union creates subdirs + * {@link AbstractFileMergeOperator#UNION_SUDBIR_PREFIX}_1/, + * {@link AbstractFileMergeOperator#UNION_SUDBIR_PREFIX}_2/, etc for each leg of the Union. Thus + * the data file need not be an immediate child of partition dir. All files for a given writerId + * are treated as one logical unit to assign {@link RecordIdentifier}s to them consistently. * * For Compaction, where each split includes the whole bucket, this means reading over all the * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.