This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c3507f5bcc4 [bugfix](hive)fix after insert overwrite hive table, data 
error (#43049)
c3507f5bcc4 is described below

commit c3507f5bcc41ff9626d8912b36d499e09562bf13
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sat Nov 2 11:19:29 2024 +0800

    [bugfix](hive)fix after insert overwrite hive table, data error (#43049)
    
    ### What problem does this PR solve?
    
    1. Different remoteFs instances should correspond to different nativeFs instances.
    2. If the target is S3, we do not need to delete the staging directory.
    3. When an error occurs while deleting a directory, we need to roll back.
---
 .../doris/datasource/hive/HMSTransaction.java      | 30 ++++++++++++++++------
 .../doris/fs/remote/RemoteFSPhantomManager.java    |  9 +++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 15 +++++++++--
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  6 ++---
 .../org/apache/doris/planner/HiveTableSink.java    |  2 +-
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 6183c277c1b..02c99a695c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -101,7 +101,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = 
Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -184,10 +184,14 @@ public class HMSTransaction implements Transaction {
     }
 
     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -207,10 +211,12 @@ public class HMSTransaction implements Transaction {
                             }
                         });
                     } else {
-                        fs.makeDir(declaredIntentionsToWrite);
-                        setLocation(new THiveLocationParams() {{
-                                setWritePath(declaredIntentionsToWrite);
-                            }
+                        stagingDirectory.ifPresent((v) -> {
+                            fs.makeDir(v);
+                            setLocation(new THiveLocationParams() {{
+                                    setWritePath(v);
+                                }
+                            });
                         });
                     }
                 }
@@ -643,15 +649,23 @@ public class HMSTransaction implements Transaction {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
                     directory.toString(), 
deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                "Failed to delete directory for files: " + 
deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty 
dir: " + directory.toString());
         }
     }
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
@@ -1447,7 +1461,7 @@ public class HMSTransaction implements Transaction {
         }
 
         private void pruneAndDeleteStagingDirectories() {
-            recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, 
false);
+            stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new 
Path(v), true, false));
         }
 
         private void abortMultiUploads() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
index 282361c4cb6..c0e48a13466 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.fs.remote;
 
 import org.apache.doris.common.CustomThreadFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, 
FileSystem> referenceMap
             = new ConcurrentHashMap<>();
 
+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -77,9 +81,13 @@ public class RemoteFSPhantomManager {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + 
remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new 
RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }
 
     /**
@@ -102,6 +110,7 @@ public class RemoteFSPhantomManager {
                             if (fs != null) {
                                 try {
                                     fs.close();
+                                    fsSet.remove(fs);
                                     LOG.info("Closed file system: {}", 
fs.getUri());
                                 } catch (IOException e) {
                                     LOG.warn("Failed to close file system", e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 87ba086baec..f8805bd0d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -20,6 +20,8 @@ package org.apache.doris.fs.remote;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -74,12 +77,20 @@ public class S3FileSystem extends ObjFileSystem {
                     
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                             .filter(entry -> entry.getKey() != null && 
entry.getValue() != null)
                             .forEach(entry -> conf.set(entry.getKey(), 
entry.getValue()));
+                    AuthenticationConfig authConfig = 
AuthenticationConfig.getKerberosConfig(conf);
+                    HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                     try {
-                        dfsFileSystem = FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                        dfsFileSystem = authenticator.doAs(() -> {
+                            try {
+                                return FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                        RemoteFSPhantomManager.registerPhantomReference(this);
                     } catch (Exception e) {
                         throw new UserException("Failed to get S3 FileSystem 
for " + e.getMessage(), e);
                     }
-                    RemoteFSPhantomManager.registerPhantomReference(this);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc..2146472aec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -99,11 +99,11 @@ public class DFSFileSystem extends RemoteFileSystem {
                                 throw new RuntimeException(e);
                             }
                         });
+                        operations = new HDFSFileOperations(dfsFileSystem);
+                        RemoteFSPhantomManager.registerPhantomReference(this);
                     } catch (Exception e) {
-                        throw new UserException(e);
+                        throw new UserException("Failed to get dfs FileSystem 
for " + e.getMessage(), e);
                     }
-                    operations = new HDFSFileOperations(dfsFileSystem);
-                    RemoteFSPhantomManager.registerPhantomReference(this);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index d1f8ab411ea..168f92c113c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -133,7 +133,7 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
             if (insertCtx.isPresent()) {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
                 tSink.setOverwrite(context.isOverwrite());
-                context.setWritePath(storageLocation);
+                context.setWritePath(location);
                 context.setFileType(fileType);
             }
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to