HIVE-16195 : MM tables: mm_conversions test is broken - part 2 (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8e6719df
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8e6719df
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8e6719df

Branch: refs/heads/hive-14535
Commit: 8e6719df622d03ba337f29cecaa2eac6095eb433
Parents: becf80c
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Mar 15 16:40:27 2017 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Mar 15 16:40:27 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  32 ++++++++++-----
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |  37 ++++++++++++++----
 .../hadoop/hive/ql/plan/LoadMultiFilesDesc.java |  20 ++++++----
 .../queries/clientnegative/mm_bucket_convert.q  |  18 +++++++++
 .../queries/clientpositive/mm_conversions.q     |   2 -
 .../clientnegative/mm_bucket_convert.q.out      |  41 ++++++++++++++++++++
 6 files changed, 123 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index bb9bac7..39f2c53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3994,17 +3994,27 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   }
 
   private List<Task<?>> generateRemoveMmTasks(Table tbl) throws HiveException {
-    // To avoid confusion from nested MM directories when table is converted back and forth,
-    // we will do the following - we will rename mm_ dirs to remove the prefix; we will also
-    // delete any directories that are not committed. Note that this relies on locks.
-    // Note also that we only do the renames AFTER the metastore operation commits.
-    // Deleting uncommitted things is safe, but moving stuff before we convert is data loss.
+    // To avoid confusion from nested MM directories when table is converted back and forth, we
+    // want to rename mm_ dirs to remove the prefix; however, given the unpredictable nested
+    // directory handling in Hive/MR, we will instead move all the files into the root directory.
+    // We will also delete any directories that are not committed.
+    // Note that this relies on locks. Note also that we only do the renames AFTER the metastore
+    // operation commits. Deleting uncommitted things is safe, but moving stuff before we convert
+    // could cause data loss.
     List<Path> allMmDirs = new ArrayList<>();
     if (tbl.isStoredAsSubDirectories()) {
-      // TODO: support this?
+      // TODO: support this? we only bail because it's a PITA and hardly anyone seems to care.
       throw new HiveException("Converting list bucketed tables stored as subdirectories "
          + " to and from MM is not supported");
     }
+    List<String> bucketCols = tbl.getBucketCols();
+    if (bucketCols != null && !bucketCols.isEmpty()
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_STRICT_CHECKS_BUCKETING)) {
+      throw new HiveException("Converting bucketed tables from MM is not supported by default; "
+          + "copying files from multiple MM directories may potentially break the buckets. You "
+          + "can set " + ConfVars.HIVE_STRICT_CHECKS_BUCKETING.varname
+          + " to false for this query if you want to force the conversion.");
+    }
     Hive db = getHive();
     ValidWriteIds ids = db.getValidWriteIdsForTable(tbl.getDbName(), tbl.getTableName());
     if (tbl.getPartitionKeys().size() > 0) {
@@ -4021,16 +4031,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       handleRemoveMm(tbl.getDataLocation(), ids, allMmDirs);
     }
     List<Path> targetPaths = new ArrayList<>(allMmDirs.size());
+    List<String> targetPrefix = new ArrayList<>(allMmDirs.size());
     int prefixLen = ValidWriteIds.MM_PREFIX.length();
     for (int i = 0; i < allMmDirs.size(); ++i) {
       Path src = allMmDirs.get(i);
-      Path tgt = new Path(src.getParent(), src.getName().substring(prefixLen + 1));
-      Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+      Path tgt = src.getParent();
+      String prefix = src.getName().substring(prefixLen + 1) + "_";
+      Utilities.LOG14535.info("Will move " + src + " to " + tgt + " (prefix " + prefix + ")");
       targetPaths.add(tgt);
+      targetPrefix.add(prefix);
     }
     // Don't set inputs and outputs - the locks have already been taken so it's pointless.
     MoveWork mw = new MoveWork(null, null, null, null, false);
-    mw.setMultiFilesDesc(new LoadMultiFilesDesc(allMmDirs, targetPaths, true, null, null));
+    mw.setMultiFilesDesc(new LoadMultiFilesDesc(
+        allMmDirs, targetPaths, targetPrefix, true, null, null));
     return Lists.<Task<?>>newArrayList(TaskFactory.get(mw, conf));
   }
 
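
For illustration only, not part of the patch: a minimal, self-contained sketch of the path math the new generateRemoveMmTasks code performs for each committed MM directory. The /warehouse/simple_mm paths and the "mm" value assumed for ValidWriteIds.MM_PREFIX are examples, not taken from the commit.

import org.apache.hadoop.fs.Path;

public class MmPrefixSketch {
  // Assumed to match ValidWriteIds.MM_PREFIX ("mm_<writeId>" directory names).
  private static final String MM_PREFIX = "mm";

  public static void main(String[] args) {
    Path src = new Path("/warehouse/simple_mm/mm_5");
    // The target is now the parent (table/partition root), not a renamed sibling directory.
    Path tgt = src.getParent();
    // The write id becomes a file-name prefix, e.g. "5_".
    String prefix = src.getName().substring(MM_PREFIX.length() + 1) + "_";
    // So mm_5/000000_0 ends up as /warehouse/simple_mm/5_000000_0 once the MoveTask runs.
    System.out.println("Will move " + src + " to " + tgt + " (prefix " + prefix + ")");
  }
}
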
http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index fb5e39e..29b72a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -287,17 +287,38 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork();
       if (lmfd != null) {
         boolean isDfsDir = lmfd.getIsDfsDir();
-        int i = 0;
-        while (i <lmfd.getSourceDirs().size()) {
+        List<String> targetPrefixes = lmfd.getTargetPrefixes();
+        for (int i = 0; i <lmfd.getSourceDirs().size(); ++i) {
           Path srcPath = lmfd.getSourceDirs().get(i);
           Path destPath = lmfd.getTargetDirs().get(i);
-          FileSystem fs = destPath.getFileSystem(conf);
-          if (!fs.exists(destPath.getParent())) {
-            fs.mkdirs(destPath.getParent());
+          String filePrefix = targetPrefixes == null ? null : targetPrefixes.get(i);
+          FileSystem destFs = destPath.getFileSystem(conf);
+          if (filePrefix == null) {
+            if (!destFs.exists(destPath.getParent())) {
+              destFs.mkdirs(destPath.getParent());
+            }
+            Utilities.LOG14535.info("MoveTask moving LMFD " + srcPath + " to " + destPath);
+            moveFile(srcPath, destPath, isDfsDir);
+          } else {
+            if (!destFs.exists(destPath)) {
+              destFs.mkdirs(destPath);
+            }
+            FileSystem srcFs = srcPath.getFileSystem(conf);
+            FileStatus[] children = srcFs.listStatus(srcPath);
+            if (children != null) {
+              for (FileStatus child : children) {
+                Path childSrc = child.getPath();
+                Path childDest = new Path(destPath, filePrefix + childSrc.getName());
+                Utilities.LOG14535.info("MoveTask moving LMFD " + childSrc + " to " + childDest);
+                moveFile(childSrc, childDest, isDfsDir);
+              }
+            } else {
+              Utilities.LOG14535.info("MoveTask skipping empty directory LMFD " + srcPath);
+            }
+            if (!srcFs.delete(srcPath, false)) {
+              throw new IOException("Couldn't delete " + srcPath + " after moving all the files");
+            }
           }
-          Utilities.LOG14535.info("MoveTask moving LMFD " + srcPath + " to " + destPath);
-          moveFile(srcPath, destPath, isDfsDir);
-          i++;
         }
       }
 
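
A standalone sketch of the prefix branch added to MoveTask above, under the assumption that a plain FileSystem.rename() is an acceptable stand-in for MoveTask's private moveFile() helper and that the example paths exist. It only illustrates the intent: every file under an mm_ directory is flattened into the destination with the prefix prepended, and the emptied source directory is then removed.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrefixMoveSketch {
  // Flatten every file in srcDir into destDir, prepending filePrefix, then drop srcDir.
  static void movePrefixed(FileSystem fs, Path srcDir, Path destDir, String filePrefix)
      throws IOException {
    if (!fs.exists(destDir)) {
      fs.mkdirs(destDir);
    }
    for (FileStatus child : fs.listStatus(srcDir)) {
      Path childSrc = child.getPath();
      Path childDest = new Path(destDir, filePrefix + childSrc.getName());
      if (!fs.rename(childSrc, childDest)) {
        throw new IOException("Could not move " + childSrc + " to " + childDest);
      }
    }
    if (!fs.delete(srcDir, false)) {
      throw new IOException("Couldn't delete " + srcDir + " after moving all the files");
    }
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // e.g. files under /tmp/simple_mm/mm_5 become /tmp/simple_mm/5_<original name>.
    movePrefixed(fs, new Path("/tmp/simple_mm/mm_5"), new Path("/tmp/simple_mm"), "5_");
  }
}
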
http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
index 2b01712..9d5c6b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
@@ -36,15 +36,23 @@ public class LoadMultiFilesDesc implements Serializable {
   private String columns;
   private String columnTypes;
   private transient List<Path> srcDirs;
+  private transient List<String> targetPrefixes;
 
   public LoadMultiFilesDesc() {
   }
 
   public LoadMultiFilesDesc(final List<Path> sourceDirs, final List<Path> targetDir,
       final boolean isDfsDir, final String columns, final String columnTypes) {
+    this(sourceDirs, targetDir, null, isDfsDir, columns, columnTypes);
+  }
+
+  public LoadMultiFilesDesc(final List<Path> sourceDirs, final List<Path> targetDir,
+      List<String> targetPrefixes, final boolean isDfsDir, final String columns,
+      final String columnTypes) {
     this.srcDirs = sourceDirs;
     this.targetDirs = targetDir;
+    this.targetPrefixes = targetPrefixes;
     this.isDfsDir = isDfsDir;
     this.columns = columns;
     this.columnTypes = columnTypes;
@@ -60,14 +68,6 @@ public class LoadMultiFilesDesc implements Serializable {
     return srcDirs;
   }
 
-  public void setSourceDirs(List<Path> srcs) {
-    this.srcDirs = srcs;
-  }
-
-  public void setTargetDirs(final List<Path> targetDir) {
-    this.targetDirs = targetDir;
-  }
-
   @Explain(displayName = "hdfs directory")
   public boolean getIsDfsDir() {
     return isDfsDir;
@@ -106,4 +106,8 @@ public class LoadMultiFilesDesc implements Serializable {
   public void setColumnTypes(String columnTypes) {
     this.columnTypes = columnTypes;
   }
+
+  public List<String> getTargetPrefixes() {
+    return targetPrefixes;
+  }
 }
 
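
A usage sketch of the new constructor, mirroring how DDLTask feeds it above; the example lists and paths are illustrative, not from the patch. Each source directory lines up positionally with one target directory and one file prefix, and the old five-argument constructor delegates with null prefixes, so existing callers keep the plain per-directory move behavior.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;

public class LoadMultiFilesDescUsage {
  public static void main(String[] args) {
    List<Path> sources = Arrays.asList(new Path("/warehouse/simple_mm/mm_5"));
    List<Path> targets = Arrays.asList(new Path("/warehouse/simple_mm"));
    List<String> prefixes = Arrays.asList("5_");
    // Entry i of each list describes one move: sources[i] -> targets[i] with prefixes[i].
    LoadMultiFilesDesc lmfd =
        new LoadMultiFilesDesc(sources, targets, prefixes, true, null, null);
    System.out.println("prefix for entry 0: " + lmfd.getTargetPrefixes().get(0));
  }
}
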
http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/test/queries/clientnegative/mm_bucket_convert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/mm_bucket_convert.q b/ql/src/test/queries/clientnegative/mm_bucket_convert.q
new file mode 100644
index 0000000..2ded047
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/mm_bucket_convert.q
@@ -0,0 +1,18 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+drop table bucket0_mm;
+drop table bucket1_mm;
+create table bucket0_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only");
+create table bucket1_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only");
+
+set hive.strict.checks.bucketing=false;
+alter table bucket0_mm unset tblproperties('transactional_properties', 'transactional');
+set hive.strict.checks.bucketing=true;
+alter table bucket1_mm unset tblproperties('transactional_properties', 'transactional');
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/test/queries/clientpositive/mm_conversions.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_conversions.q b/ql/src/test/queries/clientpositive/mm_conversions.q
index 69a3315..2dc7a74 100644
--- a/ql/src/test/queries/clientpositive/mm_conversions.q
+++ b/ql/src/test/queries/clientpositive/mm_conversions.q
@@ -5,8 +5,6 @@ set tez.grouping.min-size=1;
 set tez.grouping.max-size=2;
 set hive.exec.dynamic.partition.mode=nonstrict;
 
--- Temporary setting
-set mapred.input.dir.recursive=true;
 
 -- Force multiple writers when reading
 drop table intermediate;
 
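
The negative test above exercises the new strict check from the client side. As a hedged illustration (the connection URL and table name are placeholders, not from the patch), the same gate can be toggled per session over JDBC before attempting the conversion:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ForceBucketedMmConversion {
  public static void main(String[] args) throws Exception {
    try (Connection conn =
             DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // With hive.strict.checks.bucketing left at its default (true), the ALTER below fails
      // with the DDLTask error shown in mm_bucket_convert.q.out.
      stmt.execute("set hive.strict.checks.bucketing=false");
      stmt.execute("alter table bucket0_mm unset tblproperties("
          + "'transactional_properties', 'transactional')");
    }
  }
}
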
http://git-wip-us.apache.org/repos/asf/hive/blob/8e6719df/ql/src/test/results/clientnegative/mm_bucket_convert.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/mm_bucket_convert.q.out b/ql/src/test/results/clientnegative/mm_bucket_convert.q.out
new file mode 100644
index 0000000..b732d3e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/mm_bucket_convert.q.out
@@ -0,0 +1,41 @@
+PREHOOK: query: drop table bucket0_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket0_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table bucket1_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket1_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table bucket0_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket0_mm
+POSTHOOK: query: create table bucket0_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket0_mm
+PREHOOK: query: create table bucket1_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucket1_mm
+POSTHOOK: query: create table bucket1_mm(key int, id int) clustered by (key) into 2 buckets
+  tblproperties("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucket1_mm
+PREHOOK: query: alter table bucket0_mm unset tblproperties('transactional_properties', 'transactional')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@bucket0_mm
+PREHOOK: Output: default@bucket0_mm
+POSTHOOK: query: alter table bucket0_mm unset tblproperties('transactional_properties', 'transactional')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@bucket0_mm
+POSTHOOK: Output: default@bucket0_mm
+PREHOOK: query: alter table bucket1_mm unset tblproperties('transactional_properties', 'transactional')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@bucket1_mm
+PREHOOK: Output: default@bucket1_mm
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Converting bucketed tables from MM is not supported by default; copying files from multiple MM directories may potentially break the buckets. You can set hive.strict.checks.bucketing to false for this query if you want to force the conversion.