This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d32bfce601 [Fix][HiveSink] Implement overwrite semantics for streaming commits to prevent multiple deletions of target directories (#10279)
d32bfce601 is described below

commit d32bfce601ede01a3ff3758d6c1e1fd83f95dd8a
Author: Adam Wang <[email protected]>
AuthorDate: Thu Jan 8 19:12:19 2026 +0800

    [Fix][HiveSink] Implement overwrite semantics for streaming commits to prevent multiple deletions of target directories (#10279)
    
    Co-authored-by: wangxiaogang <[email protected]>
---
 docs/en/connector-v2/sink/Hive.md                  |   9 +-
 docs/zh/connector-v2/sink/Hive.md                  |  11 +-
 .../hive/commit/HiveSinkAggregatedCommitter.java   | 169 ++++++++++-
 ...kAggregatedCommitterOverwriteStreamingTest.java | 331 +++++++++++++++++++++
 4 files changed, 505 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 3b629e6154..4c8625e69b 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -102,6 +102,11 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.
 
 ### overwrite [boolean]
 
+Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.
+
+- Batch mode (BATCH): Delete existing data in the target path before commit (for non-partitioned tables, delete the table directory; for partitioned tables, delete the related partition directories), then write new data.
+- Streaming mode (STREAMING): In streaming jobs with checkpointing enabled, `commit()` is invoked after each completed checkpoint. To avoid deleting on every checkpoint (which would wipe previously committed files), SeaTunnel deletes each target directory (table directory / partition directory) at most once (empty commits will skip deletion). On recovery, the delete step is best-effort and may be skipped to avoid deleting already committed data, so streaming overwrite does not guarantee strict full-overwrite semantics.
+
 ### data_save_mode [enum]
 
 Select how to handle existing data on the target before writing new data.
@@ -112,8 +117,6 @@ Select how to handle existing data on the target before writing new data.
 
 Note: overwrite=true and data_save_mode=DROP_DATA are equivalent. Use either one; do not set both.
 
-Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.
-
 ### schema_save_mode [enum]
 
 Before starting the synchronization task, different processing schemes are selected for the existing table structure on the target side.
@@ -574,4 +577,4 @@ sink {
 ```
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
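The streaming-mode rule described above (delete each target directory at most once per job attempt) comes down to guarding the delete call with a concurrent "already deleted" set, which is what the committer change later in this commit does. A minimal standalone sketch of that idea, with illustrative names that are not part of the SeaTunnel API:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the "delete at most once" guard described in the docs above.
// OverwriteGuard is a hypothetical name, not a SeaTunnel class.
public class OverwriteGuard {

    // Target directories already cleared in this job attempt.
    private final Set<String> deleted = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time a given target directory is seen. */
    public boolean shouldDelete(String targetDir) {
        // Normalize trailing slashes so "/warehouse/tbl/" and "/warehouse/tbl" count as one entry.
        String normalized = targetDir.replaceAll("/+$", "");
        return !normalized.isEmpty() && deleted.add(normalized);
    }

    public static void main(String[] args) {
        OverwriteGuard guard = new OverwriteGuard();
        // Checkpoint 2 commits the first files for a partition: delete its directory once.
        System.out.println(guard.shouldDelete("/warehouse/db/tbl/pt=2025-12-16"));  // true
        // Checkpoint 3 commits more files for the same partition: must not delete again.
        System.out.println(guard.shouldDelete("/warehouse/db/tbl/pt=2025-12-16/")); // false
    }
}
```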
diff --git a/docs/zh/connector-v2/sink/Hive.md b/docs/zh/connector-v2/sink/Hive.md
index af3ad07780..1ba385ba53 100644
--- a/docs/zh/connector-v2/sink/Hive.md
+++ b/docs/zh/connector-v2/sink/Hive.md
@@ -100,7 +100,12 @@ Kerberos 的 keytab 文件路径
 
 支持从时间戳写入 Parquet INT96,仅对 parquet 文件有效。
 
-### schema_save_mode [枚举]
+### overwrite [boolean]
+
+是否以覆盖写入(Overwrite)方式写入 Hive。
+
+- 批模式(BATCH):在提交前删除目标路径中已有数据(非分区表删除表目录;分区表删除本次提交涉及的分区目录),再写入新数据。
+- 流模式(STREAMING):在启用 checkpoint 的流式运行时,commit 会在每个 checkpoint 完成后触发一次。为避免每个 checkpoint 都重复删除导致数据丢失,SeaTunnel 会对每个目标目录(表目录/分区目录)最多删除一次(空提交会跳过删除)。恢复(recovery)场景下为避免误删已提交数据,删除行为为 best-effort,可能会被跳过,因此不保证严格的“全量覆盖”语义。
 
 ### data_save_mode [enum]
 
@@ -112,6 +117,8 @@ Kerberos 的 keytab 文件路径
 
 注意:overwrite=true 与 data_save_mode=DROP_DATA 行为等价,二者择一配置即可,勿同时设置。
 
+### schema_save_mode [枚举]
+
 在开始同步任务之前,针对目标端已存在的表结构选择不同的处理方案。
 
 **默认值**: `CREATE_SCHEMA_WHEN_NOT_EXIST`
@@ -563,4 +570,4 @@ sink {
 ```
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 9a142fd66e..2c532c6d55 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -42,6 +43,29 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final boolean abortDropPartitionMetadata;
     private final org.apache.seatunnel.api.sink.DataSaveMode dataSaveMode;
 
+    /**
+     * Guard for overwrite semantics in Flink streaming engine.
+     *
+     * <p>In streaming mode, {@code commit()} is invoked on every completed checkpoint. For
+     * overwrite (DROP_DATA), we must avoid deleting target directories on every checkpoint;
+     * otherwise previously committed files will be wiped and only the last checkpoint's files
+     * remain.
+     *
+     * <p>We delete each target directory (partition directory / table directory) at most once per
+     * job attempt so that dynamic partitions can still be overwritten when first written.
+     */
+    private final Set<String> deletedTargetDirectories = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Best-effort recovery detection based on the first seen checkpoint id embedded in transaction
+     * directory name (e.g. .../T_xxx_0_2 means checkpoint 2).
+     *
+     * <p>If the first seen checkpoint id is greater than 1, it usually indicates the job is
+     * recovering from a previous checkpoint. In that case, deleting the target directories would
+     * destroy already committed data that is consistent with the restored state.
+     */
+    private volatile Long minCheckpointIdSeen = null;
+
     private final ReadonlyConfig readonlyConfig;
     private final HiveMetaStoreCatalog hiveMetaStore;
 
@@ -69,8 +93,15 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
             List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
         log.info("Aggregated commit infos: {}", aggregatedCommitInfos);
         if (dataSaveMode == org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA) {
-            log.info("DataSaveMode=DROP_DATA: delete existing target directories before commit.");
-            deleteDirectories(aggregatedCommitInfos);
+            updateMinCheckpointIdSeen(aggregatedCommitInfos);
+            if (minCheckpointIdSeen != null && minCheckpointIdSeen > 1) {
+                log.info(
+                        "DataSaveMode=DROP_DATA: skip deleting target directories before commit."
+                                + " Recovery is detected, minCheckpointIdSeen={}",
+                        minCheckpointIdSeen);
+            } else {
+                deleteDirectories(aggregatedCommitInfos);
+            }
         }
 
         List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
@@ -134,12 +165,14 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
      *
      * @param aggregatedCommitInfos
      */
-    private void deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
+    private boolean deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
             throws IOException {
         if (aggregatedCommitInfos.isEmpty()) {
-            return;
+            return false;
         }
 
+        boolean anyDeleted = false;
+
         for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
             LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap =
                     aggregatedCommitInfo.getTransactionMap();
@@ -162,9 +195,19 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                 if (aggregatedCommitInfo.getPartitionDirAndValuesMap().isEmpty()) {
                     // For non-partitioned table, extract and delete table directory
                     // Example: hdfs://hadoop-master1:8020/warehouse/test_overwrite_1/
-                    String tableDir = targetPath.substring(0, targetPath.lastIndexOf('/'));
-                    hadoopFileSystemProxy.deleteFile(tableDir);
-                    log.info("Deleted table directory: {}", tableDir);
+                    int lastSeparator =
+                            Math.max(targetPath.lastIndexOf('/'), targetPath.lastIndexOf('\\'));
+                    if (lastSeparator <= 0) {
+                        log.warn(
+                                "Skip deleting table directory because target path has no separator: {}",
+                                targetPath);
+                        continue;
+                    }
+                    String tableDir = targetPath.substring(0, lastSeparator);
+                    if (deleteTargetDirectoryOnce(tableDir)) {
+                        log.info("Deleted table directory: {}", tableDir);
+                        anyDeleted = true;
+                    }
                 } else {
                     // For partitioned table, extract and delete partition directories
                     // Example:
@@ -172,12 +215,25 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                     Set<String> partitionDirs =
                             transactionMap.values().stream()
                                     .flatMap(m -> m.values().stream())
-                                    .map(path -> path.substring(0, path.lastIndexOf('/')))
+                                    .map(
+                                            path -> {
+                                                int sep =
+                                                        Math.max(
+                                                                path.lastIndexOf('/'),
+                                                                path.lastIndexOf('\\'));
+                                                if (sep <= 0) {
+                                                    return null;
+                                                }
+                                                return path.substring(0, sep);
+                                            })
+                                    .filter(p -> p != null && !p.isEmpty())
                                     .collect(Collectors.toSet());
 
                     for (String partitionDir : partitionDirs) {
-                        hadoopFileSystemProxy.deleteFile(partitionDir);
-                        log.info("Deleted partition directory: {}", partitionDir);
+                        if (deleteTargetDirectoryOnce(partitionDir)) {
+                            log.info("Deleted partition directory: {}", partitionDir);
+                            anyDeleted = true;
+                        }
                     }
                 }
             } catch (IOException e) {
@@ -185,5 +241,98 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                 throw e;
             }
         }
+
+        return anyDeleted;
+    }
+
+    private boolean deleteTargetDirectoryOnce(String directory) throws IOException {
+        if (directory == null || directory.isEmpty()) {
+            return false;
+        }
+
+        String normalized = normalizeDirectoryPath(directory);
+        if (normalized.isEmpty()) {
+            return false;
+        }
+
+        if (!deletedTargetDirectories.add(normalized)) {
+            return false;
+        }
+
+        hadoopFileSystemProxy.deleteFile(directory);
+        return true;
+    }
+
+    private String normalizeDirectoryPath(String directory) {
+        String normalized = directory.replace('\\', '/');
+        while (normalized.endsWith("/")) {
+            normalized = normalized.substring(0, normalized.length() - 1);
+        }
+        return normalized;
+    }
+
+    private void updateMinCheckpointIdSeen(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
+        if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
+            return;
+        }
+
+        long minInThisCommit = Long.MAX_VALUE;
+        boolean found = false;
+
+        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+            if (aggregatedCommitInfo == null || aggregatedCommitInfo.getTransactionMap() == null) {
+                continue;
+            }
+            for (String transactionDir : aggregatedCommitInfo.getTransactionMap().keySet()) {
+                long checkpointId = parseCheckpointIdFromTransactionDir(transactionDir);
+                if (checkpointId > 0) {
+                    minInThisCommit = Math.min(minInThisCommit, checkpointId);
+                    found = true;
+                }
+            }
+        }
+
+        if (!found) {
+            return;
+        }
+
+        if (minCheckpointIdSeen == null) {
+            minCheckpointIdSeen = minInThisCommit;
+        } else {
+            minCheckpointIdSeen = Math.min(minCheckpointIdSeen, minInThisCommit);
+        }
+    }
+
+    /**
+     * Parses checkpoint id from transaction directory.
+     *
+     * <p>Expected pattern in transaction dir name: .../T_..._<subtaskIndex>_<checkpointId>
+     */
+    private long parseCheckpointIdFromTransactionDir(String transactionDir) {
+        if (transactionDir == null || transactionDir.isEmpty()) {
+            return -1;
+        }
+
+        String normalized = transactionDir.replace('\\', '/');
+        while (normalized.endsWith("/")) {
+            normalized = normalized.substring(0, normalized.length() - 1);
+        }
+        int lastSlash = normalized.lastIndexOf('/');
+        String baseName = lastSlash >= 0 ? normalized.substring(lastSlash + 1) : normalized;
+        if (baseName.isEmpty()) {
+            return -1;
+        }
+
+        int lastUnderscore = baseName.lastIndexOf('_');
+        if (lastUnderscore < 0 || lastUnderscore == baseName.length() - 1) {
+            return -1;
+        }
+
+        String lastToken = baseName.substring(lastUnderscore + 1);
+        try {
+            return Long.parseLong(lastToken);
+        } catch (NumberFormatException ignored) {
+            return -1;
+        }
     }
 }
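The recovery heuristic documented on the fields above keys off the checkpoint id encoded as the last underscore-separated token of the transaction directory name. A self-contained sketch of that parsing rule and of the skip-on-recovery decision (the class and method names below are illustrative, not the committer's private API):

```java
// Standalone sketch mirroring the parsing rule described in the Javadoc above.
// CheckpointIdParseSketch is a hypothetical name, not part of SeaTunnel.
public class CheckpointIdParseSketch {

    /** Returns the trailing _<checkpointId> of a transaction dir name, or -1 if absent. */
    static long parseCheckpointId(String transactionDir) {
        String normalized = transactionDir.replace('\\', '/').replaceAll("/+$", "");
        String baseName = normalized.substring(normalized.lastIndexOf('/') + 1);
        int lastUnderscore = baseName.lastIndexOf('_');
        if (lastUnderscore < 0 || lastUnderscore == baseName.length() - 1) {
            return -1;
        }
        try {
            return Long.parseLong(baseName.substring(lastUnderscore + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseCheckpointId("/tmp/seatunnel/T_job_0_2"));  // 2
        System.out.println(parseCheckpointId("/tmp/seatunnel/T_job_0_1/")); // 1
        System.out.println(parseCheckpointId("no-checkpoint-id"));          // -1

        // Recovery heuristic: if the first checkpoint id ever seen is greater than 1,
        // the job is most likely restoring from state, so the overwrite delete is skipped.
        long firstSeenCheckpointId = parseCheckpointId("/tmp/seatunnel/T_job_0_7");
        System.out.println(firstSeenCheckpointId > 1 ? "skip delete (recovery)" : "delete then write");
    }
}
```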
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java
new file mode 100644
index 0000000000..fb8a6bf5ee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.commit;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+class HiveSinkAggregatedCommitterOverwriteStreamingTest {
+
+    private static class TestableCommitter extends HiveSinkAggregatedCommitter {
+        TestableCommitter(
+                ReadonlyConfig cfg, String dbName, String tableName, 
HadoopConf hadoopConf) {
+            super(cfg, dbName, tableName, hadoopConf);
+        }
+
+        void setFileSystemProxy(HadoopFileSystemProxy proxy) {
+            this.hadoopFileSystemProxy = proxy;
+        }
+    }
+
+    @Test
+    void shouldDeletePartitionDirectoryOnlyOnceAcrossStreamingCheckpoints() throws Exception {
+        // Given
+        ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+        TestableCommitter committer =
+                new TestableCommitter(readonlyConfig, "db", "tbl", new HadoopConf("hdfs://dummy"));
+
+        HiveMetaStoreCatalog hiveMetaStore = Mockito.mock(HiveMetaStoreCatalog.class);
+        Mockito.doNothing()
+                .when(hiveMetaStore)
+                .addPartitions(Mockito.anyString(), Mockito.anyString(), Mockito.anyList());
+        setHiveMetaStore(committer, hiveMetaStore);
+
+        HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+        committer.setFileSystemProxy(fs);
+
+        String partitionDir = "/warehouse/db/tbl/pt=2025-12-16";
+
+        // checkpoint 1: empty transaction (matches production log pattern)
+        FileAggregatedCommitInfo cp1Empty =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_1", Collections.emptyMap(), 
Collections.emptyMap());
+
+        // checkpoint 2: has one file -> should trigger overwrite deletion once
+        FileAggregatedCommitInfo cp2 =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_2",
+                        Collections.singletonMap(
+                                
"/tmp/seatunnel/T_job_0_2/pt=2025-12-16/f1.parquet",
+                                partitionDir + "/f1.parquet"),
+                        Collections.singletonMap(
+                                "pt=2025-12-16", 
Collections.singletonList("2025-12-16")));
+
+        // checkpoint 3: has one more file -> MUST NOT delete partitionDir again
+        FileAggregatedCommitInfo cp3 =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_3",
+                        Collections.singletonMap(
+                                
"/tmp/seatunnel/T_job_0_3/pt=2025-12-16/f2.parquet",
+                                partitionDir + "/f2.parquet"),
+                        Collections.singletonMap(
+                                "pt=2025-12-16", 
Collections.singletonList("2025-12-16")));
+
+        // When
+        committer.commit(Collections.singletonList(cp1Empty));
+        committer.commit(Collections.singletonList(cp2));
+        committer.commit(Collections.singletonList(cp3));
+
+        // Then
+        // deleteFile is also used to delete transaction dirs in super.commit(). We only assert
+        // deletion of the *target* partition directory happens once.
+        Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir);
+    }
+
+    @Test
+    void shouldDeleteEachNewPartitionDirectoryOnlyOnceAcrossStreamingCheckpoints()
+            throws Exception {
+        // Given
+        ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+        TestableCommitter committer =
+                new TestableCommitter(readonlyConfig, "db", "tbl", new HadoopConf("hdfs://dummy"));
+
+        HiveMetaStoreCatalog hiveMetaStore = Mockito.mock(HiveMetaStoreCatalog.class);
+        Mockito.doNothing()
+                .when(hiveMetaStore)
+                .addPartitions(Mockito.anyString(), Mockito.anyString(), Mockito.anyList());
+        setHiveMetaStore(committer, hiveMetaStore);
+
+        HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+        committer.setFileSystemProxy(fs);
+
+        String partitionDir1 = "/warehouse/db/tbl/pt=2025-12-16";
+        String partitionDir2 = "/warehouse/db/tbl/pt=2025-12-17";
+
+        // checkpoint 1: empty transaction
+        FileAggregatedCommitInfo cp1Empty =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_1", Collections.emptyMap(), 
Collections.emptyMap());
+
+        // checkpoint 2: first partition
+        FileAggregatedCommitInfo cp2 =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_2",
+                        Collections.singletonMap(
+                                
"/tmp/seatunnel/T_job_0_2/pt=2025-12-16/f1.parquet",
+                                partitionDir1 + "/f1.parquet"),
+                        Collections.singletonMap(
+                                "pt=2025-12-16", 
Collections.singletonList("2025-12-16")));
+
+        // checkpoint 3: new partition appears
+        FileAggregatedCommitInfo cp3 =
+                aggregatedCommitInfo(
+                        "/tmp/seatunnel/T_job_0_3",
+                        Collections.singletonMap(
+                                
"/tmp/seatunnel/T_job_0_3/pt=2025-12-17/f2.parquet",
+                                partitionDir2 + "/f2.parquet"),
+                        Collections.singletonMap(
+                                "pt=2025-12-17", 
Collections.singletonList("2025-12-17")));
+
+        // When
+        committer.commit(Collections.singletonList(cp1Empty));
+        committer.commit(Collections.singletonList(cp2));
+        committer.commit(Collections.singletonList(cp3));
+
+        // Then
+        Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir1);
+        Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir2);
+    }
+
+    @Test
+    void e2eLikeCommitShouldAccumulateFilesAcrossCheckpointsWhenOverwriteEnabled(
+            @TempDir Path tempDir) throws Exception {
+        // Given
+        ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+        TestableCommitter committer =
+                new TestableCommitter(readonlyConfig, "db", "tbl", new HadoopConf("hdfs://dummy"));
+
+        HiveMetaStoreCatalog hiveMetaStore = Mockito.mock(HiveMetaStoreCatalog.class);
+        Mockito.doNothing()
+                .when(hiveMetaStore)
+                .addPartitions(Mockito.anyString(), Mockito.anyString(), Mockito.anyList());
+        setHiveMetaStore(committer, hiveMetaStore);
+
+        // Build a mock FS proxy that actually moves/deletes on local FS.
+        HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+        Mockito.doAnswer(
+                        invocation -> {
+                            String oldPath = invocation.getArgument(0);
+                            String newPath = invocation.getArgument(1);
+                            boolean removeWhenExists = 
invocation.getArgument(2);
+
+                            Path oldP = Paths.get(oldPath);
+                            Path newP = Paths.get(newPath);
+
+                            if (!Files.exists(oldP)) {
+                                return null;
+                            }
+
+                            if (removeWhenExists && Files.exists(newP)) {
+                                Files.delete(newP);
+                            }
+                            if (newP.getParent() != null) {
+                                Files.createDirectories(newP.getParent());
+                            }
+                            Files.move(oldP, newP, 
StandardCopyOption.REPLACE_EXISTING);
+                            return null;
+                        })
+                .when(fs)
+                .renameFile(Mockito.anyString(), Mockito.anyString(), 
Mockito.anyBoolean());
+
+        Mockito.doAnswer(
+                        invocation -> {
+                            String pathStr = invocation.getArgument(0);
+                            Path p = Paths.get(pathStr);
+                            if (!Files.exists(p)) {
+                                return null;
+                            }
+                            // delete recursively
+                            try (Stream<Path> walk = Files.walk(p)) {
+                                walk.sorted((a, b) -> b.getNameCount() - 
a.getNameCount())
+                                        .forEach(
+                                                x -> {
+                                                    try {
+                                                        
Files.deleteIfExists(x);
+                                                    } catch (Exception e) {
+                                                        throw new 
RuntimeException(e);
+                                                    }
+                                                });
+                            }
+                            return null;
+                        })
+                .when(fs)
+                .deleteFile(Mockito.anyString());
+
+        committer.setFileSystemProxy(fs);
+
+        Path targetPartitionDir = 
tempDir.resolve("warehouse/db/tbl/pt=2025-12-16");
+        String partitionDir = targetPartitionDir.toString();
+
+        // checkpoint 1: empty transaction
+        FileAggregatedCommitInfo cp1Empty =
+                aggregatedCommitInfo(
+                        tempDir.resolve("txn/T_job_0_1").toString(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        // checkpoint 2: create a temp file to be moved
+        Path txn2 = tempDir.resolve("txn/T_job_0_2");
+        Path tmpFile1 = txn2.resolve("pt=2025-12-16/f1.parquet");
+        Files.createDirectories(tmpFile1.getParent());
+        Files.write(tmpFile1, "file1".getBytes(StandardCharsets.UTF_8));
+
+        FileAggregatedCommitInfo cp2 =
+                aggregatedCommitInfo(
+                        txn2.toString(),
+                        Collections.singletonMap(
+                                tmpFile1.toString(),
+                                
targetPartitionDir.resolve("f1.parquet").toString()),
+                        Collections.singletonMap(
+                                "pt=2025-12-16", 
Collections.singletonList("2025-12-16")));
+
+        // checkpoint 3: another temp file
+        Path txn3 = tempDir.resolve("txn/T_job_0_3");
+        Path tmpFile2 = txn3.resolve("pt=2025-12-16/f2.parquet");
+        Files.createDirectories(tmpFile2.getParent());
+        Files.write(tmpFile2, "file2".getBytes(StandardCharsets.UTF_8));
+
+        FileAggregatedCommitInfo cp3 =
+                aggregatedCommitInfo(
+                        txn3.toString(),
+                        Collections.singletonMap(
+                                tmpFile2.toString(),
+                                
targetPartitionDir.resolve("f2.parquet").toString()),
+                        Collections.singletonMap(
+                                "pt=2025-12-16", 
Collections.singletonList("2025-12-16")));
+
+        // When
+        committer.commit(Collections.singletonList(cp1Empty));
+        committer.commit(Collections.singletonList(cp2));
+        committer.commit(Collections.singletonList(cp3));
+
+        // Then
+        Assertions.assertTrue(Files.isDirectory(targetPartitionDir));
+        Assertions.assertTrue(Files.exists(targetPartitionDir.resolve("f1.parquet")));
+        Assertions.assertTrue(Files.exists(targetPartitionDir.resolve("f2.parquet")));
+
+        long fileCount;
+        try (Stream<Path> stream = Files.list(targetPartitionDir)) {
+            fileCount = stream.count();
+        }
+        Assertions.assertEquals(2, fileCount);
+
+        // sanity: partition deletion should only happen once
+        Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir);
+    }
+
+    private static FileAggregatedCommitInfo aggregatedCommitInfo(
+            String transactionDir,
+            Map<String, String> fileMoves,
+            Map<String, List<String>> partitions) {
+        LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap = 
new LinkedHashMap<>();
+        LinkedHashMap<String, String> moveMap = new LinkedHashMap<>();
+        moveMap.putAll(fileMoves);
+        transactionMap.put(transactionDir, moveMap);
+
+        LinkedHashMap<String, List<String>> partitionMap = new LinkedHashMap<>();
+        partitionMap.putAll(partitions);
+
+        return new FileAggregatedCommitInfo(transactionMap, partitionMap);
+    }
+
+    private static ReadonlyConfig minimalHiveReadonlyConfig(boolean overwrite) {
+        LinkedHashMap<String, Object> map = new LinkedHashMap<>();
+        // Required by HiveMetaStoreCatalog ctor
+        map.put(HiveOptions.METASTORE_URI.key(), "thrift://dummy:9083");
+        map.put(HiveConfig.HADOOP_CONF_PATH.key(), "/tmp");
+        map.put(HiveConfig.HIVE_SITE_PATH.key(), "/tmp/hive-site.xml");
+
+        // Used by HiveSinkAggregatedCommitter
+        map.put(HiveSinkOptions.OVERWRITE.key(), overwrite);
+        // other options are defaulted
+
+        return ReadonlyConfig.fromMap(map);
+    }
+
+    private static void setHiveMetaStore(
+            HiveSinkAggregatedCommitter committer, HiveMetaStoreCatalog hiveMetaStore)
+            throws Exception {
+        Field f = HiveSinkAggregatedCommitter.class.getDeclaredField("hiveMetaStore");
+        f.setAccessible(true);
+        f.set(committer, hiveMetaStore);
+    }
+}
