Repository: hive

Updated Branches:
  refs/heads/master 90149de71 -> addeab8d0
HIVE-17608: REPL LOAD should overwrite the data files if exists instead of duplicating it (Sankar Hariappan, reviewed by Anishek Agarwal)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/addeab8d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/addeab8d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/addeab8d

Branch: refs/heads/master
Commit: addeab8d0810c62fc2246166e91d10c347abe58c
Parents: 90149de
Author: sankarh <sank...@apache.org>
Authored: Wed Oct 4 23:50:36 2017 +0530
Committer: sankarh <sank...@apache.org>
Committed: Wed Oct 4 23:50:36 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hive/ql/history/TestHiveHistory.java |  3 +-
 .../hive/ql/parse/TestReplicationScenarios.java | 63 +++++++++++++++++-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |  9 +--
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 18 ++++-
 .../apache/hadoop/hive/ql/exec/StatsTask.java   |  9 ++-
 .../bootstrap/load/table/LoadPartitions.java    |  3 +-
 .../repl/bootstrap/load/table/LoadTable.java    |  4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 69 ++++++++++++--------
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  6 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |  4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 17 +++--
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |  6 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      | 43 +++++++-----
 .../hadoop/hive/ql/exec/TestExecDriver.java     |  4 +-
 .../hive/ql/metadata/TestHiveCopyFiles.java     | 12 ++--
 15 files changed, 192 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index bec715d..8a6fe3d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
 import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.tools.LineageInfo;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -104,7 +105,7 @@ public class TestHiveHistory extends TestCase {
         db.dropTable(Warehouse.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING, false, false, false, false);
         i++;
       }


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 7cf1498..276c464 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1406,7 +1406,7 @@ public class TestReplicationScenarios {
 
           // Skip all the events belong to other DBs/tables.
           if (event.getDbName().equalsIgnoreCase(dbName)) {
-            if (event.getEventType() == "INSERT") {
+            if (event.getEventType().equalsIgnoreCase("INSERT")) {
               // If an insert event is found, then return null hence no event is dumped.
               LOG.error("Encountered INSERT event when it was not expected to");
               return null;
@@ -1425,7 +1425,66 @@ public class TestReplicationScenarios {
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
 
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1)", ptn_data, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1)", ptn_data, driverMirror);
+  }
+
+  @Test
+  public void testIdempotentMoveTaskForInsertFiles() throws IOException {
+    String name = testName.getMethodName();
+    final String dbName = createDB(name, driver);
+    String replDbName = dbName + "_dupe";
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    Tuple bootstrap = bootstrapLoadAndVerify(dbName, replDbName);
+
+    String[] unptn_data = new String[]{ "ten"};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
+
+    // Inject a behaviour where it repeats the INSERT event twice with different event IDs
+    BehaviourInjection<NotificationEventResponse,NotificationEventResponse> insertEventRepeater
+            = new BehaviourInjection<NotificationEventResponse,NotificationEventResponse>(){
+
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventsList) {
+        if (null != eventsList) {
+          List<NotificationEvent> events = eventsList.getEvents();
+          List<NotificationEvent> outEvents = new ArrayList<>();
+          long insertEventId = -1;
+
+          for (int i = 0; i < events.size(); i++) {
+            NotificationEvent event = events.get(i);
+
+            // Skip all the events belong to other DBs/tables.
+            if (event.getDbName().equalsIgnoreCase(dbName)) {
+              if (event.getEventType().equalsIgnoreCase("INSERT")) {
+                // Add insert event twice with different event ID to allow apply of both events.
+                NotificationEvent newEvent = new NotificationEvent(event);
+                outEvents.add(newEvent);
+                insertEventId = newEvent.getEventId();
+              }
+            }
+
+            NotificationEvent newEvent = new NotificationEvent(event);
+            if (insertEventId != -1) {
+              insertEventId++;
+              newEvent.setEventId(insertEventId);
+            }
+            outEvents.add(newEvent);
+          }
+          eventsList.setEvents(outEvents);
+          injectionPathCalled = true;
+        }
+        return eventsList;
+      }
+    };
+    InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(insertEventRepeater);
+
+    incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+
+    insertEventRepeater.assertInjectionsPerformed(true,false);
+    InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+
+    verifyRun("SELECT a from " + replDbName + ".unptned", unptn_data, driverMirror);
   }
 
   @Test
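The injected behaviour above makes every INSERT notification arrive twice under consecutive event IDs, which is what REPL LOAD sees when the same event is re-applied after a retry. A standalone model of that transformation (plain Java; the Event record here is illustrative, not the metastore's NotificationEvent):

    import java.util.ArrayList;
    import java.util.List;

    public class EventRepeaterModel {
      record Event(long id, String type) { }

      // Mirror of the test's injection: emit each INSERT twice and shift the
      // IDs of everything that follows so the stream stays strictly increasing.
      static List<Event> repeatInserts(List<Event> in) {
        List<Event> out = new ArrayList<>();
        long shift = 0;
        for (Event e : in) {
          if (e.type().equalsIgnoreCase("INSERT")) {
            out.add(new Event(e.id() + shift, e.type())); // first copy
            shift++;                                      // later IDs move up by one
          }
          out.add(new Event(e.id() + shift, e.type()));   // event (or its second copy)
        }
        return out;
      }
    }

With OVERWRITE_EXISTING in effect, replaying the duplicated INSERT rewrites the same file instead of leaving a *_copy_1 twin, which is what the final verifyRun asserts.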

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index c8ad795..0dadd51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -376,7 +377,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         DataContainer dc = null;
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
-          db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
+          db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
               work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
               hasFollowingStatsTask());
@@ -452,7 +453,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
                 tbd.getSourcePath(),
                 tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(),
-                tbd.getReplace(),
+                tbd.getLoadFileType(),
                 dpCtx.getNumDPCols(),
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
@@ -520,7 +521,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
               tbd.getPartitionSpec());
           db.validatePartitionNameCharacters(partVals);
           db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
-              tbd.getPartitionSpec(), tbd.getReplace(),
+              tbd.getPartitionSpec(), tbd.getLoadFileType(),
               tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
               work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
               hasFollowingStatsTask());
           Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
@@ -592,7 +593,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
    * has done it's job before the query ran.
    */
   WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
-    if(tbd.getReplace()) {
+    if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL) {
       return WriteEntity.WriteType.INSERT_OVERWRITE;
     }
     switch (operation) {


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 6e722f7..39e5bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -130,7 +130,23 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         return 2;
       }
       // Copy the files from different source file systems to one destination directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, srcFiles);
+      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(dstFs, toPath, srcFiles);
+
+      // If a file is copied from CM path, then need to rename them using original source file name
+      // This is needed to avoid having duplicate files in target if same event is applied twice
+      // where the first event refers to source path and second event refers to CM path
+      for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+        if (srcFile.isUseSourcePath()) {
+          continue;
+        }
+        String destFileName = srcFile.getCmPath().getName();
+        Path destFile = new Path(toPath, destFileName);
+        if (dstFs.exists(destFile)) {
+          String destFileWithSourceName = srcFile.getSourcePath().getName();
+          Path newDestFile = new Path(toPath, destFileWithSourceName);
+          dstFs.rename(destFile, newDestFile);
+        }
+      }
       return 0;
     } catch (Exception e) {


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 4db6806..bdf3710 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
@@ -182,7 +183,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
       if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
       } else if (work.getTableSpecs() != null
-          || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+          || (work.getLoadTableDesc() != null
+              && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
           || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
               .getDestinationCreateTable().isEmpty())) {
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
@@ -283,7 +285,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
           StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
         } else if (work.getTableSpecs() != null
-            || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+            || (work.getLoadTableDesc() != null
+                && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
            || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
                .getDestinationCreateTable().isEmpty())) {
          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
@@ -409,7 +412,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
       long longValue = Long.parseLong(value);
 
       if (work.getLoadTableDesc() != null &&
-          !work.getLoadTableDesc().getReplace()) {
+          (work.getLoadTableDesc().getLoadFileType() != LoadFileType.REPLACE_ALL)) {
         String originalValue = parameters.get(statType);
         if (originalValue != null) {
           longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid
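The rename step added to ReplCopyTask above is the piece that keeps re-applied events pointed at the same destination name: when a file had to be copied from the change-manager (CM) location, the copy is renamed back to the original source file name. A minimal java.nio sketch of that step (directory layout and the _cm suffix are assumptions for illustration):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class CmRenameModel {
      // After copying e.g. "000000_0_cm" out of the CM directory, restore its
      // original name "000000_0" so a second apply overwrites it by name.
      static void renameToSourceName(Path destDir, String cmName, String sourceName)
          throws IOException {
        Path copied = destDir.resolve(cmName);
        if (Files.exists(copied)) {
          Files.move(copied, destDir.resolve(sourceName),
              StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }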
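The StatsTask hunks reduce to one rule: REPLACE_ALL resets basic stats, every other load type accumulates them onto the previous value. A compact model of that rule (sketch, not Hive code):

    public class StatsMergeModel {
      enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, OVERWRITE_EXISTING }

      // Mirrors the (loadFileType != REPLACE_ALL) branch above: appends add to
      // the previous value, replaces start over.
      static long mergeBasicStat(long incoming, Long previous, LoadFileType type) {
        if (type != LoadFileType.REPLACE_ALL && previous != null) {
          return incoming + previous;
        }
        return incoming;
      }
    }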

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 5c6ef9f..821d7df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
@@ -238,7 +239,7 @@ public class LoadPartitions {
       Path tmpPath) {
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-        event.replicationSpec().isReplace());
+        event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     loadTableWork.setInheritTableSpecs(false);
     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
         context.sessionStateLineageState);


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index a9a9162..25a2532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,7 +225,8 @@ public class LoadTable {
         ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
-        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace());
+        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+        replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     MoveWork moveWork =
         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
             context.sessionStateLineageState);


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index b0e68b1..26003f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrun
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -1623,7 +1624,7 @@ public class Hive {
    * @param loadPath
    * @param tableName
    * @param partSpec
-   * @param replace
+   * @param loadFileType
    * @param inheritTableSpecs
    * @param isSkewedStoreAsSubdir
    * @param isSrcLocal
    *
@@ -1633,11 +1634,11 @@ public class Hive {
    * @throws HiveException
    */
   public void loadPartition(Path loadPath, String tableName,
-      Map<String, String> partSpec, boolean replace,
+      Map<String, String> partSpec, LoadFileType loadFileType,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
     Table tbl = getTable(tableName);
-    loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
+    loadPartition(loadPath, tbl, partSpec, loadFileType, inheritTableSpecs,
         isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
   }
 
@@ -1653,9 +1654,9 @@ public class Hive {
    *          name of table to be loaded.
    * @param partSpec
    *          defines which partition needs to be loaded
-   * @param replace
-   *          if true - replace files in the partition, otherwise add files to
-   *          the partition
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
    * @param inheritTableSpecs if true, on [re]creating the partition, take the
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *
@@ -1667,7 +1668,7 @@ public class Hive {
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl,
-      Map<String, String> partSpec, boolean replace,
+      Map<String, String> partSpec, LoadFileType loadFileType,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
 
@@ -1719,13 +1720,14 @@ public class Hive {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
 
-    if (replace || (oldPart == null && !isAcid)) {
+    if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
       boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
       replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), isSrcLocal,
           isAutoPurge, newFiles);
     } else {
       FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-      Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
+      Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid,
+          (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
     }
     perfLogger.PerfLogEnd("MoveTask", "FileMoves");
     Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
@@ -1735,7 +1737,7 @@ public class Hive {
     // Generate an insert event only if inserting into an existing partition
     // When inserting into a new partition, the add partition event takes care of insert event
     if ((null != oldPart) && (null != newFiles)) {
-      fireInsertEvent(tbl, partSpec, replace, newFiles);
+      fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
     } else {
       LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
           + "partition that does not exist yet. Skipping generating INSERT event.");
@@ -1950,7 +1952,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param loadPath
    * @param tableName
    * @param partSpec
-   * @param replace
+   * @param loadFileType
    * @param numDP number of dynamic partitions
    * @param listBucketingEnabled
    * @param isAcid true if this is an ACID operation
    *
@@ -1959,7 +1961,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws HiveException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
-      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final String tableName, final Map<String, String> partSpec, final LoadFileType loadFileType,
       final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
@@ -2005,7 +2007,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
           // load the partition
           Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
-              replace, true, listBucketingEnabled,
+              loadFileType, true, listBucketingEnabled,
               false, isAcid, hasFollowingStatsTask);
           partitionsMap.put(fullPartSpec, newPartition);
 
@@ -2029,7 +2031,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               + " partPath=" + partPath + ", "
               + " table=" + tbl.getTableName() + ", "
               + " partSpec=" + fullPartSpec + ", "
-              + " replace=" + replace + ", "
+              + " loadFileType=" + loadFileType.toString() + ", "
               + " listBucketingEnabled=" + listBucketingEnabled + ", "
              + " isAcid=" + isAcid + ", "
              + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
@@ -2086,8 +2088,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          Directory containing files to load into Table
    * @param tableName
    *          name of table to be loaded.
-   * @param replace
-   *          if true - replace files in the table, otherwise add files to table
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
    * @param isSrcLocal
    *          If the source directory is LOCAL
    * @param isSkewedStoreAsSubdir
    *
@@ -2096,8 +2099,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          if there is any following stats task
    * @param isAcid true if this is an ACID based write
    */
-  public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask)
+  public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
+      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask)
       throws HiveException {
 
     List<Path> newFiles = null;
@@ -2106,7 +2109,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
-    if (replace) {
+    if (loadFileType == LoadFileType.REPLACE_ALL) {
       Path tableDest = tbl.getPath();
       boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
       replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles);
     } else {
       FileSystem fs;
       try {
         fs = tbl.getDataLocation().getFileSystem(sessionConf);
-        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid,
+            (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
       } catch (IOException e) {
         throw new HiveException("addFiles: filesystem error in check phase", e);
       }
@@ -2151,7 +2155,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
 
-    fireInsertEvent(tbl, null, replace, newFiles);
+    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
   }
 
   /**
@@ -2972,8 +2976,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   private static void copyFiles(final HiveConf conf, final FileSystem destFs,
-      FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
-      throws HiveException {
+      FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal,
+      boolean isOverwrite, final List<Path> newFiles) throws HiveException {
 
     final HdfsUtils.HadoopFileStatus fullDestStatus;
     try {
@@ -3016,7 +3020,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // copy from source to destination, we will inherit the destination's parent group ownership.
       if (null == pool) {
         try {
-          Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
+          Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed);
 
           if (null != newFiles) {
             newFiles.add(destPath);
@@ -3032,7 +3036,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
             try {
               Path destPath =
-                  mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
+                  mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed);
 
               if (null != newFiles) {
                 newFiles.add(destPath);
@@ -3121,6 +3125,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param destFs the {@link FileSystem} to move the file to
    * @param destDirPath the {@link Path} to move the file to
    * @param isSrcLocal if the source file is on the local filesystem
+   * @param isOverwrite if true, then overwrite destination file if exist else make a duplicate copy
    * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
    *
    * @return the {@link Path} the source file was moved to
@@ -3128,7 +3133,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws IOException if there was an issue moving the file
    */
   private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
-      boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
+      boolean isSrcLocal, boolean isOverwrite, boolean isRenameAllowed) throws IOException {
 
     // Strip off the file type, if any so we don't make:
     // 000000_0.gz -> 000000_0.gz_copy_1
@@ -3147,6 +3152,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
      * I'll leave the below loop for now until a better approach is found.
      */
     for (int counter = 1; destFs.exists(destFilePath); counter++) {
+      if (isOverwrite) {
+        destFs.delete(destFilePath, false);
+        break;
+      }
       destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) +
           (!type.isEmpty() ? "." + type : ""));
     }
@@ -3429,12 +3438,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param fs Filesystem
    * @param isSrcLocal true if source is on local file system
    * @param isAcid true if this is an ACID based write
+   * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy
    * @param newFiles if this is non-null, a list of files that were created as a result of this
    *                 move will be returned.
    * @throws HiveException
    */
-  static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
+  static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
+      boolean isSrcLocal, boolean isAcid,
+      boolean isOverwrite, List<Path> newFiles) throws HiveException {
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -3466,7 +3477,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (isAcid) {
         moveAcidFiles(srcFs, srcs, destf, newFiles);
       } else {
-        copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
+        copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite, newFiles);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 751bda0..51b4f36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -350,7 +351,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
 
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
-        replace);
+        replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
         x.getOutputs(), loadTableWork, null, false,
         SessionState.get().getLineageState()), x.getConf());
@@ -420,7 +421,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           x.getOutputs(), addPartitionDesc), x.getConf());
       LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
           Utilities.getTableDesc(table),
-          partSpec.getPartSpec(), replicationSpec.isReplace());
+          partSpec.getPartSpec(),
+          replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
           x.getInputs(), x.getOutputs(), loadTableWork, null, false,


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 8879b80..033235b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -275,7 +276,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
-        Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
+        Utilities.getTableDesc(ts.tableHandle), partSpec,
+        isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
       // but preservePartitionSpecs=false(default) here is not sufficient enough
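Across the analyzers the boolean-to-enum translation is deliberately asymmetric: only plain LOAD DATA keeps the old duplicate-on-clash behaviour, while the import and replication paths switch to overwrite so a re-applied event is idempotent. A sketch of that mapping (illustrative helper, not Hive code):

    public class LoadTypeMapping {
      enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, OVERWRITE_EXISTING }

      // LoadSemanticAnalyzer: LOAD DATA [OVERWRITE] keeps legacy semantics.
      static LoadFileType forLoadData(boolean isOverwrite) {
        return isOverwrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
      }

      // Import/repl paths (ImportSemanticAnalyzer, LoadTable, LoadPartitions):
      // never duplicate; replace everything or overwrite clashing names.
      static LoadFileType forReplicationLoad(boolean isReplace) {
        return isReplace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
      }
    }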

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9c6c556..c8277f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -190,6 +190,7 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
@@ -6917,8 +6918,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           currentTransactionId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsAcid);
+      LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
+          dest_tab.getTableName()) && !destTableIsAcid)
+          ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+      ltd.setLoadFileType(loadType);
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
     } else {
@@ -7035,8 +7038,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           currentTransactionId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsAcid);
+      LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
+          dest_tab.getTableName()) && !destTableIsAcid)
+          ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+      ltd.setLoadFileType(loadType);
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
 
@@ -13658,8 +13663,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // and don't have a rational way to guess, so assume the most
       // conservative case.
       if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-      else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-          getWriteType(dest));
+      else return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL)
+          ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType(dest));
     }
 
     private WriteEntity.WriteType getWriteType(String dest) {


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 71cdbde..f24d1b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -62,11 +62,9 @@ public class CopyUtils {
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destination, List<ReplChangeManager.FileInfo> srcFiles)
-      throws IOException, LoginException {
+  public void copyAndVerify(FileSystem destinationFs, Path destination,
+      List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException {
     Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles);
-    FileSystem destinationFs = destination.getFileSystem(hiveConf);
-
     for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : map.entrySet()) {
       FileSystem sourceFs = entry.getKey();
       List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue();


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 90a970c..e15f59c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -32,7 +32,7 @@ import java.util.Map;
  */
 public class LoadTableDesc extends LoadDesc implements Serializable {
   private static final long serialVersionUID = 1L;
-  private boolean replace;
+  private LoadFileType loadFileType;
   private DynamicPartitionCtx dpCtx;
   private ListBucketingCtx lbCtx;
   private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
@@ -46,10 +46,15 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
 
+  public enum LoadFileType {
+    REPLACE_ALL,        // Remove all existing data before copy/move
+    KEEP_EXISTING,      // If any file exist while copy, then just duplicate the file
+    OVERWRITE_EXISTING  // If any file exist while copy, then just overwrite the file
+  }
   public LoadTableDesc(final LoadTableDesc o) {
     super(o.getSourcePath(), o.getWriteType());
 
-    this.replace = o.replace;
+    this.loadFileType = o.loadFileType;
     this.dpCtx = o.dpCtx;
     this.lbCtx = o.lbCtx;
     this.inheritTableSpecs = o.inheritTableSpecs;
@@ -61,11 +66,11 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec,
-      final boolean replace,
+      final LoadFileType loadFileType,
       final AcidUtils.Operation writeType, Long currentTransactionId) {
     super(sourcePath, writeType);
     this.currentTransactionId = currentTransactionId;
-    init(table, partitionSpec, replace);
+    init(table, partitionSpec, loadFileType);
   }
 
   /**
@@ -73,13 +78,13 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   * @param sourcePath
   * @param table
   * @param partitionSpec
-   * @param replace
+   * @param loadFileType
   */
  public LoadTableDesc(final Path sourcePath,
      final TableDesc table,
      final Map<String, String> partitionSpec,
-      final boolean replace) {
-    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID,
+      final LoadFileType loadFileType) {
+    this(sourcePath, table, partitionSpec, loadFileType, AcidUtils.Operation.NOT_ACID,
        null);
  }
 
@@ -87,7 +92,8 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
       final TableDesc table,
       final Map<String, String> partitionSpec,
       final AcidUtils.Operation writeType, Long currentTransactionId) {
-    this(sourcePath, table, partitionSpec, true, writeType, currentTransactionId);
+    this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
+        writeType, currentTransactionId);
   }
 
   /**
@@ -99,7 +105,8 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null);
+    this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
+        AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
@@ -110,19 +117,19 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
     this.dpCtx = dpCtx;
     this.currentTransactionId = currentTransactionId;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
-      init(table, dpCtx.getPartSpec(), true);
+      init(table, dpCtx.getPartSpec(), LoadFileType.REPLACE_ALL);
     } else {
-      init(table, new LinkedHashMap<>(), true);
+      init(table, new LinkedHashMap<>(), LoadFileType.REPLACE_ALL);
     }
   }
 
   private void init(
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
-      final boolean replace) {
+      final LoadFileType loadFileType) {
     this.table = table;
     this.partitionSpec = partitionSpec;
-    this.replace = replace;
+    this.loadFileType = loadFileType;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -145,11 +152,15 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
 
   @Explain(displayName = "replace")
   public boolean getReplace() {
-    return replace;
+    return (loadFileType == LoadFileType.REPLACE_ALL);
+  }
+
+  public LoadFileType getLoadFileType() {
+    return loadFileType;
   }
 
-  public void setReplace(boolean replace) {
-    this.replace = replace;
+  public void setLoadFileType(LoadFileType loadFileType) {
+    this.loadFileType = loadFileType;
   }
 
   public DynamicPartitionCtx getDPCtx() {
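LoadTableDesc keeps its external surface intact: constructors that used to hardcode replace=true now pass REPLACE_ALL, and getReplace() survives only to feed the EXPLAIN "replace" flag. The equivalence in miniature (a model of the class above, not the class itself):

    public class LoadTableDescModel {
      enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, OVERWRITE_EXISTING }

      private LoadFileType loadFileType = LoadFileType.REPLACE_ALL; // legacy replace=true

      // EXPLAIN still prints a boolean "replace"; it is now derived, not stored.
      public boolean getReplace() {
        return loadFileType == LoadFileType.REPLACE_ALL;
      }

      public void setLoadFileType(LoadFileType loadFileType) {
        this.loadFileType = loadFileType;
      }
    }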

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index c0c496f..e523049 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -140,7 +141,8 @@ public class TestExecDriver extends TestCase {
         db.dropTable(Warehouse.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, true, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
+            true, false, false, false);
         i++;
       }


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
index 0c1c230..cc1d857 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
@@ -83,7 +83,7 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -107,7 +107,7 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -127,7 +127,7 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -158,7 +158,7 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -185,7 +185,7 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -205,7 +205,7 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);