This is an automated email from the ASF dual-hosted git repository.
zhangbutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 20d26ad269a HIVE-28473: INSERT OVERWRITE LOCAL DIRECTORY writes staging
files to wrong HDFS directory (#5407) (yu liang, reviewed by Butao Zhang)
20d26ad269a is described below
commit 20d26ad269af3c281f845df76d3b8d260cabc904
Author: yu liang <[email protected]>
AuthorDate: Fri Dec 20 13:46:53 2024 +0800
HIVE-28473: INSERT OVERWRITE LOCAL DIRECTORY writes staging files to wrong
HDFS directory (#5407) (yu liang, reviewed by Butao Zhang)
---
ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java | 8 +++++---
.../java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 9 ++++++++-
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 2721977d6f9..6ab137b4187 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -438,9 +438,11 @@ public class MoveTask extends Task<MoveWork> implements
Serializable {
}
}
else {
- FileSystem targetFs = targetPath.getFileSystem(conf);
- if (!targetFs.exists(targetPath.getParent())){
- targetFs.mkdirs(targetPath.getParent());
+ if (lfd.getIsDfsDir()) {
+ FileSystem targetFs = targetPath.getFileSystem(conf);
+ if (!targetFs.exists(targetPath.getParent())) {
+ targetFs.mkdirs(targetPath.getParent());
+ }
}
moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 5f811571241..04ddb857c0d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8468,6 +8468,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
RowSchema fsRS, boolean canBeMerged,
Table dest_tab, boolean isMmCtas,
Integer dest_type, QB qb, boolean
isDirectInsert, AcidUtils.Operation acidOperation, String moveTaskId) throws
SemanticException {
boolean isInsertOverwrite = false;
+ boolean isLocal = false;
Context.Operation writeOperation = getWriteOperation(dest);
switch (dest_type) {
case QBMetaData.DEST_PARTITION:
@@ -8492,6 +8493,7 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
break;
case QBMetaData.DEST_LOCAL_FILE:
+ isLocal = true;
case QBMetaData.DEST_DFS_FILE:
//CTAS path or insert into file/directory
break;
@@ -8545,7 +8547,12 @@ public class SemanticAnalyzer extends
BaseSemanticAnalyzer {
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
if (!destTableIsMaterialization &&
HiveConf.getVar(conf,
HIVE_STATS_DBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getTempDirForInterimJobPath(dest_path).toString();
+ String statsTmpLoc;
+ if (isLocal){
+ statsTmpLoc = ctx.getMRTmpPath().toString();
+ } else {
+ statsTmpLoc = ctx.getTempDirForInterimJobPath(dest_path).toString();
+ }
fileSinkDesc.setStatsTmpDir(statsTmpLoc);
LOG.debug("Set stats collection dir : " + statsTmpLoc);
}