This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9892c2ad19 [core] format table: refactor overwrite to both Flink and Spark (#6497)
9892c2ad19 is described below

commit 9892c2ad19838e1c68114531cc1370c6e29e01d3
Author: jerry <[email protected]>
AuthorDate: Sat Nov 1 12:53:49 2025 +0800

    [core] format table: refactor overwrite to both Flink and Spark (#6497)
---
 .../table/format/FormatBatchWriteBuilder.java      |  49 +++++-
 .../paimon/table/format/FormatTableCommit.java     | 195 +++++++++++++++++++++
 .../paimon/table/format/FormatTableWrite.java      |  28 ---
 .../org/apache/paimon/catalog/CatalogTestBase.java | 124 ++++++++++++-
 .../flink/sink/FlinkFormatTableDataStreamSink.java |  45 +++--
 .../paimon/flink/sink/FlinkFormatTableSink.java    |  15 +-
 .../paimon/flink/sink/FlinkTableSinkBase.java      |   4 +-
 .../paimon/flink/source/FormatTableITCase.java     |  87 +++++++++
 .../paimon/spark/format/PaimonFormatTable.scala    | 127 ++++++--------
 9 files changed, 551 insertions(+), 123 deletions(-)
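
For reference, a minimal sketch of the refactored overwrite path as exercised by the new testFormatTableOverwrite in CatalogTestBase below; identifier setup and table creation are assumed, and the calls mirror the APIs touched by this patch:

    // Overwrite one static partition of a format table via the batch write builder.
    FormatTable table = (FormatTable) catalog.getTable(identifier);
    BatchWriteBuilder builder = table.newBatchWriteBuilder();

    Map<String, String> staticPartition = new HashMap<>();
    staticPartition.put("year", "2024");
    staticPartition.put("month", "10");

    try (BatchTableWrite write = builder.newWrite();
            BatchTableCommit commit = builder.withOverwrite(staticPartition).newCommit()) {
        write.write(GenericRow.of(10, 1000, 2024, 10));
        // FormatTableCommit removes the partition's previous data files, then commits the new ones.
        commit.commit(write.prepareCommit());
    }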

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
index abb10f0535..78fef13ab6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -38,6 +39,8 @@ public class FormatBatchWriteBuilder implements BatchWriteBuilder {
 
     private final FormatTable table;
     protected final CoreOptions options;
+    private Map<String, String> staticPartition;
+    private boolean overwrite = false;
 
     public FormatBatchWriteBuilder(FormatTable table) {
         this.table = table;
@@ -71,11 +74,53 @@ public class FormatBatchWriteBuilder implements BatchWriteBuilder {
 
     @Override
     public BatchTableCommit newCommit() {
-        throw new UnsupportedOperationException("FormatTable does not support commit");
+        boolean formatTablePartitionOnlyValueInPath =
+                (new CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
+        return new FormatTableCommit(
+                table.location(),
+                table.partitionKeys(),
+                table.fileIO(),
+                formatTablePartitionOnlyValueInPath,
+                overwrite,
+                staticPartition);
     }
 
     @Override
     public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition) {
-        throw new UnsupportedOperationException("FormatTable does not support commit");
+        this.overwrite = true;
+        validateStaticPartition(staticPartition, table.partitionKeys());
+        this.staticPartition = staticPartition;
+        return this;
+    }
+
+    protected static void validateStaticPartition(
+            Map<String, String> staticPartition, List<String> partitionKeys) {
+        if (staticPartition != null && !staticPartition.isEmpty()) {
+            if (partitionKeys == null || partitionKeys.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Format table is not partitioned, static partition values are not allowed.");
+            }
+
+            boolean missingLeadingKey = false;
+            for (String partitionKey : partitionKeys) {
+                boolean contains = staticPartition.containsKey(partitionKey);
+                if (missingLeadingKey && contains) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Static partition column '%s' cannot be specified without its leading partition.",
+                                    partitionKey));
+                }
+                if (!contains) {
+                    missingLeadingKey = true;
+                }
+            }
+
+            for (String key : staticPartition.keySet()) {
+                if (!partitionKeys.contains(key)) {
+                    throw new IllegalArgumentException(
+                            String.format("Unknown static partition column '%s'.", key));
+                }
+            }
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
new file mode 100644
index 0000000000..54c8cac541
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommit;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+
+import static org.apache.paimon.table.format.FormatBatchWriteBuilder.validateStaticPartition;
+
+/** Commit for Format Table. */
+public class FormatTableCommit implements BatchTableCommit {
+
+    private String location;
+    private final boolean formatTablePartitionOnlyValueInPath;
+    private FileIO fileIO;
+    private List<String> partitionKeys;
+    protected Map<String, String> staticPartitions;
+    protected boolean overwrite = false;
+
+    public FormatTableCommit(
+            String location,
+            List<String> partitionKeys,
+            FileIO fileIO,
+            boolean formatTablePartitionOnlyValueInPath,
+            boolean overwrite,
+            @Nullable Map<String, String> staticPartitions) {
+        this.location = location;
+        this.fileIO = fileIO;
+        this.formatTablePartitionOnlyValueInPath = formatTablePartitionOnlyValueInPath;
+        validateStaticPartition(staticPartitions, partitionKeys);
+        this.staticPartitions = staticPartitions;
+        this.overwrite = overwrite;
+        this.partitionKeys = partitionKeys;
+    }
+
+    @Override
+    public void commit(List<CommitMessage> commitMessages) {
+        try {
+            List<TwoPhaseOutputStream.Committer> committers = new ArrayList<>();
+            for (CommitMessage commitMessage : commitMessages) {
+                if (commitMessage instanceof TwoPhaseCommitMessage) {
+                    committers.add(((TwoPhaseCommitMessage) commitMessage).getCommitter());
+                } else {
+                    throw new RuntimeException(
+                            "Unsupported commit message type: "
+                                    + commitMessage.getClass().getName());
+                }
+            }
+            if (overwrite && staticPartitions != null && !staticPartitions.isEmpty()) {
+                Path partitionPath =
+                        buildPartitionPath(
+                                location,
+                                staticPartitions,
+                                formatTablePartitionOnlyValueInPath,
+                                partitionKeys);
+                deletePreviousDataFile(partitionPath);
+            } else if (overwrite) {
+                Set<Path> partitionPaths = new HashSet<>();
+                for (TwoPhaseOutputStream.Committer c : committers) {
+                    partitionPaths.add(c.targetFilePath().getParent());
+                }
+                for (Path p : partitionPaths) {
+                    deletePreviousDataFile(p);
+                }
+            }
+            for (TwoPhaseOutputStream.Committer committer : committers) {
+                committer.commit(this.fileIO);
+            }
+        } catch (Exception e) {
+            this.abort(commitMessages);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Path buildPartitionPath(
+            String location,
+            Map<String, String> partitionSpec,
+            boolean formatTablePartitionOnlyValueInPath,
+            List<String> partitionKeys) {
+        if (partitionSpec.isEmpty() || partitionKeys.isEmpty()) {
+            throw new IllegalArgumentException("partitionSpec or partitionKeys is empty.");
+        }
+        StringJoiner joiner = new StringJoiner("/");
+        for (int i = 0; i < partitionSpec.size(); i++) {
+            String key = partitionKeys.get(i);
+            if (partitionSpec.containsKey(key)) {
+                if (formatTablePartitionOnlyValueInPath) {
+                    joiner.add(partitionSpec.get(key));
+                } else {
+                    joiner.add(key + "=" + partitionSpec.get(key));
+                }
+            } else {
+                throw new RuntimeException("partitionSpec does not contain key: " + key);
+            }
+        }
+        return new Path(location, joiner.toString());
+    }
+
+    @Override
+    public void abort(List<CommitMessage> commitMessages) {
+        try {
+            for (CommitMessage commitMessage : commitMessages) {
+                if (commitMessage instanceof TwoPhaseCommitMessage) {
+                    TwoPhaseCommitMessage twoPhaseCommitMessage =
+                            (TwoPhaseCommitMessage) commitMessage;
+                    twoPhaseCommitMessage.getCommitter().discard(this.fileIO);
+                } else {
+                    throw new RuntimeException(
+                            "Unsupported commit message type: "
+                                    + commitMessage.getClass().getName());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {}
+
+    private void deletePreviousDataFile(Path partitionPath) throws IOException {
+        if (fileIO.exists(partitionPath)) {
+            FileStatus[] files = fileIO.listFiles(partitionPath, true);
+            for (FileStatus file : files) {
+                if (FormatTableScan.isDataFileName(file.getPath().getName())) {
+                    try {
+                        fileIO.delete(file.getPath(), false);
+                    } catch (FileNotFoundException ignore) {
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void truncateTable() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void updateStatistics(Statistics statistics) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void compactManifests() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TableCommit withMetricRegistry(MetricRegistry registry) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index 08a940ba15..27b47b129c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.metrics.MetricRegistry;
@@ -43,7 +42,6 @@ import java.util.stream.Collectors;
 /** {@link TableWrite} implementation for format table. */
 public class FormatTableWrite implements BatchTableWrite {
 
-    private FileIO fileIO;
     private RowType rowType;
     private final FormatTableFileWriter write;
     private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
@@ -57,7 +55,6 @@ public class FormatTableWrite implements BatchTableWrite {
             CoreOptions options,
             RowType partitionType,
             List<String> partitionKeys) {
-        this.fileIO = fileIO;
         this.rowType = rowType;
         this.write = new FormatTableFileWriter(fileIO, rowType, options, partitionType);
         this.partitionKeyExtractor =
@@ -102,31 +99,6 @@ public class FormatTableWrite implements BatchTableWrite {
         return write.prepareCommit();
     }
 
-    public void commit(List<CommitMessage> commitMessages) throws Exception {
-        applyCommitterAction(commitMessages, TwoPhaseOutputStream.Committer::commit);
-    }
-
-    public void discard(List<CommitMessage> commitMessages) throws Exception {
-        applyCommitterAction(commitMessages, TwoPhaseOutputStream.Committer::discard);
-    }
-
-    private interface CommitterAction {
-        void apply(TwoPhaseOutputStream.Committer committer, FileIO fileIO) throws Exception;
-    }
-
-    private void applyCommitterAction(List<CommitMessage> commitMessages, CommitterAction action)
-            throws Exception {
-        for (CommitMessage commitMessage : commitMessages) {
-            if (commitMessage instanceof TwoPhaseCommitMessage) {
-                TwoPhaseCommitMessage twoPhaseCommitMessage = (TwoPhaseCommitMessage) commitMessage;
-                action.apply(twoPhaseCommitMessage.getCommitter(), this.fileIO);
-            } else {
-                throw new RuntimeException(
-                        "Unsupported commit message type: " + commitMessage.getClass().getName());
-            }
-        }
-    }
-
     @Override
     public void close() throws Exception {
         write.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index ea3e2ebd86..4ddd6ab05f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.format.FormatTableWrite;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -710,9 +709,126 @@ public abstract class CatalogTestBase {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFormatTableOverwrite(boolean partitionPathOnlyValue) throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
+        String dbName = "format_overwrite_db";
+        catalog.createDatabase(dbName, true);
+
+        Identifier id = Identifier.create(dbName, "format_overwrite_table");
+        Schema nonPartitionedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .options(getFormatTableOptions())
+                        .option("file.format", "csv")
+                        .option("file.compression", HadoopCompressionType.GZIP.value())
+                        .option(
+                                "format-table.partition-path-only-value",
+                                "" + partitionPathOnlyValue)
+                        .build();
+        catalog.createTable(id, nonPartitionedSchema, true);
+        FormatTable nonPartitionedTable = (FormatTable) catalog.getTable(id);
+        BatchWriteBuilder nonPartitionedTableWriteBuilder =
+                nonPartitionedTable.newBatchWriteBuilder();
+        try (BatchTableWrite write = nonPartitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit = nonPartitionedTableWriteBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 10));
+            write.write(GenericRow.of(2, 20));
+            commit.commit(write.prepareCommit());
+        }
+
+        try (BatchTableWrite write = nonPartitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit =
+                        nonPartitionedTableWriteBuilder.withOverwrite().newCommit()) {
+            write.write(GenericRow.of(3, 30));
+            commit.commit(write.prepareCommit());
+        }
+
+        List<InternalRow> fullOverwriteRows = read(nonPartitionedTable, null, null, null, null);
+        assertThat(fullOverwriteRows).containsExactlyInAnyOrder(GenericRow.of(3, 30));
+        catalog.dropTable(id, true);
+
+        Identifier pid = Identifier.create(dbName, "format_overwrite_partitioned");
+        Schema partitionedSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("year", DataTypes.INT())
+                        .column("month", DataTypes.INT())
+                        .partitionKeys("year", "month")
+                        .options(getFormatTableOptions())
+                        .option("file.format", "csv")
+                        .option("file.compression", HadoopCompressionType.GZIP.value())
+                        .option(
+                                "format-table.partition-path-only-value",
+                                "" + partitionPathOnlyValue)
+                        .build();
+        catalog.createTable(pid, partitionedSchema, true);
+        FormatTable partitionedTable = (FormatTable) catalog.getTable(pid);
+        BatchWriteBuilder partitionedTableWriteBuilder = partitionedTable.newBatchWriteBuilder();
+        try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit = partitionedTableWriteBuilder.newCommit()) {
+            write.write(GenericRow.of(1, 100, 2024, 10));
+            write.write(GenericRow.of(2, 200, 2025, 10));
+            write.write(GenericRow.of(3, 300, 2025, 11));
+            commit.commit(write.prepareCommit());
+        }
+
+        Map<String, String> staticPartition = new HashMap<>();
+        staticPartition.put("year", "2024");
+        staticPartition.put("month", "10");
+        try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit =
+                        partitionedTableWriteBuilder.withOverwrite(staticPartition).newCommit()) {
+            write.write(GenericRow.of(10, 1000, 2024, 10));
+            commit.commit(write.prepareCommit());
+        }
+
+        List<InternalRow> partitionOverwriteRows = read(partitionedTable, null, null, null, null);
+        assertThat(partitionOverwriteRows)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(10, 1000, 2024, 10),
+                        GenericRow.of(2, 200, 2025, 10),
+                        GenericRow.of(3, 300, 2025, 11));
+
+        staticPartition = new HashMap<>();
+        staticPartition.put("year", "2025");
+        try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite();
+                BatchTableCommit commit =
+                        partitionedTableWriteBuilder.withOverwrite(staticPartition).newCommit()) {
+            write.write(GenericRow.of(10, 1000, 2025, 10));
+            commit.commit(write.prepareCommit());
+        }
+
+        partitionOverwriteRows = read(partitionedTable, null, null, null, null);
+        assertThat(partitionOverwriteRows)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(10, 1000, 2024, 10), GenericRow.of(10, 1000, 2025, 10));
+
+        try (BatchTableWrite write = partitionedTableWriteBuilder.newWrite()) {
+            write.write(GenericRow.of(10, 1000, 2025, 10));
+            assertThrows(
+                    RuntimeException.class,
+                    () -> {
+                        Map<String, String> staticOverwritePartition = new HashMap<>();
+                        staticOverwritePartition.put("month", "10");
+                        partitionedTableWriteBuilder
+                                .withOverwrite(staticOverwritePartition)
+                                .newCommit();
+                    });
+        }
+        catalog.dropTable(pid, true);
+    }
+
     private void writeAndCheckCommitFormatTable(
-            Table table, InternalRow[] datas, InternalRow dataWithDiffPartition) throws Exception {
-        try (FormatTableWrite write = (FormatTableWrite) table.newBatchWriteBuilder().newWrite()) {
+            FormatTable table, InternalRow[] datas, InternalRow dataWithDiffPartition)
+            throws Exception {
+        try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite();
+                BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
             for (InternalRow row : datas) {
                 write.write(row);
             }
@@ -722,7 +838,7 @@ public abstract class CatalogTestBase {
             List<CommitMessage> committers = write.prepareCommit();
             List<InternalRow> readData = read(table, null, null, null, null);
             assertThat(readData).isEmpty();
-            write.commit(committers);
+            commit.commit(committers);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
index d340af1d65..3d133a9cea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.format.FormatTableWrite;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 
@@ -34,26 +35,37 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.data.RowData;
 
 import java.util.List;
+import java.util.Map;
 
 /** DataStream sink for format tables. */
 public class FlinkFormatTableDataStreamSink {
 
     private final FormatTable table;
+    private final boolean overwrite;
+    private final Map<String, String> staticPartitions;
 
-    public FlinkFormatTableDataStreamSink(FormatTable table) {
+    public FlinkFormatTableDataStreamSink(
+            FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
         this.table = table;
+        this.overwrite = overwrite;
+        this.staticPartitions = staticPartitions;
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream) {
-        return dataStream.sinkTo(new FormatTableSink(table));
+        return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions));
     }
 
     private static class FormatTableSink implements Sink<RowData> {
 
         private final FormatTable table;
+        private final boolean overwrite;
+        private final Map<String, String> staticPartitions;
 
-        public FormatTableSink(FormatTable table) {
+        public FormatTableSink(
+                FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
             this.table = table;
+            this.overwrite = overwrite;
+            this.staticPartitions = staticPartitions;
         }
 
         /**
@@ -61,7 +73,7 @@ public class FlinkFormatTableDataStreamSink {
          * 2.0+.
          */
         public SinkWriter<RowData> createWriter(InitContext context) {
-            return new FormatTableSinkWriter(table);
+            return new FormatTableSinkWriter(table, overwrite, staticPartitions);
         }
 
         /**
@@ -69,18 +81,25 @@ public class FlinkFormatTableDataStreamSink {
          * 1.18-.
          */
         public SinkWriter<RowData> createWriter(WriterInitContext context) {
-            return new FormatTableSinkWriter(table);
+            return new FormatTableSinkWriter(table, overwrite, staticPartitions);
         }
 
         /** Sink writer for format tables using Flink v2 API. */
         private static class FormatTableSinkWriter implements SinkWriter<RowData> {
 
-            private transient FormatTableWrite tableWrite;
             private transient BatchWriteBuilder writeBuilder;
+            private transient FormatTableWrite tableWrite;
+            private transient BatchTableCommit tableCommit;
 
-            public FormatTableSinkWriter(FormatTable table) {
+            public FormatTableSinkWriter(
+                    FormatTable table, boolean overwrite, Map<String, String> staticPartitions) {
                 this.writeBuilder = table.newBatchWriteBuilder();
                 this.tableWrite = (FormatTableWrite) writeBuilder.newWrite();
+                if (overwrite) {
+                    this.tableCommit = writeBuilder.withOverwrite(staticPartitions).newCommit();
+                } else {
+                    this.tableCommit = writeBuilder.newCommit();
+                }
             }
 
             @Override
@@ -99,16 +118,16 @@ public class FlinkFormatTableDataStreamSink {
             @Override
             public void close() throws Exception {
                 if (tableWrite != null) {
-                    List<CommitMessage> committers = null;
+                    List<CommitMessage> commitMessages = null;
                     try {
                         // Prepare commit and commit the data
-                        committers = tableWrite.prepareCommit();
-                        if (!committers.isEmpty()) {
-                            tableWrite.commit(committers);
+                        commitMessages = tableWrite.prepareCommit();
+                        if (!commitMessages.isEmpty()) {
+                            tableCommit.commit(commitMessages);
                         }
                     } catch (Exception e) {
-                        if (committers != null && !committers.isEmpty()) {
-                            tableWrite.discard(committers);
+                        if (commitMessages != null && !commitMessages.isEmpty()) {
+                            tableCommit.abort(commitMessages);
                         }
                         throw new RuntimeException(e);
                     } finally {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
index 83c2f9072f..361323f016 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
@@ -24,6 +24,7 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.factories.DynamicTableFactory;
 
@@ -31,12 +32,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 /** Table sink for format tables. */
-public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartitioning {
+public class FlinkFormatTableSink
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
     private final FormatTable table;
     private final DynamicTableFactory.Context context;
     private Map<String, String> staticPartitions = new HashMap<>();
+    protected boolean overwrite = false;
 
     public FlinkFormatTableSink(
             ObjectIdentifier tableIdentifier,
@@ -55,13 +58,16 @@ public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartition
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         return new PaimonDataStreamSinkProvider(
-                (dataStream) -> new FlinkFormatTableDataStreamSink(table).sinkFrom(dataStream));
+                (dataStream) ->
+                        new FlinkFormatTableDataStreamSink(table, overwrite, staticPartitions)
+                                .sinkFrom(dataStream));
     }
 
     @Override
     public DynamicTableSink copy() {
         FlinkFormatTableSink copied = new FlinkFormatTableSink(tableIdentifier, table, context);
         copied.staticPartitions = new HashMap<>(staticPartitions);
+        copied.overwrite = overwrite;
         return copied;
     }
 
@@ -81,4 +87,9 @@ public class FlinkFormatTableSink implements DynamicTableSink, SupportsPartition
                             }
                         });
     }
+
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index d7847f1889..79e449407a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -123,7 +123,9 @@ public abstract class FlinkTableSinkBase
             FormatTable formatTable = (FormatTable) table;
             return new PaimonDataStreamSinkProvider(
                     (dataStream) ->
-                            new FlinkFormatTableDataStreamSink(formatTable).sinkFrom(dataStream));
+                            new FlinkFormatTableDataStreamSink(
+                                            formatTable, overwrite, staticPartitions)
+                                    .sinkFrom(dataStream));
         }
         LogSinkProvider logSinkProvider = null;
         if (logStoreTableFactory != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
index 8a086116bd..2e14093baf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableITCase.java
@@ -58,4 +58,91 @@ public class FormatTableITCase extends RESTCatalogITCaseBase {
                         Row.of(new BigDecimal(bigDecimalStr), 2));
         sql("Drop TABLE %s", tableName);
     }
+
+    @Test
+    public void testPartitionedTableInsertOverwrite() {
+
+        String ptTableName = "format_table_overwrite";
+        Identifier ptIdentifier = Identifier.create("default", ptTableName);
+        sql(
+                "CREATE TABLE %s (a DECIMAL(8, 3), b INT, c INT) PARTITIONED BY (c) WITH ('file.format'='parquet', 'type'='format-table')",
+                ptTableName);
+        RESTToken expiredDataToken =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 1000_000);
+        restCatalogServer.setDataToken(ptIdentifier, expiredDataToken);
+
+        String ptBigDecimalStr1 = "10.001";
+        String ptBigDecimalStr2 = "12.345";
+        Decimal ptDecimal1 = Decimal.fromBigDecimal(new BigDecimal(ptBigDecimalStr1), 8, 3);
+        Decimal ptDecimal2 = Decimal.fromBigDecimal(new BigDecimal(ptBigDecimalStr2), 8, 3);
+
+        sql(
+                "INSERT INTO %s PARTITION (c = 1) VALUES (%s, 10), (%s, 20)",
+                ptTableName, ptDecimal1, ptDecimal1);
+        sql("INSERT INTO %s PARTITION (c = 2) VALUES (%s, 30)", ptTableName, ptDecimal1);
+
+        assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+                .containsExactlyInAnyOrder(
+                        Row.of(new BigDecimal(ptBigDecimalStr1), 10, 1),
+                        Row.of(new BigDecimal(ptBigDecimalStr1), 20, 1),
+                        Row.of(new BigDecimal(ptBigDecimalStr1), 30, 2));
+
+        sql(
+                "INSERT OVERWRITE %s PARTITION (c = 1) VALUES (%s, 100), (%s, 200)",
+                ptTableName, ptDecimal2, ptDecimal2);
+
+        assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+                .containsExactlyInAnyOrder(
+                        Row.of(new BigDecimal(ptBigDecimalStr2), 100, 1),
+                        Row.of(new BigDecimal(ptBigDecimalStr2), 200, 1),
+                        Row.of(new BigDecimal(ptBigDecimalStr1), 30, 2));
+
+        sql(
+                "INSERT OVERWRITE %s VALUES (%s, 100, 1), (%s, 200, 2)",
+                ptTableName, ptDecimal1, ptDecimal2);
+
+        assertThat(sql("SELECT a, b, c FROM %s", ptTableName))
+                .containsExactlyInAnyOrder(
+                        Row.of(new BigDecimal(ptBigDecimalStr1), 100, 1),
+                        Row.of(new BigDecimal(ptBigDecimalStr2), 200, 2));
+
+        sql("Drop TABLE %s", ptTableName);
+    }
+
+    @Test
+    public void testUnPartitionedTableInsertOverwrite() {
+        String tableName = "format_table_overwrite_test";
+        String bigDecimalStr1 = "10.001";
+        String bigDecimalStr2 = "12.345";
+        Decimal decimal1 = Decimal.fromBigDecimal(new BigDecimal(bigDecimalStr1), 8, 3);
+        Decimal decimal2 = Decimal.fromBigDecimal(new BigDecimal(bigDecimalStr2), 8, 3);
+
+        Identifier identifier = Identifier.create("default", tableName);
+        sql(
+                "CREATE TABLE %s (a DECIMAL(8, 3), b INT, c INT) WITH ('file.format'='parquet', 'type'='format-table')",
+                tableName);
+        RESTToken expiredDataToken =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 1000_000);
+        restCatalogServer.setDataToken(identifier, expiredDataToken);
+
+        sql("INSERT INTO %s VALUES (%s, 1, 1), (%s, 2, 2)", tableName, decimal1, decimal1);
+        assertThat(sql("SELECT a, b FROM %s", tableName))
+                .containsExactlyInAnyOrder(
+                        Row.of(new BigDecimal(bigDecimalStr1), 1),
+                        Row.of(new BigDecimal(bigDecimalStr1), 2));
+
+        sql("INSERT OVERWRITE %s VALUES (%s, 3, 3), (%s, 4, 4)", tableName, decimal2, decimal2);
+        assertThat(sql("SELECT a, b FROM %s", tableName))
+                .containsExactlyInAnyOrder(
+                        Row.of(new BigDecimal(bigDecimalStr2), 3),
+                        Row.of(new BigDecimal(bigDecimalStr2), 4));
+
+        sql("Drop TABLE %s", tableName);
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 421dc6206c..449346e456 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -18,13 +18,13 @@
 
 package org.apache.paimon.spark.format
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.fs.TwoPhaseOutputStream
 import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper}
 import org.apache.paimon.spark.write.BaseV2WriteBuilder
 import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.format.TwoPhaseCommitMessage
-import org.apache.paimon.table.sink.BatchTableWrite
+import org.apache.paimon.table.format.{FormatTableCommit, TwoPhaseCommitMessage}
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.internal.Logging
@@ -37,7 +36,6 @@ import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import java.io.FileNotFoundException
 import java.util
 
 import scala.collection.JavaConverters._
@@ -57,7 +56,11 @@ case class PaimonFormatTable(table: FormatTable)
       properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
     }
     if (FormatTable.Format.CSV == table.format) {
-      properties.put("sep", properties.get(CsvOptions.FIELD_DELIMITER.key()))
+      properties.put(
+        "sep",
+        properties.getOrDefault(
+          CsvOptions.FIELD_DELIMITER.key(),
+          CsvOptions.FIELD_DELIMITER.defaultValue()))
     }
     properties
   }
@@ -101,37 +104,28 @@ private case class FormatTableBatchWrite(
     !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
     "Cannot overwrite dynamically and by filter both")
 
+  private val batchWriteBuilder = {
+    val builder = table.newBatchWriteBuilder()
+    if (overwriteDynamic) {
+      builder.withOverwrite()
+    } else {
+      overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
+    }
+    builder
+  }
+
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
-    FormatTableWriterFactory(table, writeSchema)
+    FormatTableWriterFactory(batchWriteBuilder, writeSchema)
 
   override def useCommitCoordinator(): Boolean = false
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     logInfo(s"Committing to FormatTable ${table.name()}")
-
-    val committers = messages
-      .collect {
-        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
-        case other =>
-          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
-      }
-      .flatten
-      .toSeq
-
+    val batchTableCommit = batchWriteBuilder.newCommit()
+    val commitMessages = getPaimonCommitMessages(messages)
     try {
       val start = System.currentTimeMillis()
-      if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
-        val child = org.apache.paimon.partition.PartitionUtils
-          .buildPartitionName(overwritePartitions.get.asJava)
-        val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
-        deletePreviousDataFile(partitionPath)
-      } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
-        committers
-          .map(c => c.targetFilePath().getParent)
-          .distinct
-          .foreach(deletePreviousDataFile)
-      }
-      committers.foreach(c => c.commit(table.fileIO()))
+      batchTableCommit.commit(commitMessages)
       logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
     } catch {
       case e: Exception =>
@@ -140,53 +134,38 @@ private case class FormatTableBatchWrite(
     }
   }
 
-  private def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
-    if (table.fileIO().exists(partitionPath)) {
-      val files = table.fileIO().listFiles(partitionPath, true)
-      files
-        .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
-        .foreach(
-          f => {
-            try {
-              table.fileIO().deleteQuietly(f.getPath)
-            } catch {
-              case _: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
-              case other => throw new RuntimeException(other)
-            }
-          })
-    }
-  }
-
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
     logInfo(s"Aborting write to FormatTable ${table.name()}")
-    val committers = messages.collect {
-      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
-    }.flatten
-
-    committers.foreach {
-      committer =>
-        try {
-          committer.discard(table.fileIO())
-        } catch {
-          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
-        }
-    }
+    val batchTableCommit = batchWriteBuilder.newCommit()
+    val commitMessages = getPaimonCommitMessages(messages)
+    batchTableCommit.abort(commitMessages)
+  }
+
+  private def getPaimonCommitMessages(
+      messages: Array[WriterCommitMessage]): util.List[CommitMessage] = {
+    messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.commitMessages()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toList
+      .asJava
   }
 }
 
-private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+private case class FormatTableWriterFactory(
+    batchWriteBuilder: BatchWriteBuilder,
+    writeSchema: StructType)
   extends DataWriterFactory {
 
   override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
-    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+    new FormatTableDataWriter(batchWriteBuilder, writeSchema)
   }
 }
 
-private class FormatTableDataWriter(
-    table: FormatTable,
-    formatTableWrite: BatchTableWrite,
-    writeSchema: StructType)
+private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeSchema: StructType)
   extends DataWriter[InternalRow]
   with Logging {
 
@@ -197,24 +176,26 @@ private class FormatTableDataWriter(
     }
   }
 
+  private val write: BatchTableWrite = batchWriteBuilder.newWrite()
+
   override def write(record: InternalRow): Unit = {
     val paimonRow = rowConverter.apply(record)
-    formatTableWrite.write(paimonRow)
+    write.write(paimonRow)
   }
 
   override def commit(): WriterCommitMessage = {
     try {
-      val committers = formatTableWrite
+      val commitMessages = write
         .prepareCommit()
         .asScala
         .map {
-          case committer: TwoPhaseCommitMessage => committer.getCommitter
+          case commitMessage: TwoPhaseCommitMessage => commitMessage
           case other =>
             throw new IllegalArgumentException(
               "Unsupported commit message type: " + other.getClass.getSimpleName)
         }
         .toSeq
-      FormatTableTaskCommit(committers)
+      FormatTableTaskCommit(commitMessages)
     } finally {
       close()
     }
@@ -227,7 +208,7 @@ private class FormatTableDataWriter(
 
   override def close(): Unit = {
     try {
-      formatTableWrite.close()
+      write.close()
     } catch {
       case e: Exception =>
         logError("Error closing FormatTableDataWriter", e)
@@ -237,14 +218,14 @@ private class FormatTableDataWriter(
 }
 
 /** Commit message container for FormatTable writes, holding committers that need to be executed. */
-class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+class FormatTableTaskCommit private (private val _commitMessages: Seq[CommitMessage])
   extends WriterCommitMessage {
 
-  def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+  def commitMessages(): Seq[CommitMessage] = _commitMessages
 }
 
 object FormatTableTaskCommit {
-  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
-    new FormatTableTaskCommit(committers)
+  def apply(commitMessages: Seq[CommitMessage]): FormatTableTaskCommit = {
+    new FormatTableTaskCommit(commitMessages)
   }
 }

