This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new c3507f5bcc4 [bugfix](hive)fix after insert overwrite hive table, data error (#43049)
c3507f5bcc4 is described below

commit c3507f5bcc41ff9626d8912b36d499e09562bf13
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sat Nov 2 11:19:29 2024 +0800

    [bugfix](hive)fix after insert overwrite hive table, data error (#43049)

    ### What problem does this PR solve?

    1. Different remoteFs instances should correspond to different nativeFs instances.
    2. If the file type is S3, there is no staging directory to delete.
    3. When deleting a directory fails, the changes need to be rolled back.
---
 .../doris/datasource/hive/HMSTransaction.java      | 30 ++++++++++++++++------
 .../doris/fs/remote/RemoteFSPhantomManager.java    |  9 +++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 15 +++++++++--
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  6 ++---
 .../org/apache/doris/planner/HiveTableSink.java    |  2 +-
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 6183c277c1b..02c99a695c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -101,7 +101,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;

     private static class UncompletedMpuPendingUpload {
@@ -184,10 +184,14 @@ public class HMSTransaction implements Transaction {
     }

     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }

     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -207,10 +211,12 @@ public class HMSTransaction implements Transaction {
                 }
             });
         } else {
-            fs.makeDir(declaredIntentionsToWrite);
-            setLocation(new THiveLocationParams() {{
-                    setWritePath(declaredIntentionsToWrite);
-                }
+            stagingDirectory.ifPresent((v) -> {
+                fs.makeDir(v);
+                setLocation(new THiveLocationParams() {{
+                        setWritePath(v);
+                    }
+                });
             });
         }
     }
@@ -643,15 +649,23 @@ public class HMSTransaction implements Transaction {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
                     directory.toString(), deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                    "Failed to delete directory for files: " + deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty dir: " + directory.toString());
         }
     }

     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
@@ -1447,7 +1461,7 @@ public class HMSTransaction implements Transaction {
     }

     private void pruneAndDeleteStagingDirectories() {
-        recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, false);
+        stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new Path(v), true, false));
     }

     private void abortMultiUploads() {
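
The HMSTransaction change boils down to two ideas: track the staging directory as an Optional that is empty for S3 writes (S3 writes go directly to the target location, so there is nothing to create or clean up), and turn silent delete failures into thrown exceptions so the commit path can roll back. A minimal, self-contained Java sketch of that pattern follows; FileType and tryDeleteRecursively are simplified stand-ins for Doris's TFileType, RemoteFileSystem, and DeleteRecursivelyResult, not the real APIs:

    import java.util.Optional;

    // Illustrative sketch of the staging-directory pattern (names simplified).
    public class StagingDirSketch {
        enum FileType { FILE_S3, FILE_HDFS }

        private Optional<String> stagingDirectory = Optional.empty();

        // S3 writes have no staging directory; only non-S3 writes record one.
        void beginInsertTable(FileType fileType, String writePath) {
            stagingDirectory = (fileType == FileType.FILE_S3)
                    ? Optional.empty()
                    : Optional.of(writePath);
        }

        // Cleanup runs only when a staging directory actually exists.
        void pruneAndDeleteStagingDirectories() {
            stagingDirectory.ifPresent(dir -> {
                if (!tryDeleteRecursively(dir)) {
                    // Failing loudly lets the caller roll back instead of
                    // silently committing over half-deleted data.
                    throw new RuntimeException("Failed to delete directory: " + dir);
                }
            });
        }

        private boolean tryDeleteRecursively(String dir) {
            return true; // placeholder for the real recursive delete
        }
    }
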
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
index 282361c4cb6..c0e48a13466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.fs.remote;

 import org.apache.doris.common.CustomThreadFactory;

+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
             = new ConcurrentHashMap<>();

+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);

@@ -77,9 +81,13 @@ public class RemoteFSPhantomManager {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }

     /**
@@ -102,6 +110,7 @@ public class RemoteFSPhantomManager {
                 if (fs != null) {
                     try {
                         fs.close();
+                        fsSet.remove(fs);
                         LOG.info("Closed file system: {}", fs.getUri());
                     } catch (IOException e) {
                         LOG.warn("Failed to close file system", e);
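
The new fsSet enforces the rule from the commit message that each RemoteFileSystem must own a distinct native FileSystem: registering the same handle twice now fails fast, and the handle is dropped from the set when it is closed. A condensed sketch of the same bookkeeping, without the phantom-reference machinery and assuming a plain JDK concurrent set instead of Guava's:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Simplified registry: each native handle may back at most one registration.
    public class FsRegistrySketch<T> {
        private final Set<T> registered = ConcurrentHashMap.newKeySet();

        public void register(T nativeFs) {
            // add() returns false when the handle is already present, making
            // check-and-insert a single atomic step.
            if (!registered.add(nativeFs)) {
                throw new RuntimeException("FileSystem already exists: " + nativeFs);
            }
        }

        public void unregister(T nativeFs) {
            registered.remove(nativeFs);
        }
    }

Note that relying on Set.add() as above closes the small window between the separate contains() check and add() call in the patch; whether that race matters in practice depends on how concurrently registerPhantomReference can be invoked.
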
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 87ba086baec..f8805bd0d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -20,6 +20,8 @@ package org.apache.doris.fs.remote;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;

@@ -74,12 +77,20 @@ public class S3FileSystem extends ObjFileSystem {
                 PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                         .filter(entry -> entry.getKey() != null && entry.getValue() != null)
                         .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                 try {
-                    dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    dfsFileSystem = authenticator.doAs(() -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    RemoteFSPhantomManager.registerPhantomReference(this);
                 } catch (Exception e) {
                     throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
                 }
-                RemoteFSPhantomManager.registerPhantomReference(this);
             }
         }
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc..2146472aec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -99,11 +99,11 @@ public class DFSFileSystem extends RemoteFileSystem {
                         throw new RuntimeException(e);
                     }
                 });
+                operations = new HDFSFileOperations(dfsFileSystem);
+                RemoteFSPhantomManager.registerPhantomReference(this);
             } catch (Exception e) {
-                throw new UserException(e);
+                throw new UserException("Failed to get dfs FileSystem for " + e.getMessage(), e);
             }
-            operations = new HDFSFileOperations(dfsFileSystem);
-            RemoteFSPhantomManager.registerPhantomReference(this);
         }
     }
 }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index d1f8ab411ea..168f92c113c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -133,7 +133,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
         if (insertCtx.isPresent()) {
             HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
-            context.setWritePath(storageLocation);
+            context.setWritePath(location);
             context.setFileType(fileType);
         }
     } else {
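
The two file-system patches above share one shape: create the native handle inside authenticator.doAs(...) so Kerberos credentials apply, and register the phantom reference only after creation succeeds, so a failed FileSystem.get no longer leaves a dangling registration (previously the register call sat outside the try). A hedged sketch of that ordering, where Authenticator and openNativeFs are illustrative stand-ins for Doris's HadoopAuthenticator and FileSystem.get, not the real signatures:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Stand-in for HadoopAuthenticator: runs an action as the
    // authenticated (e.g. Kerberos) user.
    interface Authenticator {
        <T> T doAs(Callable<T> action) throws Exception;
    }

    public class FsInitSketch {
        private Object nativeFs; // stand-in for org.apache.hadoop.fs.FileSystem

        void init(Authenticator authenticator) throws IOException {
            try {
                // Create the handle under doAs so credentials apply.
                nativeFs = authenticator.doAs(() -> openNativeFs());
                // Register only after creation succeeds; registering outside
                // the try (as before the patch) could record a failed handle.
                register(nativeFs);
            } catch (Exception e) {
                throw new IOException("Failed to get FileSystem: " + e.getMessage(), e);
            }
        }

        private Object openNativeFs() { return new Object(); } // placeholder
        private void register(Object fs) { /* phantom-reference bookkeeping */ }
    }
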
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org