This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 736f936a [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter
736f936a is described below

commit 736f936aa1c8b1ad4f3acae8d52821ed6597f751
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Wed Sep 21 10:25:22 2022 +0800

    [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter
    
    This closes #295
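
    For orientation before reading the diff: a minimal sketch of how the
    refactored writer hierarchy is used, based only on the class and method
    signatures visible in this commit. The setup values (schemaId,
    fileFormat, targetFileSize, writeSchema, pathFactory) are illustrative
    placeholders supplied by the caller, not code from the repository.

        import java.util.List;

        import org.apache.flink.api.common.accumulators.LongCounter;
        import org.apache.flink.table.data.RowData;
        import org.apache.flink.table.store.file.data.DataFileMeta;
        import org.apache.flink.table.store.file.data.DataFilePathFactory;
        import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
        import org.apache.flink.table.store.format.FileFormat;
        import org.apache.flink.table.types.logical.RowType;
        import org.apache.flink.util.CloseableIterator;

        class RollingWriterSketch {
            // Writes all rows through the new hierarchy: RollingFileWriter opens a
            // fresh SingleFileWriter per file, rolls once a file reaches
            // targetFileSize, and result() returns one DataFileMeta per closed
            // file. Per the FileWriter contract, a failed write aborts and
            // deletes the partially written files.
            static List<DataFileMeta> writeAll(
                    long schemaId,
                    FileFormat fileFormat,
                    long targetFileSize,
                    RowType writeSchema,
                    DataFilePathFactory pathFactory,
                    CloseableIterator<RowData> rows) throws Exception {
                RowDataRollingFileWriter writer =
                        new RowDataRollingFileWriter(
                                schemaId,
                                fileFormat,
                                targetFileSize,
                                writeSchema,
                                pathFactory,
                                new LongCounter(0)); // first sequence number to assign
                writer.write(rows); // also closes the iterator
                writer.close();
                return writer.result();
            }
        }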
---
 .../source/FileStoreSourceSplitReaderTest.java     |   2 +-
 .../source/TestChangelogDataReadWrite.java         |   2 +-
 .../table/store/file/data/AppendOnlyWriter.java    | 135 ++--------------
 .../table/store/file/data/DataFileReader.java      |  56 +------
 .../table/store/file/data/DataFileWriter.java      | 179 ++++-----------------
 .../store/file/{writer => io}/FileWriter.java      |  54 ++++---
 .../KeyValueDataFileRecordReader.java}             |  33 ++--
 .../store/file/io/KeyValueDataFileWriter.java      | 146 +++++++++++++++++
 .../file/{writer => io}/RollingFileWriter.java     |  87 ++++++----
 .../RowDataFileRecordReader.java}                  |  14 +-
 .../table/store/file/io/RowDataFileWriter.java     |  79 +++++++++
 .../store/file/io/RowDataRollingFileWriter.java    |  50 ++++++
 .../table/store/file/io/SingleFileWriter.java      | 152 +++++++++++++++++
 .../file/io/StatsCollectingSingleFileWriter.java   |  76 +++++++++
 .../table/store/file/manifest/ManifestFile.java    |  52 ++----
 .../store/file/mergetree/MergeTreeWriter.java      |   5 +-
 .../file/operation/AppendOnlyFileStoreRead.java    |   4 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |  11 +-
 .../table/store/file/operation/FileStoreWrite.java |   2 +-
 .../file/operation/KeyValueFileStoreWrite.java     |   2 +-
 .../store/file/{writer => utils}/RecordWriter.java |  13 +-
 .../table/store/file/writer/BaseFileWriter.java    | 118 --------------
 .../flink/table/store/file/writer/Metric.java      |  48 ------
 .../table/store/file/writer/MetricFileWriter.java  | 179 ---------------------
 .../store/table/AppendOnlyFileStoreTable.java      |   2 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   2 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   2 +-
 .../table/store/table/sink/AbstractTableWrite.java |   2 +-
 .../table/store/table/sink/MemoryTableWrite.java   |   2 +-
 .../flink/table/store/file/TestFileStore.java      |   2 +-
 .../store/file/data/AppendOnlyWriterTest.java      |  12 +-
 .../flink/table/store/file/data/DataFileTest.java  |  11 +-
 .../store/file/format/FileFormatSuffixTest.java    |   7 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  14 +-
 34 files changed, 723 insertions(+), 832 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 25211fe6..049b6c85 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 8c7a504b..4145bf54 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.TableRead;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 1476a287..355467e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -20,33 +20,20 @@
 package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
 import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.stats.BinaryTableStats;
-import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.writer.BaseFileWriter;
-import org.apache.flink.table.store.file.writer.FileWriter;
-import org.apache.flink.table.store.file.writer.Metric;
-import org.apache.flink.table.store.file.writer.MetricFileWriter;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
-import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
-import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter;
 import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
 
 /**
@@ -66,7 +53,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
 
-    private RowRollingWriter writer;
+    private RowDataRollingFileWriter writer;
 
     public AppendOnlyWriter(
             long schemaId,
@@ -87,14 +74,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
-        this.writer =
-                createRollingRowWriter(
-                        schemaId,
-                        fileFormat,
-                        targetFileSize,
-                        writeSchema,
-                        pathFactory,
-                        seqNumCounter);
+        this.writer = createRollingRowWriter();
     }
 
     @Override
@@ -116,14 +96,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
             // Reopen the writer to accept further records.
             seqNumCounter.resetLocal();
             seqNumCounter.add(getMaxSequenceNumber(newFiles) + 1);
-            writer =
-                    createRollingRowWriter(
-                            schemaId,
-                            fileFormat,
-                            targetFileSize,
-                            writeSchema,
-                            pathFactory,
-                            seqNumCounter);
+            writer = createRollingRowWriter();
         }
         // add new generated files
         newFiles.forEach(compactManager::addNewFile);
@@ -141,21 +114,20 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     }
 
     @Override
-    public List<DataFileMeta> close() throws Exception {
+    public void close() throws Exception {
         // cancel compaction so that it does not block job cancelling
         compactManager.cancelCompaction();
         sync();
 
-        List<DataFileMeta> result = new ArrayList<>();
         if (writer != null) {
-            // Abort this writer to clear uncommitted files.
             writer.abort();
-
-            result.addAll(writer.result());
             writer = null;
         }
+    }
 
-        return result;
+    private RowDataRollingFileWriter createRollingRowWriter() {
+        return new RowDataRollingFileWriter(
+                schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, seqNumCounter);
     }
 
     private void submitCompaction() throws ExecutionException, InterruptedException {
@@ -182,91 +154,4 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         compactAfter.clear();
         return increment;
     }
-
-    /** Rolling file writer for append-only table. */
-    public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
-
-        public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
-            super(writerFactory, targetFileSize);
-        }
-
-        public static RowRollingWriter createRollingRowWriter(
-                long schemaId,
-                FileFormat fileFormat,
-                long targetFileSize,
-                RowType writeSchema,
-                DataFilePathFactory pathFactory,
-                LongCounter seqNumCounter) {
-            return new RowRollingWriter(
-                    () ->
-                            new RowFileWriter(
-                                    MetricFileWriter.createFactory(
-                                            fileFormat.createWriterFactory(writeSchema),
-                                            Function.identity(),
-                                            writeSchema,
-                                            fileFormat
-                                                    .createStatsExtractor(writeSchema)
-                                                    .orElse(null)),
-                                    pathFactory.newPath(),
-                                    writeSchema,
-                                    schemaId,
-                                    seqNumCounter),
-                    targetFileSize);
-        }
-
-        public List<DataFileMeta> write(CloseableIterator<RowData> iterator) throws Exception {
-            try {
-                super.write(iterator);
-                super.close();
-                return super.result();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } finally {
-                iterator.close();
-            }
-        }
-    }
-
-    /**
-     * A {@link BaseFileWriter} impl with a counter with an initial value to record each row's
-     * sequence number.
-     */
-    public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
-
-        private final FieldStatsArraySerializer statsArraySerializer;
-        private final long schemaId;
-        private final LongCounter seqNumCounter;
-
-        public RowFileWriter(
-                FileWriter.Factory<RowData, Metric> writerFactory,
-                Path path,
-                RowType writeSchema,
-                long schemaId,
-                LongCounter seqNumCounter) {
-            super(writerFactory, path);
-            this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
-            this.schemaId = schemaId;
-            this.seqNumCounter = seqNumCounter;
-        }
-
-        @Override
-        public void write(RowData row) throws IOException {
-            super.write(row);
-            seqNumCounter.add(1L);
-        }
-
-        @Override
-        protected DataFileMeta createResult(Path path, Metric metric) throws IOException {
-            BinaryTableStats stats = statsArraySerializer.toBinary(metric.fieldStats());
-
-            return DataFileMeta.forAppend(
-                    path.getName(),
-                    FileUtils.getFileSize(path),
-                    recordCount(),
-                    stats,
-                    seqNumCounter.getLocalValue() - super.recordCount(),
-                    seqNumCounter.getLocalValue() - 1,
-                    schemaId);
-        }
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index 7a0c6eef..3d578dea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -20,16 +20,13 @@ package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connector.file.src.util.RecordAndPosition;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.io.KeyValueDataFileRecordReader;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.utils.Projection;
@@ -74,55 +71,8 @@ public class DataFileReader {
     }
 
     public RecordReader<KeyValue> read(String fileName) throws IOException {
-        return new DataFileRecordReader(pathFactory.toPath(fileName));
-    }
-
-    private class DataFileRecordReader implements RecordReader<KeyValue> {
-
-        private final BulkFormat.Reader<RowData> reader;
-        private final KeyValueSerializer serializer;
-
-        private DataFileRecordReader(Path path) throws IOException {
-            this.reader = FileUtils.createFormatReader(readerFactory, path);
-            this.serializer = new KeyValueSerializer(keyType, valueType);
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<KeyValue> readBatch() throws IOException {
-            BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
-            return iterator == null ? null : new DataFileRecordIterator(iterator, serializer);
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-    }
-
-    private static class DataFileRecordIterator implements RecordReader.RecordIterator<KeyValue> {
-
-        private final BulkFormat.RecordIterator<RowData> iterator;
-        private final KeyValueSerializer serializer;
-
-        private DataFileRecordIterator(
-                BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) {
-            this.iterator = iterator;
-            this.serializer = serializer;
-        }
-
-        @Override
-        public KeyValue next() throws IOException {
-            RecordAndPosition<RowData> result = iterator.next();
-
-            // TODO schema evolution
-            return result == null ? null : serializer.fromRow(result.getRecord());
-        }
-
-        @Override
-        public void releaseBatch() {
-            iterator.releaseBatch();
-        }
+        return new KeyValueDataFileRecordReader(
+                readerFactory, pathFactory.toPath(fileName), keyType, valueType);
     }
 
     /** Creates {@link DataFileReader}. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index 22ce2aa5..febc1cd7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -23,48 +23,32 @@ import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.stats.BinaryTableStats;
-import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.io.KeyValueDataFileWriter;
+import org.apache.flink.table.store.file.io.RollingFileWriter;
+import org.apache.flink.table.store.file.io.SingleFileWriter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.writer.BaseFileWriter;
-import org.apache.flink.table.store.file.writer.FileWriter;
-import org.apache.flink.table.store.file.writer.Metric;
-import org.apache.flink.table.store.file.writer.MetricFileWriter;
-import org.apache.flink.table.store.file.writer.RollingFileWriter;
-import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Supplier;
 
 /** Writes {@link KeyValue}s into data files. */
 public class DataFileWriter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DataFileWriter.class);
-
     private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
     private final BulkWriter.Factory<RowData> writerFactory;
     private final FileStatsExtractor fileStatsExtractor;
-    private final FieldStatsArraySerializer keyStatsConverter;
-    private final FieldStatsArraySerializer valueStatsConverter;
     private final DataFilePathFactory pathFactory;
     private final long suggestedFileSize;
 
@@ -81,8 +65,6 @@ public class DataFileWriter {
         this.valueType = valueType;
         this.writerFactory = writerFactory;
         this.fileStatsExtractor = fileStatsExtractor;
-        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
-        this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
 
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
@@ -108,9 +90,18 @@ public class DataFileWriter {
 
     /** Write raw {@link KeyValue} iterator into a changelog file. */
     public Path writeLevel0Changelog(CloseableIterator<KeyValue> iterator) throws Exception {
-        FileWriter.Factory<KeyValue, Metric> writerFactory = createFileWriterFactory();
         Path changelogPath = pathFactory.newChangelogPath();
-        doWrite(writerFactory.create(changelogPath), iterator);
+        KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+        SingleFileWriter<KeyValue, Void> writer =
+                new SingleFileWriter<KeyValue, Void>(
+                        writerFactory, changelogPath, kvSerializer::toRow) {
+                    @Override
+                    public Void result() throws IOException {
+                        return null;
+                    }
+                };
+        writer.write(iterator);
+        writer.close();
         return changelogPath;
     }
 
@@ -137,21 +128,25 @@ public class DataFileWriter {
             throws Exception {
         // Don't roll file for level 0
         long suggestedFileSize = level == 0 ? Long.MAX_VALUE : this.suggestedFileSize;
-        return doWrite(createRollingKvWriter(level, suggestedFileSize), iterator);
+        RollingFileWriter<KeyValue, DataFileMeta> writer =
+                new RollingFileWriter<>(() -> createDataFileWriter(level), suggestedFileSize);
+        writer.write(iterator);
+        writer.close();
+        return writer.result();
     }
 
-    private <R> R doWrite(FileWriter<KeyValue, R> fileWriter, CloseableIterator<KeyValue> iterator)
-            throws Exception {
-        try (FileWriter<KeyValue, R> writer = fileWriter) {
-            writer.write(iterator);
-        } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing data files. Cleaning up.", e);
-            fileWriter.abort();
-            throw e;
-        } finally {
-            iterator.close();
-        }
-        return fileWriter.result();
+    private KeyValueDataFileWriter createDataFileWriter(int level) {
+        Path path = pathFactory.newPath();
+        KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+        return new KeyValueDataFileWriter(
+                writerFactory,
+                path,
+                kvSerializer::toRow,
+                keyType,
+                valueType,
+                fileStatsExtractor,
+                schemaId,
+                level);
     }
 
     public void delete(DataFileMeta file) {
@@ -162,118 +157,6 @@ public class DataFileWriter {
         FileUtils.deleteOrWarn(pathFactory.toPath(file));
     }
 
-    private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> {
-        private final int level;
-        private final RowDataSerializer keySerializer;
-
-        private BinaryRowData minKey = null;
-        private RowData maxKey = null;
-        private long minSeqNumber = Long.MAX_VALUE;
-        private long maxSeqNumber = Long.MIN_VALUE;
-
-        public KvFileWriter(
-                FileWriter.Factory<KeyValue, Metric> writerFactory, Path path, int level)
-                throws IOException {
-            super(writerFactory, path);
-
-            this.level = level;
-            this.keySerializer = new RowDataSerializer(keyType);
-        }
-
-        @Override
-        public void write(KeyValue kv) throws IOException {
-            super.write(kv);
-
-            updateMinKey(kv);
-            updateMaxKey(kv);
-
-            updateMinSeqNumber(kv);
-            updateMaxSeqNumber(kv);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Write key value " + kv.toString(keyType, valueType));
-            }
-        }
-
-        private void updateMinKey(KeyValue kv) {
-            if (minKey == null) {
-                minKey = keySerializer.toBinaryRow(kv.key()).copy();
-            }
-        }
-
-        private void updateMaxKey(KeyValue kv) {
-            maxKey = kv.key();
-        }
-
-        private void updateMinSeqNumber(KeyValue kv) {
-            minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber());
-        }
-
-        private void updateMaxSeqNumber(KeyValue kv) {
-            maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber());
-        }
-
-        @Override
-        protected DataFileMeta createResult(Path path, Metric metric) throws IOException {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Closing data file " + path);
-            }
-
-            FieldStats[] rowStats = metric.fieldStats();
-            int numKeyFields = keyType.getFieldCount();
-
-            FieldStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
-            BinaryTableStats keyStats = keyStatsConverter.toBinary(keyFieldStats);
-
-            FieldStats[] valFieldStats =
-                    Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);
-            BinaryTableStats valueStats = valueStatsConverter.toBinary(valFieldStats);
-
-            return new DataFileMeta(
-                    path.getName(),
-                    FileUtils.getFileSize(path),
-                    recordCount(),
-                    minKey,
-                    keySerializer.toBinaryRow(maxKey).copy(),
-                    keyStats,
-                    valueStats,
-                    minSeqNumber,
-                    maxSeqNumber,
-                    schemaId,
-                    level);
-        }
-    }
-
-    private static class RollingKvWriter extends RollingFileWriter<KeyValue, DataFileMeta> {
-
-        public RollingKvWriter(Supplier<KvFileWriter> writerFactory, long targetFileSize) {
-            super(writerFactory, targetFileSize);
-        }
-    }
-
-    private Supplier<KvFileWriter> createWriterFactory(int level) {
-        return () -> {
-            try {
-                return new KvFileWriter(createFileWriterFactory(), pathFactory.newPath(), level);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        };
-    }
-
-    private FileWriter.Factory<KeyValue, Metric> createFileWriterFactory() {
-        KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
-        return MetricFileWriter.createFactory(
-                writerFactory,
-                kvSerializer::toRow,
-                KeyValue.schema(keyType, valueType),
-                fileStatsExtractor);
-    }
-
-    private RollingKvWriter createRollingKvWriter(int level, long targetFileSize) {
-        return new RollingKvWriter(createWriterFactory(level), targetFileSize);
-    }
-
     /** Creates {@link DataFileWriter}. */
     public static class Factory {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java
similarity index 65%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java
index 87f7a188..d8feb947 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.io;
 
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Iterator;
 
 /**
@@ -37,6 +36,9 @@ public interface FileWriter<T, R> extends Closeable {
     /**
      * Add only one record to this file writer.
      *
+     * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for
+     * the user.
+     *
      * @param record to write.
      * @throws IOException if encounter any IO error.
      */
@@ -45,18 +47,43 @@ public interface FileWriter<T, R> extends Closeable {
     /**
      * Add records from {@link Iterator} to this file writer.
      *
+     * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for
+     * the user.
+     *
      * @param records to write
      * @throws IOException if encounter any IO error.
      */
-    default void write(Iterator<T> records) throws IOException {
+    default void write(Iterator<T> records) throws Exception {
         while (records.hasNext()) {
             write(records.next());
         }
     }
 
+    /**
+     * Add records from {@link CloseableIterator} to this file writer.
+     *
+     * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for
+     * the user.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(CloseableIterator<T> records) throws Exception {
+        try {
+            while (records.hasNext()) {
+                write(records.next());
+            }
+        } finally {
+            records.close();
+        }
+    }
+
     /**
      * Add records from {@link Iterable} to file writer.
      *
+     * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for
+     * the user.
+     *
      * @param records to write.
      * @throws IOException if encounter any IO error.
      */
@@ -81,22 +108,13 @@ public interface FileWriter<T, R> extends Closeable {
      */
     long length() throws IOException;
 
-    /** Abort to clear orphan file(s) if encounter any error. */
+    /**
+     * Abort to clear orphan file(s) if encounter any error.
+     *
+     * <p>NOTE: This implementation must be reentrant.
+     */
     void abort();
 
     /** @return the result for this closed file writer. */
     R result() throws IOException;
-
-    /** A factory that creates a {@link FileWriter}. */
-    interface Factory<T, R> extends Serializable {
-
-        /**
-         * Creates a writer that writes to the given stream.
-         *
-         * @param path the path to write records.
-         * @return the file format writer.
-         * @throws IOException if any IO error was encountered then open the writer.
-         */
-        FileWriter<T, R> create(Path path) throws IOException;
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
similarity index 60%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
index 536918f8..cd03c3de 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
@@ -16,35 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.data;
+package org.apache.flink.table.store.file.io;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-/** Reads {@link RowData} from data files. */
-public class AppendOnlyReader implements RecordReader<RowData> {
+/** {@link RecordReader} for reading {@link KeyValue} data files. */
+public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
 
     private final BulkFormat.Reader<RowData> reader;
+    private final KeyValueSerializer serializer;
 
-    public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
+    public KeyValueDataFileRecordReader(
+            BulkFormat<RowData, FileSourceSplit> readerFactory,
+            Path path,
+            RowType keyType,
+            RowType valueType)
             throws IOException {
         this.reader = FileUtils.createFormatReader(readerFactory, path);
+        this.serializer = new KeyValueSerializer(keyType, valueType);
     }
 
     @Nullable
     @Override
-    public RecordIterator<RowData> readBatch() throws IOException {
+    public RecordIterator<KeyValue> readBatch() throws IOException {
         BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
-        return iterator == null ? null : new AppendOnlyRecordIterator(iterator);
+        return iterator == null ? null : new KeyValueDataFileRecordIterator(iterator, serializer);
     }
 
     @Override
@@ -52,20 +61,24 @@ public class AppendOnlyReader implements RecordReader<RowData> {
         reader.close();
     }
 
-    private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> {
+    private static class KeyValueDataFileRecordIterator
+            implements RecordReader.RecordIterator<KeyValue> {
 
         private final BulkFormat.RecordIterator<RowData> iterator;
+        private final KeyValueSerializer serializer;
 
-        private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
+        private KeyValueDataFileRecordIterator(
+                BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) {
             this.iterator = iterator;
+            this.serializer = serializer;
         }
 
         @Override
-        public RowData next() throws IOException {
+        public KeyValue next() throws IOException {
             RecordAndPosition<RowData> result = iterator.next();
 
             // TODO schema evolution
-            return result == null ? null : result.getRecord();
+            return result == null ? null : serializer.fromRow(result.getRecord());
         }
 
         @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java
new file mode 100644
index 00000000..3aad8089
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+/**
+ * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link KeyValue}s. Also
+ * produces {@link DataFileMeta} after writing a file.
+ */
+public class KeyValueDataFileWriter
+        extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class);
+
+    private final RowType keyType;
+    private final RowType valueType;
+    private final long schemaId;
+    private final int level;
+
+    private final FieldStatsArraySerializer keyStatsConverter;
+    private final FieldStatsArraySerializer valueStatsConverter;
+    private final RowDataSerializer keySerializer;
+
+    private BinaryRowData minKey = null;
+    private RowData maxKey = null;
+    private long minSeqNumber = Long.MAX_VALUE;
+    private long maxSeqNumber = Long.MIN_VALUE;
+
+    public KeyValueDataFileWriter(
+            BulkWriter.Factory<RowData> factory,
+            Path path,
+            Function<KeyValue, RowData> converter,
+            RowType keyType,
+            RowType valueType,
+            @Nullable FileStatsExtractor fileStatsExtractor,
+            long schemaId,
+            int level) {
+        super(factory, path, converter, KeyValue.schema(keyType, valueType), fileStatsExtractor);
+
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.schemaId = schemaId;
+        this.level = level;
+
+        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+        this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
+        this.keySerializer = new RowDataSerializer(keyType);
+    }
+
+    @Override
+    public void write(KeyValue kv) throws IOException {
+        super.write(kv);
+
+        updateMinKey(kv);
+        updateMaxKey(kv);
+
+        updateMinSeqNumber(kv);
+        updateMaxSeqNumber(kv);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Write key value " + kv.toString(keyType, valueType));
+        }
+    }
+
+    private void updateMinKey(KeyValue kv) {
+        if (minKey == null) {
+            minKey = keySerializer.toBinaryRow(kv.key()).copy();
+        }
+    }
+
+    private void updateMaxKey(KeyValue kv) {
+        maxKey = kv.key();
+    }
+
+    private void updateMinSeqNumber(KeyValue kv) {
+        minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber());
+    }
+
+    private void updateMaxSeqNumber(KeyValue kv) {
+        maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber());
+    }
+
+    @Override
+    public DataFileMeta result() throws IOException {
+        FieldStats[] rowStats = fieldStats();
+        int numKeyFields = keyType.getFieldCount();
+
+        FieldStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
+        BinaryTableStats keyStats = keyStatsConverter.toBinary(keyFieldStats);
+
+        FieldStats[] valFieldStats =
+                Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length);
+        BinaryTableStats valueStats = valueStatsConverter.toBinary(valFieldStats);
+
+        return new DataFileMeta(
+                path.getName(),
+                FileUtils.getFileSize(path),
+                recordCount(),
+                minKey,
+                keySerializer.toBinaryRow(maxKey).copy(),
+                keyStats,
+                valueStats,
+                minSeqNumber,
+                maxSeqNumber,
+                schemaId,
+                level);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
similarity index 59%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
index 298d5014..f635ac00 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java
@@ -17,13 +17,15 @@
  * under the License.
  */
 
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.io;
 
-import org.apache.flink.util.IOUtils;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Supplier;
@@ -36,36 +38,53 @@ import java.util.function.Supplier;
  */
 public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
-    private final Supplier<? extends FileWriter<T, R>> writerFactory;
+    private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
+
+    private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
-    private final List<FileWriter<T, R>> openedWriters;
+    private final List<SingleFileWriter<T, R>> openedWriters;
     private final List<R> results;
 
-    private FileWriter<T, R> currentWriter = null;
+    private SingleFileWriter<T, R> currentWriter = null;
     private long lengthOfClosedFiles = 0L;
     private long recordCount = 0;
     private boolean closed = false;
 
     public RollingFileWriter(
-            Supplier<? extends FileWriter<T, R>> writerFactory, long targetFileSize) {
+            Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
         this.writerFactory = writerFactory;
         this.targetFileSize = targetFileSize;
         this.openedWriters = new ArrayList<>();
         this.results = new ArrayList<>();
     }
 
+    @VisibleForTesting
+    public long targetFileSize() {
+        return targetFileSize;
+    }
+
     @Override
     public void write(T row) throws IOException {
-        // Open the current writer if write the first record or roll over happen before.
-        if (currentWriter == null) {
-            openCurrentWriter();
-        }
+        try {
+            // Open the current writer if write the first record or roll over happen before.
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
 
-        currentWriter.write(row);
-        recordCount += 1;
+            currentWriter.write(row);
+            recordCount += 1;
 
-        if (currentWriter.length() >= targetFileSize) {
-            closeCurrentWriter();
+            if (currentWriter.length() >= targetFileSize) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
         }
     }
 
@@ -74,19 +93,15 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
         openedWriters.add(currentWriter);
     }
 
-    private void closeCurrentWriter() {
-        if (currentWriter != null) {
-            try {
-                lengthOfClosedFiles += currentWriter.length();
-
-                currentWriter.close();
-                results.add(currentWriter.result());
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-
-            currentWriter = null;
+    private void closeCurrentWriter() throws IOException {
+        if (currentWriter == null) {
+            return;
         }
+
+        lengthOfClosedFiles += currentWriter.length();
+        currentWriter.close();
+        results.add(currentWriter.result());
+        currentWriter = null;
     }
 
     @Override
@@ -106,9 +121,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     @Override
     public void abort() {
-        IOUtils.closeQuietly(this);
-
-        // Abort all those writers.
         for (FileWriter<T, R> writer : openedWriters) {
             writer.abort();
         }
@@ -117,15 +129,24 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
     @Override
     public List<R> result() {
         Preconditions.checkState(closed, "Cannot access the results unless close all writers.");
-
         return results;
     }
 
     @Override
     public void close() throws IOException {
-        if (!closed) {
-            closeCurrentWriter();
+        if (closed) {
+            return;
+        }
 
+        try {
+            closeCurrentWriter();
+        } catch (IOException e) {
+            LOG.warn(
+                    "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.",
+                    e);
+            abort();
+            throw e;
+        } finally {
             closed = true;
         }
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java
similarity index 79%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java
index 536918f8..89d87fea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.data;
+package org.apache.flink.table.store.file.io;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
@@ -31,20 +31,20 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** Reads {@link RowData} from data files. */
-public class AppendOnlyReader implements RecordReader<RowData> {
+public class RowDataFileRecordReader implements RecordReader<RowData> {
 
     private final BulkFormat.Reader<RowData> reader;
 
-    public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
+    public RowDataFileRecordReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
             throws IOException {
         this.reader = FileUtils.createFormatReader(readerFactory, path);
     }
 
     @Nullable
     @Override
-    public RecordIterator<RowData> readBatch() throws IOException {
+    public RecordReader.RecordIterator<RowData> readBatch() throws IOException {
         BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
-        return iterator == null ? null : new AppendOnlyRecordIterator(iterator);
+        return iterator == null ? null : new RowDataFileRecordIterator(iterator);
     }
 
     @Override
@@ -52,11 +52,11 @@ public class AppendOnlyReader implements RecordReader<RowData> {
         reader.close();
     }
 
-    private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> {
+    private static class RowDataFileRecordIterator implements RecordReader.RecordIterator<RowData> {
 
         private final BulkFormat.RecordIterator<RowData> iterator;
 
-        private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
+        private RowDataFileRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
             this.iterator = iterator;
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java
new file mode 100644
index 00000000..8983ba43
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link RowData}. Also
+ * produces {@link DataFileMeta} after writing a file.
+ */
+public class RowDataFileWriter extends StatsCollectingSingleFileWriter<RowData, DataFileMeta> {
+
+    private final long schemaId;
+    private final LongCounter seqNumCounter;
+    private final FieldStatsArraySerializer statsArraySerializer;
+
+    public RowDataFileWriter(
+            BulkWriter.Factory<RowData> factory,
+            Path path,
+            RowType writeSchema,
+            @Nullable FileStatsExtractor fileStatsExtractor,
+            long schemaId,
+            LongCounter seqNumCounter) {
+        super(factory, path, Function.identity(), writeSchema, fileStatsExtractor);
+        this.schemaId = schemaId;
+        this.seqNumCounter = seqNumCounter;
+        this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
+    }
+
+    @Override
+    public void write(RowData row) throws IOException {
+        super.write(row);
+        seqNumCounter.add(1L);
+    }
+
+    @Override
+    public DataFileMeta result() throws IOException {
+        BinaryTableStats stats = statsArraySerializer.toBinary(fieldStats());
+        return DataFileMeta.forAppend(
+                path.getName(),
+                FileUtils.getFileSize(path),
+                recordCount(),
+                stats,
+                seqNumCounter.getLocalValue() - super.recordCount(),
+                seqNumCounter.getLocalValue() - 1,
+                schemaId);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java
new file mode 100644
index 00000000..60376442
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link RollingFileWriter} for data files containing {@link RowData}. */
+public class RowDataRollingFileWriter extends RollingFileWriter<RowData, DataFileMeta> {
+
+    public RowDataRollingFileWriter(
+            long schemaId,
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            DataFilePathFactory pathFactory,
+            LongCounter seqNumCounter) {
+        super(
+                () ->
+                        new RowDataFileWriter(
+                                fileFormat.createWriterFactory(writeSchema),
+                                pathFactory.newPath(),
+                                writeSchema,
+                                fileFormat.createStatsExtractor(writeSchema).orElse(null),
+                                schemaId,
+                                seqNumCounter),
+                targetFileSize);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java
new file mode 100644
index 00000000..38402aea
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.function.Function;
+
+/**
+ * A {@link FileWriter} to produce a single file.
+ *
+ * @param <T> type of records to write.
+ * @param <R> type of result to produce after writing a file.
+ */
+public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SingleFileWriter.class);
+
+    protected final Path path;
+    private final Function<T, RowData> converter;
+
+    private final BulkWriter<RowData> writer;
+    private FSDataOutputStream out;
+
+    private long recordCount;
+    private long length;
+    protected boolean closed;
+
+    public SingleFileWriter(
+            BulkWriter.Factory<RowData> factory, Path path, Function<T, RowData> converter) {
+        this.path = path;
+        this.converter = converter;
+
+        try {
+            FileSystem fs = path.getFileSystem();
+            out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+            writer = factory.create(out);
+        } catch (IOException e) {
+            LOG.warn(
+                    "Failed to open the bulk writer, closing the output stream and throw the error.",
+                    e);
+            if (out != null) {
+                abort();
+            }
+            throw new UncheckedIOException(e);
+        }
+
+        this.recordCount = 0;
+        this.length = 0;
+        this.closed = false;
+    }
+
+    public Path path() {
+        return path;
+    }
+
+    @Override
+    public void write(T record) throws IOException {
+        writeImpl(record);
+    }
+
+    protected RowData writeImpl(T record) throws IOException {
+        if (closed) {
+            throw new RuntimeException("Writer has already closed!");
+        }
+
+        try {
+            RowData rowData = converter.apply(record);
+            writer.addElement(rowData);
+            recordCount++;
+            return rowData;
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e);
+            abort();
+            throw e;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        if (closed) {
+            return length;
+        } else {
+            return out.getPos();
+        }
+    }
+
+    @Override
+    public void abort() {
+        IOUtils.closeQuietly(out);
+        FileUtils.deleteOrWarn(path);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing file " + path);
+        }
+
+        try {
+            writer.flush();
+            writer.finish();
+
+            out.flush();
+            length = out.getPos();
+            out.close();
+        } catch (IOException e) {
+            LOG.warn("Exception occurs when closing file " + path + ". Cleaning up.", e);
+            abort();
+            throw e;
+        } finally {
+            closed = true;
+        }
+    }
+}
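
A subclass only supplies the record-to-RowData conversion up front and a result() that
is read after close(); stream handling, length tracking and abort-on-failure all live
here. A minimal sketch of a hypothetical subclass whose result is just the file name:

        // Sketch only: PathOnlyFileWriter is not part of this commit.
        public class PathOnlyFileWriter extends SingleFileWriter<RowData, String> {

            public PathOnlyFileWriter(BulkWriter.Factory<RowData> factory, Path path) {
                super(factory, path, Function.identity()); // records are already RowData
            }

            @Override
            public String result() throws IOException {
                // Only meaningful after close(): the file is flushed and finished by then.
                return path.getName();
            }
        }
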
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java
new file mode 100644
index 00000000..d2539b28
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.format.FieldStatsCollector;
+import org.apache.flink.table.store.format.FileStatsExtractor;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * A {@link SingleFileWriter} which also produces statistics for each written field.
+ *
+ * @param <T> type of records to write.
+ * @param <R> type of result to produce after writing a file.
+ */
+public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWriter<T, R> {
+
+    @Nullable private final FileStatsExtractor fileStatsExtractor;
+    @Nullable private FieldStatsCollector fieldStatsCollector = null;
+
+    public StatsCollectingSingleFileWriter(
+            BulkWriter.Factory<RowData> factory,
+            Path path,
+            Function<T, RowData> converter,
+            RowType writeSchema,
+            @Nullable FileStatsExtractor fileStatsExtractor) {
+        super(factory, path, converter);
+        this.fileStatsExtractor = fileStatsExtractor;
+        if (this.fileStatsExtractor == null) {
+            this.fieldStatsCollector = new FieldStatsCollector(writeSchema);
+        }
+    }
+
+    @Override
+    public void write(T record) throws IOException {
+        RowData rowData = writeImpl(record);
+        if (fieldStatsCollector != null) {
+            fieldStatsCollector.collect(rowData);
+        }
+    }
+
+    public FieldStats[] fieldStats() throws IOException {
+        Preconditions.checkState(closed, "Cannot access metric unless the writer is closed.");
+        if (fileStatsExtractor != null) {
+            return fileStatsExtractor.extract(path);
+        } else {
+            return fieldStatsCollector.extract();
+        }
+    }
+}
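
Two statistics paths coexist here: with a FileStatsExtractor the stats are read back
from the finished file, otherwise a FieldStatsCollector accumulates them record by
record inside write(). The caller-side contract is the same either way, sketched
below (writer construction elided):

        writer.write(record);                      // collector path gathers stats here
        writer.close();                            // extractor path needs the finished file
        FieldStats[] stats = writer.fieldStats();  // IllegalStateException before close()
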
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 3e3c4f35..e600db1d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -24,28 +24,20 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.io.RollingFileWriter;
+import org.apache.flink.table.store.file.io.SingleFileWriter;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
-import org.apache.flink.table.store.file.writer.BaseFileWriter;
-import org.apache.flink.table.store.file.writer.FileWriter;
-import org.apache.flink.table.store.file.writer.Metric;
-import org.apache.flink.table.store.file.writer.MetricFileWriter;
-import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.store.format.FieldStatsCollector;
 import org.apache.flink.table.store.format.FileFormat;
-import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -58,26 +50,22 @@ import java.util.function.Supplier;
  */
 public class ManifestFile {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
-
     private final SchemaManager schemaManager;
     private final long schemaId;
     private final RowType partitionType;
     private final ManifestEntrySerializer serializer;
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+    private final BulkWriter.Factory<RowData> writerFactory;
     private final FileStorePathFactory pathFactory;
     private final long suggestedFileSize;
-    private final FileWriter.Factory<ManifestEntry, Metric> fileWriterFactory;
 
     private ManifestFile(
             SchemaManager schemaManager,
             long schemaId,
             RowType partitionType,
-            RowType entryType,
             ManifestEntrySerializer serializer,
             BulkFormat<RowData, FileSourceSplit> readerFactory,
             BulkWriter.Factory<RowData> writerFactory,
-            FileStatsExtractor fileStatsExtractor,
             FileStorePathFactory pathFactory,
             long suggestedFileSize) {
         this.schemaManager = schemaManager;
@@ -85,13 +73,9 @@ public class ManifestFile {
         this.partitionType = partitionType;
         this.serializer = serializer;
         this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
-
-        // Initialize the metric file writer factory to write manifest entry and generate metrics.
-        this.fileWriterFactory =
-                MetricFileWriter.createFactory(
-                        writerFactory, serializer::toRow, entryType, fileStatsExtractor);
     }
 
     @VisibleForTesting
@@ -132,18 +116,12 @@ public class ManifestFile {
      * <p>NOTE: This method is atomic.
      */
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
-
         ManifestRollingWriter rollingWriter = createManifestRollingWriter(suggestedFileSize);
         try (ManifestRollingWriter writer = rollingWriter) {
             writer.write(entries);
-
         } catch (Exception e) {
-            LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e);
-
-            rollingWriter.abort();
             throw new RuntimeException(e);
         }
-
         return rollingWriter.result();
     }
 
@@ -151,16 +129,16 @@ public class ManifestFile {
         FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
     }
 
-    private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry, ManifestFileMeta> {
+    private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
 
         private final FieldStatsCollector partitionStatsCollector;
         private final FieldStatsArraySerializer partitionStatsSerializer;
+
         private long numAddedFiles = 0;
         private long numDeletedFiles = 0;
 
-        ManifestEntryWriter(FileWriter.Factory<ManifestEntry, Metric> writerFactory, Path path)
-                throws IOException {
-            super(writerFactory, path);
+        ManifestEntryWriter(BulkWriter.Factory<RowData> factory, Path path) {
+            super(factory, path, serializer::toRow);
 
             this.partitionStatsCollector = new FieldStatsCollector(partitionType);
             this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType);
@@ -185,9 +163,7 @@ public class ManifestFile {
         }
 
         @Override
-        protected ManifestFileMeta createResult(Path path, Metric ignore) throws IOException {
-            // The input metric will be ignored because it includes all the column's stats, rather
-            // than the partition stats.
+        public ManifestFileMeta result() throws IOException {
             return new ManifestFileMeta(
                     path.getName(),
                     path.getFileSystem().getFileStatus(path).getLen(),
@@ -208,13 +184,7 @@ public class ManifestFile {
     }
 
     private Supplier<ManifestEntryWriter> createWriterFactory() {
-        return () -> {
-            try {
-                return new ManifestEntryWriter(fileWriterFactory, pathFactory.newManifestFile());
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        };
+        return () -> new ManifestEntryWriter(writerFactory, pathFactory.newManifestFile());
     }
 
     private ManifestRollingWriter createManifestRollingWriter(long targetFileSize) {
@@ -255,11 +225,9 @@ public class ManifestFile {
                     schemaManager,
                     schemaId,
                     partitionType,
-                    entryType,
                     new ManifestEntrySerializer(),
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
-                    fileFormat.createStatsExtractor(entryType).orElse(null),
                     pathFactory,
                     suggestedFileSize);
         }
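
ManifestEntryWriter now derives partition statistics itself rather than discarding a
whole-row Metric. The per-entry pattern in isolation, assuming entry.partition()
exposes the partition row (that accessor is an assumption, not shown in this diff):

        FieldStatsCollector collector = new FieldStatsCollector(partitionType);
        collector.collect(entry.partition());              // once per written ManifestEntry
        FieldStats[] partitionStats = collector.extract(); // consumed when building the meta
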
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index b48942f0..1956f96d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
@@ -228,7 +228,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     }
 
     @Override
-    public List<DataFileMeta> close() throws Exception {
+    public void close() throws Exception {
         // cancel compaction so that it does not block job cancelling
         compactManager.cancelCompaction();
         sync();
@@ -247,6 +247,5 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         }
         newFiles.clear();
         compactAfter.clear();
-        return delete;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
index 7f4f21a6..4b12d2c5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.data.AppendOnlyReader;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.io.RowDataFileRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.SchemaManager;
@@ -91,7 +91,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
         for (DataFileMeta file : split.files()) {
             suppliers.add(
                     () ->
-                            new AppendOnlyReader(
+                            new RowDataFileRecordReader(
                                     dataFilePathFactory.toPath(file.fileName()), readerFactory));
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 8911df14..979462c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -29,10 +29,11 @@ import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
 import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.io.RowDataRollingFileWriter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
@@ -149,17 +150,19 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
             if (toCompact.isEmpty()) {
                 return Collections.emptyList();
             }
-            AppendOnlyWriter.RowRollingWriter rewriter =
-                    AppendOnlyWriter.RowRollingWriter.createRollingRowWriter(
+            RowDataRollingFileWriter rewriter =
+                    new RowDataRollingFileWriter(
                             schemaId,
                             fileFormat,
                             targetFileSize,
                             rowType,
                             pathFactory.createDataFilePathFactory(partition, bucket),
                             new LongCounter(toCompact.get(0).minSequenceNumber()));
-            return rewriter.write(
+            rewriter.write(
                     new RecordReaderIterator<>(
                             read.createReader(new Split(partition, bucket, toCompact, false))));
+            rewriter.close();
+            return rewriter.result();
         };
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 95cf0991..ebd68ba0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 9e430b01..d354dedf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -41,8 +41,8 @@ import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
similarity index 84%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index e072a797..41ac970d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -16,13 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.utils;
 
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Increment;
 
-import java.util.List;
-
 /**
  * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
  * write yet un-staged data. The incremental files ready to commit is returned to the system by the
@@ -49,10 +46,6 @@ public interface RecordWriter<T> {
      */
     void sync() throws Exception;
 
-    /**
-     * Close this writer, the call will delete newly generated but not committed files.
-     *
-     * @return Deleted files.
-     */
-    List<DataFileMeta> close() throws Exception;
+    /** Close this writer, the call will delete newly generated but not committed files. */
+    void close() throws Exception;
 }
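
After this change, results leave a RecordWriter only through prepareCommit(); close()
is reduced to releasing resources and deleting uncommitted files. The calling
convention the updated tests below follow, as a sketch:

        RecordWriter<RowData> writer = ...;               // e.g. obtained from a FileStoreWrite
        writer.write(row);
        Increment increment = writer.prepareCommit(true); // carries new/compacted file metas
        // hand the increment to the commit protocol, then release the writer
        writer.close();                                   // no longer returns deleted files
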
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
deleted file mode 100644
index ab444781..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.table.store.file.writer;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * The abstracted base file writer implementation for {@link FileWriter}.
- *
- * @param <T> record data type.
- * @param <R> file meta data type.
- */
-public abstract class BaseFileWriter<T, R> implements FileWriter<T, R> {
-
-    private final FileWriter.Factory<T, Metric> writerFactory;
-    private final Path path;
-
-    private FileWriter<T, Metric> writer = null;
-    private Metric metric = null;
-
-    private boolean closed = false;
-
-    public BaseFileWriter(FileWriter.Factory<T, Metric> writerFactory, Path path) {
-        this.writerFactory = writerFactory;
-        this.path = path;
-    }
-
-    public Path path() {
-        return path;
-    }
-
-    private void openCurrentWriter() throws IOException {
-        this.writer = writerFactory.create(path);
-    }
-
-    @Override
-    public void write(T row) throws IOException {
-        if (writer == null) {
-            openCurrentWriter();
-        }
-
-        writer.write(row);
-    }
-
-    @Override
-    public long recordCount() {
-        if (writer != null) {
-            return writer.recordCount();
-        } else if (metric != null) {
-            return metric.recordCount();
-        }
-        return 0L;
-    }
-
-    @Override
-    public long length() throws IOException {
-        if (writer != null) {
-            return writer.length();
-        } else if (metric != null) {
-            return metric.length();
-        }
-        return 0;
-    }
-
-    protected abstract R createResult(Path path, Metric metric) throws IOException;
-
-    @Override
-    public void abort() {
-        IOUtils.closeQuietly(this);
-
-        // Abort to clean the orphan file.
-        FileUtils.deleteOrWarn(path);
-    }
-
-    @Override
-    public R result() throws IOException {
-        Preconditions.checkState(closed, "Cannot access the file meta unless close this writer.");
-        Preconditions.checkNotNull(metric, "Metric cannot be null.");
-
-        return createResult(path, metric);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!closed) {
-            if (writer != null) {
-                writer.close();
-                metric = writer.result();
-
-                writer = null;
-            }
-
-            closed = true;
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
deleted file mode 100644
index b41545c3..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.table.store.file.writer;
-
-import org.apache.flink.table.store.format.FieldStats;
-
-/** Metric information to describe the column's max-min values, record count etc. */
-public class Metric {
-
-    private final FieldStats[] fieldStats;
-    private final long recordCount;
-    private final long length;
-
-    public Metric(FieldStats[] stats, long recordCount, long length) {
-        this.fieldStats = stats;
-        this.recordCount = recordCount;
-        this.length = length;
-    }
-
-    public FieldStats[] fieldStats() {
-        return fieldStats;
-    }
-
-    public long recordCount() {
-        return recordCount;
-    }
-
-    public long length() {
-        return length;
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
deleted file mode 100644
index 26e26953..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.table.store.file.writer;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.store.format.FieldStatsCollector;
-import org.apache.flink.table.store.format.FileStatsExtractor;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.function.Function;
-
-/**
- * An {@link FileWriter} to write generic record and generate {@link Metric} once closing it.
- *
- * @param <T> generic record type.
- */
-public class MetricFileWriter<T> implements FileWriter<T, Metric> {
-    private static final Logger LOG = LoggerFactory.getLogger(MetricFileWriter.class);
-
-    private final BulkWriter<RowData> writer;
-    private final Function<T, RowData> converter;
-    private final FSDataOutputStream out;
-    private final Path path;
-    @Nullable private final FileStatsExtractor fileStatsExtractor;
-
-    private FieldStatsCollector fieldStatsCollector = null;
-
-    private long recordCount;
-    private long length;
-    private boolean closed = false;
-
-    private MetricFileWriter(
-            BulkWriter<RowData> writer,
-            Function<T, RowData> converter,
-            FSDataOutputStream out,
-            Path path,
-            RowType writeSchema,
-            @Nullable FileStatsExtractor fileStatsExtractor) {
-        this.writer = writer;
-        this.converter = converter;
-        this.out = out;
-        this.path = path;
-
-        this.fileStatsExtractor = fileStatsExtractor;
-        if (this.fileStatsExtractor == null) {
-            this.fieldStatsCollector = new FieldStatsCollector(writeSchema);
-        }
-
-        this.recordCount = 0L;
-        this.length = 0L;
-    }
-
-    @Override
-    public void write(T record) throws IOException {
-        RowData rowData = converter.apply(record);
-        writer.addElement(rowData);
-
-        if (fieldStatsCollector != null) {
-            fieldStatsCollector.collect(rowData);
-        }
-
-        recordCount += 1;
-    }
-
-    @Override
-    public long recordCount() {
-        return recordCount;
-    }
-
-    @Override
-    public long length() {
-        try {
-            return out.getPos();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    @Override
-    public void abort() {
-        try {
-            close();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        FileUtils.deleteOrWarn(path);
-    }
-
-    @Override
-    public Metric result() throws IOException {
-        Preconditions.checkState(closed, "Cannot access metric unless the writer is closed.");
-
-        FieldStats[] stats;
-        if (fileStatsExtractor != null) {
-            stats = fileStatsExtractor.extract(path);
-        } else {
-            stats = fieldStatsCollector.extract();
-        }
-
-        return new Metric(stats, recordCount, length);
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!closed) {
-            if (writer != null) {
-                writer.flush();
-                writer.finish();
-            }
-
-            if (out != null) {
-                out.flush();
-
-                length = out.getPos();
-                out.close();
-            }
-
-            closed = true;
-        }
-    }
-
-    public static <T> FileWriter.Factory<T, Metric> createFactory(
-            BulkWriter.Factory<RowData> factory,
-            Function<T, RowData> converter,
-            RowType writeSchema,
-            @Nullable FileStatsExtractor fileStatsExtractor) {
-
-        return path -> {
-            // Open the output stream.
-            FileSystem fs = path.getFileSystem();
-            FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
-
-            try {
-                return new MetricFileWriter<>(
-                        factory.create(out), converter, out, path, writeSchema, fileStatsExtractor);
-            } catch (Throwable e) {
-                LOG.warn(
-                        "Failed to open the bulk writer, closing the output stream and throw the error.",
-                        e);
-
-                IOUtils.closeQuietly(out);
-                throw e;
-            }
-        };
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 65975cc6..5cc90e27 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 9dea5237..e39216ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index b72cc634..02e457cd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SequenceGenerator;
 import org.apache.flink.table.store.table.sink.SinkRecord;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 50eed358..b5063503 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import java.util.ArrayList;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
index 1f49b1b7..06a217d7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 339f8011..c3f467b0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -41,8 +41,8 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index c450116c..037faeeb 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
@@ -95,15 +95,15 @@ public class AppendOnlyWriterTest {
     public void testSingleWrite() throws Exception {
         RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L);
         writer.write(row(1, "AAA", PART));
+        Increment increment = writer.prepareCommit(true);
+        writer.close();
 
-        List<DataFileMeta> result = writer.close();
-
-        assertThat(result.size()).isEqualTo(1);
-        DataFileMeta meta = result.get(0);
+        assertThat(increment.newFiles().size()).isEqualTo(1);
+        DataFileMeta meta = increment.newFiles().get(0);
         assertThat(meta).isNotNull();
 
         Path path = pathFactory.toPath(meta.fileName());
-        assertThat(path.getFileSystem().exists(path)).isFalse();
+        assertThat(path.getFileSystem().exists(path)).isTrue();
 
         assertThat(meta.rowCount()).isEqualTo(1L);
         assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 58c1c424..8968db64 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -122,8 +122,15 @@ public class DataFileTest {
         try {
             writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
         } catch (Throwable e) {
-            assertThat(e)
-                    .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+            if (e.getCause() != null) {
+                assertThat(e)
+                        .hasRootCauseExactlyInstanceOf(
+                                FailingAtomicRenameFileSystem.ArtificialException.class);
+            } else {
+                assertThat(e)
+                        .isExactlyInstanceOf(
+                                FailingAtomicRenameFileSystem.ArtificialException.class);
+            }
             Path root = new Path(tempDir.toString());
             FileSystem fs = root.getFileSystem();
             for (FileStatus bucketStatus : fs.listStatus(root)) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 39ed935e..1e9bf22f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.data.DataFileTest;
 import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.LinkedList;
-import java.util.List;
 
 /** test file format suffix. */
 public class FileFormatSuffixTest extends DataFileTest {
@@ -72,9 +72,10 @@ public class FileFormatSuffixTest extends DataFileTest {
                         dataFilePathFactory);
         appendOnlyWriter.write(
                 GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1")));
-        List<DataFileMeta> result = appendOnlyWriter.close();
+        Increment increment = appendOnlyWriter.prepareCommit(true);
+        appendOnlyWriter.close();
 
-        DataFileMeta meta = result.get(0);
+        DataFileMeta meta = increment.newFiles().get(0);
         Assertions.assertTrue(meta.fileName().endsWith(format));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 4a82d35f..c95f03d9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -175,16 +175,6 @@ public class MergeTreeTest {
         assertRecords(expected);
     }
 
-    @Test
-    public void testClose() throws Exception {
-        doTestWriteRead(6);
-        List<DataFileMeta> files = writer.close();
-        for (DataFileMeta file : files) {
-            Path path = dataFileWriter.pathFactory().toPath(file.fileName());
-            assertThat(path.getFileSystem().exists(path)).isFalse();
-        }
-    }
-
     @ParameterizedTest
     @ValueSource(longs = {1, 1024 * 1024})
     public void testCloseUpgrade(long targetFileSize) throws Exception {
@@ -253,6 +243,8 @@ public class MergeTreeTest {
         // assert records from increment compacted files
         assertRecords(expected, compactedFiles, true);
 
+        writer.close();
+
         Path bucketDir = dataFileWriter.pathFactory().toPath("ignore").getParent();
         Set<String> files =
                 Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
