This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3194143693 [core] Enable Auto Spill in Streaming mode (#5857)
3194143693 is described below

commit 31941436936c09df696e0c1d3f1d221522a61b80
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 10 17:35:32 2025 +0800

    [core] Enable Auto Spill in Streaming mode (#5857)
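    
    With this change, 'write-buffer-spillable' defaults to true in both batch
    and streaming mode, replacing the old heuristic (spill enabled only on
    object storage, in batch execution, or when 'target-file-size' exceeds
    'write-buffer-size'). A minimal sketch of the new behavior, using only
    APIs touched by this commit (the class name below is illustrative):
    
        import java.util.HashMap;
        import java.util.Map;
        
        import org.apache.paimon.CoreOptions;
        
        public class SpillableDefaultSketch {
            public static void main(String[] args) {
                Map<String, String> conf = new HashMap<>();
                // No explicit setting: the write buffer is now spillable by
                // default, in streaming mode as well as batch.
                System.out.println(CoreOptions.fromMap(conf).writeBufferSpillable()); // true
        
                // Opt out explicitly to restore the old streaming behavior.
                conf.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
                System.out.println(CoreOptions.fromMap(conf).writeBufferSpillable()); // false
            }
        }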
---
 .../shortcodes/generated/core_configuration.html   |   4 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  15 +--
 .../paimon/operation/AbstractFileStoreWrite.java   |   8 +-
 .../paimon/operation/BaseAppendFileStoreWrite.java |   3 +-
 .../apache/paimon/operation/FileStoreWrite.java    |   7 --
 .../paimon/operation/KeyValueFileStoreWrite.java   |   8 +-
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |   4 +-
 .../apache/paimon/table/sink/InnerTableWrite.java  |   3 -
 .../apache/paimon/table/sink/TableWriteImpl.java   |   6 -
 .../apache/paimon/append/AppendOnlyWriterTest.java |  48 --------
 .../BucketedAppendFileStoreWriteTest.java          |   5 -
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |   1 -
 .../apache/paimon/flink/sink/FlinkSinkTest.java    | 130 ---------------------
 13 files changed, 10 insertions(+), 232 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 87afc279b4..f39ffdbcd8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1166,9 +1166,9 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
         </tr>
         <tr>
             <td><h5>write-buffer-spillable</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
-            <td>Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.</td>
+            <td>Whether the write buffer can be spillable.</td>
         </tr>
         <tr>
             <td><h5>write-max-writers-to-spill</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7f2dc0217c..7fa95afcd5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -546,9 +546,8 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
             key("write-buffer-spillable")
                     .booleanType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Whether the write buffer can be spillable. 
Enabled by default when using object storage or when 'target-file-size' is 
greater than 'write-buffer-size'.");
+                    .defaultValue(true)
+                    .withDescription("Whether the write buffer can be spillable.");
 
     public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
             key("write-buffer-for-append")
@@ -2172,14 +2171,8 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable(
-            boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
-        // if not streaming mode, we turn spillable on by default.
-        return options.getOptional(WRITE_BUFFER_SPILLABLE)
-                .orElse(
-                        usingObjectStore
-                                || !isStreaming
-                                || targetFileSize(hasPrimaryKey) > writeBufferSize());
+    public boolean writeBufferSpillable() {
+        return options.get(WRITE_BUFFER_SPILLABLE);
     }
 
     public MemorySize writeBufferSpillDiskSize() {
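
Callers of the removed three-argument writeBufferSpillable(usingObjectStore,
isStreaming, hasPrimaryKey) now read the configured value directly; a minimal
sketch of a migrated call site (the tableOptions map name is illustrative):

    boolean spillable = CoreOptions.fromMap(tableOptions).writeBufferSpillable();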
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7a316c1c79..0b8797cd08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -86,7 +86,6 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     private boolean closeCompactExecutorWhenLeaving = true;
     private boolean ignorePreviousFiles = false;
     private boolean ignoreNumBucketCheck = false;
-    protected boolean isStreamingMode = false;
 
     protected CompactionMetrics compactionMetrics = null;
     protected final String tableName;
@@ -416,7 +415,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
             LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
         }
 
-        if (!isStreamingMode && writerNumber() >= writerNumberMax) {
+        if (writerNumber() >= writerNumberMax) {
             try {
                 forceBufferSpill();
             } catch (Exception e) {
@@ -462,11 +461,6 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
                 previousSnapshot == null ? null : previousSnapshot.id());
     }
 
-    @Override
-    public void withExecutionMode(boolean isStreamingMode) {
-        this.isStreamingMode = isStreamingMode;
-    }
-
     @Override
     public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
         this.compactionMetrics = new CompactionMetrics(metricRegistry, tableName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index ed75afdf2b..27e1e21b90 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -127,8 +127,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
                 pathFactory.createDataFilePathFactory(partition, bucket),
                 restoreIncrement,
                 options.useWriteBufferForAppend() || forceBufferSpill,
-                options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
-                        || forceBufferSpill,
+                options.writeBufferSpillable() || forceBufferSpill,
                 options.fileCompression(),
                 options.spillCompressOptions(),
                 statsCollectors,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index a5538dbe71..1b85e4f144 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -81,13 +81,6 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<
      */
     void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck);
 
-    /**
-     * We detect whether it is in batch mode, if so, we do some optimization.
-     *
-     * @param isStreamingMode whether in streaming mode
-     */
-    void withExecutionMode(boolean isStreamingMode);
-
     /** With metrics to measure compaction. */
     FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index eac7e1888e..e337db7e63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
@@ -203,7 +202,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
 
         return new MergeTreeWriter(
-                bufferSpillable(),
+                options.writeBufferSpillable(),
                 options.writeBufferSpillDiskSize(),
                 options.localSortMaxNumFileHandles(),
                 options.spillCompressOptions(),
@@ -219,11 +218,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 UserDefinedSeqComparator.create(valueType, options));
     }
 
-    @VisibleForTesting
-    public boolean bufferSpillable() {
-        return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
-    }
-
     private CompactStrategy createCompactStrategy(CoreOptions options) {
         if (options.needLookup()) {
             if (CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 999de4ec96..beeb1decbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -67,9 +67,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
 
     @Override
     public BatchTableWrite newWrite() {
-        return table.newWrite(commitUser)
-                .withIgnorePreviousFiles(staticPartition != null)
-                .withExecutionMode(false);
+        return table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 1fc0191976..754577eeab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -26,7 +26,4 @@ public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
     InnerTableWrite withWriteRestore(WriteRestore writeRestore);
 
     InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
-
-    // we detect whether in streaming mode, and do some optimization
-    InnerTableWrite withExecutionMode(boolean isStreamingMode);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 3cc84390a7..a302105799 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -102,12 +102,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         return this;
     }
 
-    @Override
-    public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
-        write.withExecutionMode(isStreamingMode);
-        return this;
-    }
-
     @Override
     public TableWriteImpl<T> withIOManager(IOManager ioManager) {
         write.withIOManager(ioManager);
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 8cb293cd8e..69d4e5c11f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -408,54 +408,6 @@ public class AppendOnlyWriterTest {
         writer.close();
     }
 
-    @Test
-    public void tesWriteBufferSpillAutoEnabled() {
-        HashMap<String, String> map = new HashMap<>();
-        // This is the default behavior,no object store and streaming mode.
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
-                .isFalse();
-
-        // Using object store.
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true, false))
-                .isTrue();
-
-        // Batch mode.
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
-                .isTrue();
-
-        // Append only table.
-        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
-                .isTrue();
-
-        // Primary key table.
-        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, true))
-                .isTrue();
-
-        // targetFileSize is greater than write buffer size.
-        map.clear();
-        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
-        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
-                .isTrue();
-
-        // target-file-size is smaller than write-buffer-size.
-        map.clear();
-        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
-        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
-                .isFalse();
-
-        // Set to false manually.
-        map.clear();
-        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
-        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
-        map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
-        Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
-                .isFalse();
-    }
-
     @Test
     public void testMultipleFlush() throws Exception {
         AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
index 39f2ba79d6..af1becc465 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
@@ -66,7 +66,6 @@ public class BucketedAppendFileStoreWriteTest {
         BucketedAppendFileStoreWrite write =
                 (BucketedAppendFileStoreWrite) table.store().newWrite("ss");
         write.withIOManager(IOManager.create(tempDir.toString()));
-        write.withExecutionMode(false);
 
         write.write(partition(0), 0, GenericRow.of(0, 0, 0));
         write.write(partition(1), 1, GenericRow.of(1, 1, 0));
@@ -119,7 +118,6 @@ public class BucketedAppendFileStoreWriteTest {
         FileStoreTable table = createFileStoreTable();
 
         BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
-        write.withExecutionMode(false);
 
         write.write(partition(0), 0, GenericRow.of(0, 0, 0));
         write.write(partition(1), 1, GenericRow.of(1, 1, 0));
@@ -188,7 +186,6 @@ public class BucketedAppendFileStoreWriteTest {
 
         BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
         StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
-        write.withExecutionMode(false);
 
         for (int i = 0; i < 100; i++) {
             if (i == 0) {
@@ -220,7 +217,6 @@ public class BucketedAppendFileStoreWriteTest {
 
         BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
         StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
-        write.withExecutionMode(false);
 
         for (int i = 0; i < 100; i++) {
             write.write(nullPartition(), i, GenericRow.of(null, i, i));
@@ -247,7 +243,6 @@ public class BucketedAppendFileStoreWriteTest {
 
         BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
         StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
-        write.withExecutionMode(false);
 
         for (int i = 0; i < 100; i++) {
             write.write(partition(1), i, GenericRow.of(null, i, i));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 163fc6b578..9c65f221aa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -144,7 +144,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                 table.newWrite(commitUser, state.getSubtaskId())
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
-                        .withExecutionMode(isStreamingMode)
                         .withBucketMode(table.bucketMode());
 
         if (metricGroup != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
deleted file mode 100644
index 5f21858e61..0000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.FileIOFinder;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.KeyValueFileStoreWrite;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.nio.file.Path;
-import java.util.Collections;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test class for {@link FlinkSink}. */
-public class FlinkSinkTest {
-
-    @TempDir Path tempPath;
-
-    @Test
-    public void testOptimizeKeyValueWriterForBatch() throws Exception {
-        // test for batch mode auto enable spillable
-        FileStoreTable fileStoreTable = createFileStoreTable();
-        StreamExecutionEnvironment streamExecutionEnvironment =
-                StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // set this when batch executing
-        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        assertThat(testSpillable(streamExecutionEnvironment, fileStoreTable)).isTrue();
-
-        // set this to streaming, we should get a false then
-        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        assertThat(testSpillable(streamExecutionEnvironment, fileStoreTable)).isFalse();
-    }
-
-    private boolean testSpillable(
-            StreamExecutionEnvironment streamExecutionEnvironment, FileStoreTable fileStoreTable)
-            throws Exception {
-        DataStreamSource<InternalRow> source =
-                streamExecutionEnvironment.fromCollection(
-                        Collections.singletonList(GenericRow.of(1, 1)));
-        FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, null, null);
-        DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
-        OneInputStreamOperatorFactory<InternalRow, Committable> operatorFactory =
-                (OneInputStreamOperatorFactory<InternalRow, Committable>)
-                        ((OneInputTransformation<InternalRow, Committable>)
-                                        written.getTransformation())
-                                .getOperatorFactory();
-
-        TypeSerializer<Committable> serializer =
-                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
-        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(operatorFactory);
-        harness.setup(serializer);
-        harness.initializeEmptyState();
-
-        RowDataStoreWriteOperator operator =
-                (RowDataStoreWriteOperator) harness.getOneInputOperator();
-
-        return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl) operator.write).write.getWrite())
-                .bufferSpillable();
-    }
-
-    protected static final RowType ROW_TYPE =
-            RowType.of(
-                    new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"pk", "pt0"});
-
-    private FileStoreTable createFileStoreTable() throws Exception {
-        org.apache.paimon.fs.Path tablePath = new org.apache.paimon.fs.Path(tempPath.toString());
-        Options options = new Options();
-        options.set(CoreOptions.PATH, tablePath.toString());
-        options.set(CoreOptions.BUCKET, 1);
-        TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                ROW_TYPE.getFields(),
-                                Collections.emptyList(),
-                                Collections.singletonList("pk"),
-                                options.toMap(),
-                                ""));
-        return FileStoreTableFactory.create(
-                FileIOFinder.find(tablePath),
-                tablePath,
-                tableSchema,
-                options,
-                CatalogEnvironment.empty());
-    }
-}
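
The deleted FlinkSinkTest exercised the mode-dependent heuristic through a
Flink test harness; with the fixed default, an equivalent check needs no
Flink runtime. A minimal sketch (test class name illustrative; AssertJ and
JUnit 5 as in the deleted test):

    import java.util.HashMap;
    
    import org.apache.paimon.CoreOptions;
    
    import org.junit.jupiter.api.Test;
    
    import static org.assertj.core.api.Assertions.assertThat;
    
    class WriteBufferSpillableDefaultTest {
    
        @Test
        void spillableIsOnByDefaultRegardlessOfMode() {
            // The default no longer depends on execution mode, storage type,
            // or file-size configuration.
            assertThat(CoreOptions.fromMap(new HashMap<>()).writeBufferSpillable()).isTrue();
        }
    }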
