This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3194143693 [core] Enable Auto Spill in Streaming mode (#5857)
3194143693 is described below
commit 31941436936c09df696e0c1d3f1d221522a61b80
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 10 17:35:32 2025 +0800
[core] Enable Auto Spill in Streaming mode (#5857)
---
.../shortcodes/generated/core_configuration.html | 4 +-
.../main/java/org/apache/paimon/CoreOptions.java | 15 +--
.../paimon/operation/AbstractFileStoreWrite.java | 8 +-
.../paimon/operation/BaseAppendFileStoreWrite.java | 3 +-
.../apache/paimon/operation/FileStoreWrite.java | 7 --
.../paimon/operation/KeyValueFileStoreWrite.java | 8 +-
.../paimon/table/sink/BatchWriteBuilderImpl.java | 4 +-
.../apache/paimon/table/sink/InnerTableWrite.java | 3 -
.../apache/paimon/table/sink/TableWriteImpl.java | 6 -
.../apache/paimon/append/AppendOnlyWriterTest.java | 48 --------
.../BucketedAppendFileStoreWriteTest.java | 5 -
.../paimon/flink/sink/StoreSinkWriteImpl.java | 1 -
.../apache/paimon/flink/sink/FlinkSinkTest.java | 130 ---------------------
13 files changed, 10 insertions(+), 232 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 87afc279b4..f39ffdbcd8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1166,9 +1166,9 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
</tr>
<tr>
<td><h5>write-buffer-spillable</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
- <td>Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.</td>
+ <td>Whether the write buffer can be spillable.</td>
</tr>
<tr>
<td><h5>write-max-writers-to-spill</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7f2dc0217c..7fa95afcd5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -546,9 +546,8 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
key("write-buffer-spillable")
.booleanType()
- .noDefaultValue()
- .withDescription(
- "Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.");
+ .defaultValue(true)
+ .withDescription("Whether the write buffer can be spillable.");
public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
key("write-buffer-for-append")
@@ -2172,14 +2171,8 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_BUFFER_SIZE).getBytes();
}
- public boolean writeBufferSpillable(
- boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
- // if not streaming mode, we turn spillable on by default.
- return options.getOptional(WRITE_BUFFER_SPILLABLE)
- .orElse(
- usingObjectStore
- || !isStreaming
- || targetFileSize(hasPrimaryKey) > writeBufferSize());
+ public boolean writeBufferSpillable() {
+ return options.get(WRITE_BUFFER_SPILLABLE);
}
public MemorySize writeBufferSpillDiskSize() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7a316c1c79..0b8797cd08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -86,7 +86,6 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
private boolean ignoreNumBucketCheck = false;
- protected boolean isStreamingMode = false;
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
@@ -416,7 +415,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
LOG.debug("Creating writer for partition {}, bucket {}",
partition, bucket);
}
- if (!isStreamingMode && writerNumber() >= writerNumberMax) {
+ if (writerNumber() >= writerNumberMax) {
try {
forceBufferSpill();
} catch (Exception e) {
@@ -462,11 +461,6 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
previousSnapshot == null ? null : previousSnapshot.id());
}
- @Override
- public void withExecutionMode(boolean isStreamingMode) {
- this.isStreamingMode = isStreamingMode;
- }
-
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry)
{
this.compactionMetrics = new CompactionMetrics(metricRegistry,
tableName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index ed75afdf2b..27e1e21b90 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -127,8 +127,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
pathFactory.createDataFilePathFactory(partition, bucket),
restoreIncrement,
options.useWriteBufferForAppend() || forceBufferSpill,
- options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
- || forceBufferSpill,
+ options.writeBufferSpillable() || forceBufferSpill,
options.fileCompression(),
options.spillCompressOptions(),
statsCollectors,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index a5538dbe71..1b85e4f144 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -81,13 +81,6 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
*/
void withIgnoreNumBucketCheck(boolean ignoreNumBucketCheck);
- /**
- * We detect whether it is in batch mode, if so, we do some optimization.
- *
- * @param isStreamingMode whether in streaming mode
- */
- void withExecutionMode(boolean isStreamingMode);
-
/** With metrics to measure compaction. */
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index eac7e1888e..e337db7e63 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
@@ -203,7 +202,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
partition, bucket, compactStrategy, compactExecutor,
levels, dvMaintainer);
return new MergeTreeWriter(
- bufferSpillable(),
+ options.writeBufferSpillable(),
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
@@ -219,11 +218,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
UserDefinedSeqComparator.create(valueType, options));
}
- @VisibleForTesting
- public boolean bufferSpillable() {
- return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
- }
-
private CompactStrategy createCompactStrategy(CoreOptions options) {
if (options.needLookup()) {
if (CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 999de4ec96..beeb1decbe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -67,9 +67,7 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
@Override
public BatchTableWrite newWrite() {
- return table.newWrite(commitUser)
- .withIgnorePreviousFiles(staticPartition != null)
- .withExecutionMode(false);
+ return table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 1fc0191976..754577eeab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -26,7 +26,4 @@ public interface InnerTableWrite extends StreamTableWrite,
BatchTableWrite {
InnerTableWrite withWriteRestore(WriteRestore writeRestore);
InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
-
- // we detect whether in streaming mode, and do some optimization
- InnerTableWrite withExecutionMode(boolean isStreamingMode);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 3cc84390a7..a302105799 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -102,12 +102,6 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
return this;
}
- @Override
- public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
- write.withExecutionMode(isStreamingMode);
- return this;
- }
-
@Override
public TableWriteImpl<T> withIOManager(IOManager ioManager) {
write.withIOManager(ioManager);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 8cb293cd8e..69d4e5c11f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -408,54 +408,6 @@ public class AppendOnlyWriterTest {
writer.close();
}
- @Test
- public void tesWriteBufferSpillAutoEnabled() {
- HashMap<String, String> map = new HashMap<>();
- // This is the default behavior,no object store and streaming mode.
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
true, false))
- .isFalse();
-
- // Using object store.
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true,
false))
- .isTrue();
-
- // Batch mode.
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
false, false))
- .isTrue();
-
- // Append only table.
- map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
false, false))
- .isTrue();
-
- // Primary key table.
- map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
false, true))
- .isTrue();
-
- // targetFileSize is greater than write buffer size.
- map.clear();
- map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
- map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
true, false))
- .isTrue();
-
- // target-file-size is smaller than write-buffer-size.
- map.clear();
- map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
- map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
true, false))
- .isFalse();
-
- // Set to false manually.
- map.clear();
- map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
- map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
- map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
-
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false,
true, false))
- .isFalse();
- }
-
@Test
public void testMultipleFlush() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
index 39f2ba79d6..af1becc465 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
@@ -66,7 +66,6 @@ public class BucketedAppendFileStoreWriteTest {
BucketedAppendFileStoreWrite write =
(BucketedAppendFileStoreWrite) table.store().newWrite("ss");
write.withIOManager(IOManager.create(tempDir.toString()));
- write.withExecutionMode(false);
write.write(partition(0), 0, GenericRow.of(0, 0, 0));
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
@@ -119,7 +118,6 @@ public class BucketedAppendFileStoreWriteTest {
FileStoreTable table = createFileStoreTable();
BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite)
table.store().newWrite("ss");
- write.withExecutionMode(false);
write.write(partition(0), 0, GenericRow.of(0, 0, 0));
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
@@ -188,7 +186,6 @@ public class BucketedAppendFileStoreWriteTest {
BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite)
table.store().newWrite("ss");
StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
- write.withExecutionMode(false);
for (int i = 0; i < 100; i++) {
if (i == 0) {
@@ -220,7 +217,6 @@ public class BucketedAppendFileStoreWriteTest {
BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite)
table.store().newWrite("ss");
StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
- write.withExecutionMode(false);
for (int i = 0; i < 100; i++) {
write.write(nullPartition(), i, GenericRow.of(null, i, i));
@@ -247,7 +243,6 @@ public class BucketedAppendFileStoreWriteTest {
BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite)
table.store().newWrite("ss");
StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
- write.withExecutionMode(false);
for (int i = 0; i < 100; i++) {
write.write(partition(1), i, GenericRow.of(null, i, i));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 163fc6b578..9c65f221aa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -144,7 +144,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
table.newWrite(commitUser, state.getSubtaskId())
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
- .withExecutionMode(isStreamingMode)
.withBucketMode(table.bucketMode());
if (metricGroup != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
deleted file mode 100644
index 5f21858e61..0000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.FileIOFinder;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.KeyValueFileStoreWrite;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.nio.file.Path;
-import java.util.Collections;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test class for {@link FlinkSink}. */
-public class FlinkSinkTest {
-
- @TempDir Path tempPath;
-
- @Test
- public void testOptimizeKeyValueWriterForBatch() throws Exception {
- // test for batch mode auto enable spillable
- FileStoreTable fileStoreTable = createFileStoreTable();
- StreamExecutionEnvironment streamExecutionEnvironment =
- StreamExecutionEnvironment.getExecutionEnvironment();
-
- // set this when batch executing
- streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
- assertThat(testSpillable(streamExecutionEnvironment,
fileStoreTable)).isTrue();
-
- // set this to streaming, we should get a false then
-
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- assertThat(testSpillable(streamExecutionEnvironment,
fileStoreTable)).isFalse();
- }
-
- private boolean testSpillable(
- StreamExecutionEnvironment streamExecutionEnvironment,
FileStoreTable fileStoreTable)
- throws Exception {
- DataStreamSource<InternalRow> source =
- streamExecutionEnvironment.fromCollection(
- Collections.singletonList(GenericRow.of(1, 1)));
- FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable,
null, null);
- DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
- OneInputStreamOperatorFactory<InternalRow, Committable>
operatorFactory =
- (OneInputStreamOperatorFactory<InternalRow, Committable>)
- ((OneInputTransformation<InternalRow, Committable>)
- written.getTransformation())
- .getOperatorFactory();
-
- TypeSerializer<Committable> serializer =
- new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
- OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
- new OneInputStreamOperatorTestHarness<>(operatorFactory);
- harness.setup(serializer);
- harness.initializeEmptyState();
-
- RowDataStoreWriteOperator operator =
- (RowDataStoreWriteOperator) harness.getOneInputOperator();
-
- return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl)
operator.write).write.getWrite())
- .bufferSpillable();
- }
-
- protected static final RowType ROW_TYPE =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"pk", "pt0"});
-
- private FileStoreTable createFileStoreTable() throws Exception {
- org.apache.paimon.fs.Path tablePath = new
org.apache.paimon.fs.Path(tempPath.toString());
- Options options = new Options();
- options.set(CoreOptions.PATH, tablePath.toString());
- options.set(CoreOptions.BUCKET, 1);
- TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- ROW_TYPE.getFields(),
- Collections.emptyList(),
- Collections.singletonList("pk"),
- options.toMap(),
- ""));
- return FileStoreTableFactory.create(
- FileIOFinder.find(tablePath),
- tablePath,
- tableSchema,
- options,
- CatalogEnvironment.empty());
- }
-}