This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db03a49c54 [fix](external-write) Delete the actual data file on write failure to avoid orphan files (#64678)
5db03a49c54 is described below

commit 5db03a49c54ba29707208e95da0b34f07627ca9e
Author: daidai <[email protected]>
AuthorDate: Mon Jun 29 17:02:13 2026 +0800

    [fix](external-write) Delete the actual data file on write failure to avoid orphan files (#64678)
    
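    When a partition writer fails to close (e.g. on an HDFS sync error), it
    cleans up the partial file using `_file_name`, but the file is actually
    created with `_get_target_file_name()`
    ("{_file_name}-{_file_name_index}{ext}"). The mismatched path makes the
    cleanup a no-op, leaving an orphan data file on the storage.
    
    Use `_get_target_file_name()` for the deletion so it matches the created
    path, for both the Iceberg and Hive partition writers.
    
    In addition, when the Hive partition writer fails it now records its
    pending S3 multipart upload (if one was started) in a partition update,
    and `HMSTransaction.rollback()` aborts such pending uploads even when no
    committer has been created yet.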
---
 .../writer/iceberg/viceberg_partition_writer.cpp   |   3 +-
 be/src/exec/sink/writer/vhive_partition_writer.cpp |  78 ++++++++---
 be/src/exec/sink/writer/vhive_partition_writer.h   |   2 +
 .../doris/datasource/hive/HMSTransaction.java      |  30 +++--
 .../datasource/hive/HMSTransactionPathTest.java    | 145 ++++++++++++++++++++-
 5 files changed, 226 insertions(+), 32 deletions(-)

diff --git a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 434488266bb..5f28c41c24d 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -124,7 +124,8 @@ Status VIcebergPartitionWriter::close(const Status& status) 
{
     }
     bool status_ok = result_status.ok() && status.ok();
     if (!status_ok && _fs != nullptr) {
-        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+        // delete the actual created file, otherwise an orphan file is left 
behind
+        auto path = fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name());
         Status st = _fs->delete_file(path);
         if (!st.ok()) {
             LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", 
path, st.to_string());
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.cpp 
b/be/src/exec/sink/writer/vhive_partition_writer.cpp
index 4bfc728c8c9..5e2582ceb5f 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/exec/sink/writer/vhive_partition_writer.cpp
@@ -131,11 +131,16 @@ Status VHivePartitionWriter::close(const Status& status) {
         }
     }
     bool status_ok = result_status.ok() && status.ok();
-    if (!status_ok && _fs != nullptr) {
-        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
-        Status st = _fs->delete_file(path);
-        if (!st.ok()) {
-            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", 
path, st.to_string());
+    if (!status_ok) {
+        _add_s3_mpu_pending_upload_for_rollback();
+        if (_fs != nullptr) {
+            // delete the actual created file, otherwise an orphan file is 
left behind
+            auto path = fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name());
+            Status st = _fs->delete_file(path);
+            if (!st.ok()) {
+                LOG(WARNING) << fmt::format("Delete file {} failed, reason: 
{}", path,
+                                            st.to_string());
+            }
         }
     }
     if (status_ok) {
@@ -164,26 +169,59 @@ THivePartitionUpdate 
VHivePartitionWriter::_build_partition_update() {
     DCHECK(_file_format_transformer != nullptr);
     
hive_partition_update.__set_file_size(_file_format_transformer->written_len());
 
-    if (_write_info.file_type == TFileType::FILE_S3) {
-        DCHECK(_file_writer != nullptr);
-        doris::io::S3FileWriter* s3_mpu_file_writer =
-                dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
-        DCHECK(s3_mpu_file_writer != nullptr);
-        TS3MPUPendingUpload s3_mpu_pending_upload;
-        s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
-        s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
-        s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());
-
-        std::map<int, std::string> etags;
-        for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
-            etags.insert({completed_part.part_num, completed_part.etag});
-        }
-        s3_mpu_pending_upload.__set_etags(etags);
+    TS3MPUPendingUpload s3_mpu_pending_upload;
+    if (_build_s3_mpu_pending_upload(&s3_mpu_pending_upload)) {
         
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
     }
     return hive_partition_update;
 }
 
+bool VHivePartitionWriter::_build_s3_mpu_pending_upload(TS3MPUPendingUpload* 
pending_upload) {
+    DCHECK(pending_upload != nullptr);
+    if (_write_info.file_type != TFileType::FILE_S3 || _file_writer == 
nullptr) {
+        return false;
+    }
+
+    doris::io::S3FileWriter* s3_mpu_file_writer =
+            dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
+    DCHECK(s3_mpu_file_writer != nullptr);
+    std::string upload_id = s3_mpu_file_writer->upload_id();
+    if (upload_id.empty()) {
+        return false;
+    }
+
+    pending_upload->__set_bucket(s3_mpu_file_writer->bucket());
+    pending_upload->__set_key(s3_mpu_file_writer->key());
+    pending_upload->__set_upload_id(upload_id);
+
+    std::map<int, std::string> etags;
+    for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
+        etags.insert({completed_part.part_num, completed_part.etag});
+    }
+    pending_upload->__set_etags(etags);
+    return true;
+}
+
+void VHivePartitionWriter::_add_s3_mpu_pending_upload_for_rollback() {
+    TS3MPUPendingUpload s3_mpu_pending_upload;
+    if (!_build_s3_mpu_pending_upload(&s3_mpu_pending_upload)) {
+        return;
+    }
+
+    THivePartitionUpdate hive_partition_update;
+    hive_partition_update.__set_name(_partition_name);
+    hive_partition_update.__set_update_mode(_update_mode);
+    THiveLocationParams location;
+    location.__set_write_path(_write_info.original_write_path);
+    location.__set_target_path(_write_info.target_path);
+    hive_partition_update.__set_location(location);
+    hive_partition_update.__set_file_names({});
+    hive_partition_update.__set_row_count(0);
+    hive_partition_update.__set_file_size(0);
+    
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
+    _state->add_hive_partition_updates(hive_partition_update);
+}
+
 std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type 
file_format_type,
                                                       TFileCompressType::type 
write_compress_type) {
     std::string compress_name;
diff --git a/be/src/exec/sink/writer/vhive_partition_writer.h 
b/be/src/exec/sink/writer/vhive_partition_writer.h
index e958db505c1..0b124108623 100644
--- a/be/src/exec/sink/writer/vhive_partition_writer.h
+++ b/be/src/exec/sink/writer/vhive_partition_writer.h
@@ -76,6 +76,8 @@ private:
 
 private:
     THivePartitionUpdate _build_partition_update();
+    bool _build_s3_mpu_pending_upload(TS3MPUPendingUpload* pending_upload);
+    void _add_s3_mpu_pending_upload_for_rollback();
 
     std::string _get_file_extension(TFileFormatType::type file_format_type,
                                     TFileCompressType::type 
write_compress_type);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 0e2bd1d531c..0fa6a21f59e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -165,9 +165,30 @@ public class HMSTransaction implements Transaction {
         return new ArrayList<>(mm.values());
     }
 
+    private void 
collectUncompletedMpuPendingUploads(List<THivePartitionUpdate> hivePUs) {
+        for (THivePartitionUpdate pu : hivePUs) {
+            if (pu.getS3MpuPendingUploads() != null) {
+                for (TS3MPUPendingUpload s3MPUPendingUpload : 
pu.getS3MpuPendingUploads()) {
+                    uncompletedMpuPendingUploads.add(
+                            new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, 
pu.getLocation().getWritePath()));
+                }
+            }
+        }
+    }
+
     @Override
     public void rollback() {
         if (hmsCommitter == null) {
+            collectUncompletedMpuPendingUploads(hivePartitionUpdates);
+            if (uncompletedMpuPendingUploads.isEmpty()) {
+                return;
+            }
+            hmsCommitter = new HmsCommitter();
+            try {
+                hmsCommitter.rollback();
+            } finally {
+                hmsCommitter.shutdownExecutorService();
+            }
             return;
         }
         try {
@@ -224,14 +245,7 @@ public class HMSTransaction implements Transaction {
         }
 
         List<THivePartitionUpdate> mergedPUs = 
mergePartitions(hivePartitionUpdates);
-        for (THivePartitionUpdate pu : mergedPUs) {
-            if (pu.getS3MpuPendingUploads() != null) {
-                for (TS3MPUPendingUpload s3MPUPendingUpload : 
pu.getS3MpuPendingUploads()) {
-                    uncompletedMpuPendingUploads.add(
-                            new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, 
pu.getLocation().getWritePath()));
-                }
-            }
-        }
+        collectUncompletedMpuPendingUploads(mergedPUs);
         List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
insertExistsPartitions = new ArrayList<>();
         for (THivePartitionUpdate pu : mergedPUs) {
             TUpdateMode updateMode = pu.getUpdateMode();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
index 1468d0d5b3c..35bdb686544 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSTransactionPathTest.java
@@ -23,9 +23,17 @@ import org.apache.doris.filesystem.FileEntry;
 import org.apache.doris.filesystem.FileIterator;
 import org.apache.doris.filesystem.FileSystem;
 import org.apache.doris.filesystem.Location;
+import org.apache.doris.filesystem.UploadPartResult;
 import org.apache.doris.filesystem.local.LocalFileSystem;
+import org.apache.doris.filesystem.spi.ObjFileSystem;
+import org.apache.doris.filesystem.spi.ObjStorage;
+import org.apache.doris.filesystem.spi.RemoteObject;
+import org.apache.doris.filesystem.spi.RemoteObjects;
+import org.apache.doris.filesystem.spi.RequestBody;
 import org.apache.doris.fs.SpiSwitchingFileSystem;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TS3MPUPendingUpload;
 
 import org.junit.After;
@@ -173,6 +181,12 @@ public class HMSTransactionPathTest {
         return new HMSTransaction(null, spiFs, Runnable::run);
     }
 
+    private static void setEmptyStagingDirectory(HMSTransaction tx) throws 
Exception {
+        Field stagingDirField = 
HMSTransaction.class.getDeclaredField("stagingDirectory");
+        stagingDirField.setAccessible(true);
+        stagingDirField.set(tx, java.util.Optional.empty());
+    }
+
     private static class FakeFileSystem implements FileSystem {
         IOException listDirectoriesThrows;
         IOException listFilesThrows;
@@ -302,9 +316,7 @@ public class HMSTransactionPathTest {
         uploads.add(upload);
 
         // stagingDirectory is only initialized inside commit(); initialize it 
here to avoid NPE in rollback().
-        Field stagingDirField = 
HMSTransaction.class.getDeclaredField("stagingDirectory");
-        stagingDirField.setAccessible(true);
-        stagingDirField.set(tx, java.util.Optional.empty());
+        setEmptyStagingDirectory(tx);
 
         // Instantiate HmsCommitter (package-private inner class) for this 
transaction.
         Class<?> committerClass = 
Arrays.stream(HMSTransaction.class.getDeclaredClasses())
@@ -323,4 +335,131 @@ public class HMSTransactionPathTest {
         // After rollback, uncompletedMpuPendingUploads must be cleared.
         Assert.assertTrue("uncompletedMpuPendingUploads must be cleared after 
rollback", uploads.isEmpty());
     }
+
+    @Test
+    public void testRollbackAbortsPendingMpuBeforeCommitterCreated() throws 
Exception {
+        TrackingObjStorage storage = new TrackingObjStorage();
+        HMSTransaction tx = createTransaction(new TestObjFileSystem(storage));
+
+        TS3MPUPendingUpload mpu = new TS3MPUPendingUpload();
+        mpu.setBucket("test-bucket");
+        mpu.setKey("warehouse/table/data-0.parquet");
+        mpu.setUploadId("upload-id-1");
+
+        THiveLocationParams location = new THiveLocationParams();
+        location.setWritePath("s3://test-bucket/warehouse/table");
+
+        THivePartitionUpdate update = new THivePartitionUpdate();
+        update.setLocation(location);
+        update.setFileNames(Collections.emptyList());
+        update.setRowCount(0);
+        update.setFileSize(0);
+        update.setS3MpuPendingUploads(Collections.singletonList(mpu));
+        tx.updateHivePartitionUpdates(Collections.singletonList(update));
+        setEmptyStagingDirectory(tx);
+
+        tx.rollback();
+
+        
Assert.assertEquals(Collections.singletonList("s3://test-bucket/warehouse/table/data-0.parquet"),
+                storage.abortedPaths);
+        Assert.assertEquals(Collections.singletonList("upload-id-1"), 
storage.abortedUploadIds);
+    }
+
+    private static class TestObjFileSystem extends ObjFileSystem {
+        TestObjFileSystem(ObjStorage<?> storage) {
+            super(storage);
+        }
+
+        @Override
+        public void mkdirs(Location location) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void delete(Location location, boolean recursive) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void rename(Location src, Location dst) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileIterator list(Location location) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DorisInputFile newInputFile(Location location) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public DorisOutputFile newOutputFile(Location location) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class TrackingObjStorage implements ObjStorage<Object> {
+        private final List<String> abortedPaths = new ArrayList<>();
+        private final List<String> abortedUploadIds = new ArrayList<>();
+
+        @Override
+        public Object getClient() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public RemoteObjects listObjects(String remotePath, String 
continuationToken) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public RemoteObject headObject(String remotePath) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void putObject(String remotePath, RequestBody requestBody) 
throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deleteObject(String remotePath) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void copyObject(String srcPath, String dstPath) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String initiateMultipartUpload(String remotePath) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public UploadPartResult uploadPart(String remotePath, String uploadId, 
int partNum,
+                RequestBody body) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void completeMultipartUpload(String remotePath, String uploadId,
+                List<UploadPartResult> parts) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void abortMultipartUpload(String remotePath, String uploadId) 
throws IOException {
+            abortedPaths.add(remotePath);
+            abortedUploadIds.add(uploadId);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to