This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new a9532bd175 [flink] Postpone bucket table supports to batch write fixed bucket (#6460)
a9532bd175 is described below
commit a9532bd175fe94b0bc907fd94f8d9bd192aff605
Author: yuzelin <[email protected]>
AuthorDate: Fri Oct 31 10:06:47 2025 +0800
[flink] Postpone bucket table supports to batch write fixed bucket (#6460)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 11 +
.../apache/paimon/table/sink/TableWriteImpl.java | 2 -
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 63 ++-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 26 +-
.../flink/sink/PostponeBatchWriteOperator.java | 118 ++++++
.../sink/PostponeFixedBucketChannelComputer.java | 66 +++
.../paimon/flink/sink/PostponeFixedBucketSink.java | 93 +++++
.../flink/sink/RowDataStoreWriteOperator.java | 16 +-
.../sink/StatelessRowDataStoreWriteOperator.java | 58 +++
.../paimon/flink/sink/StoreSinkWriteImpl.java | 2 -
.../paimon/flink/CrossPartitionTableITCase.java | 2 +-
.../paimon/flink/PostponeBucketTableITCase.java | 450 ++++++++++++++++++++-
13 files changed, 859 insertions(+), 54 deletions(-)
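
For context, the change can be exercised end-to-end from Flink SQL in the same way the new ITCases below do. The following is a minimal sketch only, assuming a batch TableEnvironment `tEnv` with a Paimon catalog already selected and TABLE_DML_SYNC enabled; the table name T is illustrative.

    // Postpone bucket tables ('bucket' = '-2') now write into fixed buckets by default in batch
    // mode, so the rows are readable right after the INSERT, without waiting for sys.compact.
    tEnv.executeSql(
            "CREATE TABLE T (k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-2')");
    tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b')").await();

    // The previous behavior can be restored per statement (or via ALTER TABLE ... SET) with the new
    // option; such rows land in the postpone bucket (-2) and only become readable after compaction.
    tEnv.executeSql(
                    "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
                            + "VALUES (3, 'c')")
            .await();
    tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();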
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9a5f3c99ad..d26fa9b59d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -941,6 +941,12 @@ This config option does not affect the default filesystem metastore.</td>
            <td>String</td>
            <td>You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.<ul><li>By default, read from the first field.</li><li>If the timestamp in the partition is a single field called 'dt', you can use '$dt'.</li><li>If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'.</li><li>If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'.</ [...]
        </tr>
+        <tr>
+            <td><h5>postpone.batch-write-fixed-bucket</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to write data into fixed buckets when batch writing to a postpone bucket table.</td>
+        </tr>
<tr>
<td><h5>primary-key</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index bac0d58fd9..f5c90b4196 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2042,6 +2042,13 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
                            .withDescription("Whether discard duplicate files in commit.");
+    public static final ConfigOption<Boolean> POSTPONE_BATCH_WRITE_FIXED_BUCKET =
+            key("postpone.batch-write-fixed-bucket")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to write data into fixed buckets when batch writing to a postpone bucket table.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -3143,6 +3150,10 @@ public class CoreOptions implements Serializable {
return options.get(BLOB_AS_DESCRIPTOR);
}
+ public boolean postponeBatchWriteFixedBucket() {
+ return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 325911b0ea..8d5757dcd3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.FileStore;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.DefaultValueRow;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -277,7 +276,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
write.restore(state);
}
- @VisibleForTesting
public FileStoreWrite<T> getWrite() {
return write;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index af3d407d95..0d50178dd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -22,12 +22,14 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -54,6 +56,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.CoreOptions.clusteringStrategy;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -292,16 +296,59 @@ public class FlinkSinkBuilder {
     }
 
     private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
-        ChannelComputer<InternalRow> channelComputer;
-        if (!table.partitionKeys().isEmpty()
-                && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
-            channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+        if (isStreaming(input) || !table.coreOptions().postponeBatchWriteFixedBucket()) {
+            ChannelComputer<InternalRow> channelComputer;
+            if (!table.partitionKeys().isEmpty()
+                    && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
+                channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+            } else {
+                channelComputer = new PostponeBucketChannelComputer(table.schema());
+            }
+            DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
+            PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
+            return sink.sinkFrom(partitioned);
         } else {
-            channelComputer = new PostponeBucketChannelComputer(table.schema());
+            Map<BinaryRow, Integer> knownNumBuckets = getKnownNumBuckets();
+
+            DataStream<InternalRow> partitioned =
+                    partition(
+                            input,
+                            new PostponeFixedBucketChannelComputer(table.schema(), knownNumBuckets),
+                            parallelism);
+
+            Map<String, String> batchWriteOptions = new HashMap<>();
+            batchWriteOptions.put(WRITE_ONLY.key(), "true");
+            // This option is only used to create a merge tree writer for writing files to fixed
+            // buckets. The real bucket number is determined at runtime.
+            batchWriteOptions.put(BUCKET.key(), "1");
+            FileStoreTable tableForWrite = table.copy(batchWriteOptions);
+
+            PostponeFixedBucketSink sink =
+                    new PostponeFixedBucketSink(tableForWrite, overwritePartition, knownNumBuckets);
+            return sink.sinkFrom(partitioned);
         }
-        DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
-        PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
-        return sink.sinkFrom(partitioned);
+    }
+
+    private Map<BinaryRow, Integer> getKnownNumBuckets() {
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        List<SimpleFileEntry> simpleFileEntries =
+                table.store().newScan().onlyReadRealBuckets().readSimpleEntries();
+        for (SimpleFileEntry entry : simpleFileEntries) {
+            if (entry.totalBuckets() >= 0) {
+                Integer oldTotalBuckets =
+                        knownNumBuckets.put(entry.partition(), entry.totalBuckets());
+                if (oldTotalBuckets != null && oldTotalBuckets != entry.totalBuckets()) {
+                    throw new IllegalStateException(
+                            "Partition "
+                                    + entry.partition()
+                                    + " has different totalBuckets "
+                                    + oldTotalBuckets
+                                    + " and "
+                                    + entry.totalBuckets());
+                }
+            }
+        }
+        return knownNumBuckets;
     }
 
     private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 1656ba8a8f..d694604627 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -40,7 +39,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
private static final long serialVersionUID = 1L;
- @Nullable private final Map<String, String> overwritePartition;
+ @Nullable protected final Map<String, String> overwritePartition;
     public FlinkWriteSink(FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
super(table, overwritePartition != null);
@@ -81,27 +80,8 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
@Override
@SuppressWarnings("unchecked, rawtypes")
             public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
-                return new RowDataStoreWriteOperator(
-                        parameters, table, logSinkFunction, writeProvider, commitUser) {
-
-                    @Override
-                    protected StoreSinkWriteState createState(
-                            int subtaskId,
-                            StateInitializationContext context,
-                            StoreSinkWriteState.StateValueFilter stateFilter) {
-                        // No conflicts will occur in append only unaware bucket writer,
-                        // so no state is needed.
-                        return new NoopStoreSinkWriteState(subtaskId);
-                    }
-
-                    @Override
-                    protected String getCommitUser(StateInitializationContext context)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware bucket writer,
-                        // so commitUser does not matter.
-                        return commitUser == null ? initialCommitUser : commitUser;
-                    }
-                };
+                return new StatelessRowDataStoreWriteOperator(
+                        parameters, table, logSinkFunction, writeProvider, commitUser);
             }
};
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
new file mode 100644
index 0000000000..a47925accf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.bucket.BucketFunction;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.SinkRecord;
+
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Writer that writes {@link InternalRow} records into fixed buckets for a postpone bucket table. */
+public class PostponeBatchWriteOperator extends StatelessRowDataStoreWriteOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<BinaryRow, Integer> knownNumBuckets;
+ private final BucketFunction bucketFunction;
+
+ private transient RowPartitionKeyExtractor partitionKeyExtractor;
+ private transient int defaultNumBuckets;
+ private transient Projection bucketKeyProjection;
+
+ public PostponeBatchWriteOperator(
+ StreamOperatorParameters<Committable> parameters,
+ FileStoreTable table,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser,
+ Map<BinaryRow, Integer> knownNumBuckets) {
+        super(parameters, table, null, storeSinkWriteProvider, initialCommitUser);
+        this.knownNumBuckets = new HashMap<>(knownNumBuckets);
+        this.bucketFunction =
+                BucketFunction.create(
+                        new CoreOptions(table.options()), table.schema().logicalBucketKeyType());
+    }
+
+    @Override
+    public void open() throws Exception {
+ super.open();
+
+        int sinkParallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+        this.defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;
+
+        TableSchema schema = table.schema();
+        this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+        this.bucketKeyProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), schema.projection(schema.bucketKeys()));
+        ((StoreSinkWriteImpl) write).getWrite().getWrite().withIgnoreNumBucketCheck(true);
+    }
+
+ @Override
+ @Nullable
+ protected SinkRecord write(InternalRow row) throws Exception {
+ BinaryRow partition = partitionKeyExtractor.partition(row);
+        int numBuckets = knownNumBuckets.computeIfAbsent(partition.copy(), p -> defaultNumBuckets);
+        int bucket = bucketFunction.bucket(bucketKeyProjection.apply(row), numBuckets);
+ return write.write(row, bucket);
+ }
+
+ @Override
+    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+            throws IOException {
+        List<Committable> committables = new ArrayList<>();
+        for (Committable committable : super.prepareCommit(waitCompaction, checkpointId)) {
+            if (committable.kind() == Committable.Kind.FILE) {
+                CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+                committables.add(
+                        new Committable(
+                                committable.checkpointId(),
+                                committable.kind(),
+                                new CommitMessageImpl(
+                                        message.partition(),
+                                        message.bucket(),
+                                        checkNotNull(knownNumBuckets.get(message.partition())),
+                                        message.newFilesIncrement(),
+                                        message.compactIncrement())));
+            } else {
+                committables.add(committable);
+            }
+        }
+
+ return committables;
+ }
+}
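
The heart of the operator above is how it resolves the bucket count per partition: partitions that already have fixed-bucket files keep the bucket count recorded in their manifests (knownNumBuckets), while a brand-new partition falls back to the sink parallelism. A self-contained toy sketch of that fallback, using plain JDK types only (the class and method names here are illustrative, not Paimon APIs):

    import java.util.HashMap;
    import java.util.Map;

    public class BucketCountFallbackSketch {

        // Known partitions keep their recorded bucket count; unknown partitions default to the
        // sink parallelism (mirroring defaultNumBuckets in PostponeBatchWriteOperator).
        static int numBuckets(
                String partition, Map<String, Integer> knownNumBuckets, int defaultNumBuckets) {
            return knownNumBuckets.computeIfAbsent(partition, p -> defaultNumBuckets);
        }

        public static void main(String[] args) {
            Map<String, Integer> knownNumBuckets = new HashMap<>();
            knownNumBuckets.put("pt=pt1", 3); // e.g. previously rescaled to 3 buckets

            int sinkParallelism = 5;
            int defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;

            System.out.println(numBuckets("pt=pt1", knownNumBuckets, defaultNumBuckets)); // 3
            System.out.println(numBuckets("pt=pt3", knownNumBuckets, defaultNumBuckets)); // 5
        }
    }

The resolved count is also written back into each FILE committable (see prepareCommit above), so the commit records the same totalBuckets that the rows were hashed with.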
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
new file mode 100644
index 0000000000..5fd55db13e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+
+import java.util.Map;
+
+/**
+ * {@link ChannelComputer} for writing {@link InternalRow}s into fixed buckets of a postpone bucket
+ * table.
+ */
+public class PostponeFixedBucketChannelComputer implements ChannelComputer<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TableSchema schema;
+ private final Map<BinaryRow, Integer> knownNumBuckets;
+
+ private transient int numChannels;
+ private transient RowPartitionKeyExtractor partitionKeyExtractor;
+
+ public PostponeFixedBucketChannelComputer(
+ TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) {
+ this.schema = schema;
+ this.knownNumBuckets = knownNumBuckets;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+ }
+
+ @Override
+ public int channel(InternalRow record) {
+ BinaryRow partition = partitionKeyExtractor.partition(record);
+        int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ return ChannelComputer.select(partition, bucket, numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by bucket";
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
new file mode 100644
index 0000000000..92763c0cbe
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+
+/** {@link FlinkSink} for writing records into fixed buckets of a postpone bucket table. */
+public class PostponeFixedBucketSink extends FlinkWriteSink<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<BinaryRow, Integer> knownNumBuckets;
+
+ public PostponeFixedBucketSink(
+ FileStoreTable table,
+ @Nullable Map<String, String> overwritePartition,
+ Map<BinaryRow, Integer> knownNumBuckets) {
+ super(table, overwritePartition);
+ this.knownNumBuckets = knownNumBuckets;
+ }
+
+ @Override
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new RowDataStoreWriteOperator.Factory(table, null, writeProvider, commitUser) {
+            @Override
+            @SuppressWarnings("unchecked, rawtypes")
+            public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
+                return new PostponeBatchWriteOperator(
+                        parameters, table, writeProvider, commitUser, knownNumBuckets);
+            }
+        };
+    }
+
+ @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+ return createRestoreOnlyCommittableStateManager(table);
+ }
+
+ @Override
+    protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
+        if (overwritePartition == null) {
+            // The bucket option has already been copied into the table outside, so nothing needs
+            // to be changed here.
+            return super.createCommitterFactory();
+        } else {
+            // When overwriting, the postpone bucket files also need to be deleted, so use a
+            // postpone bucket table commit here.
+            FileStoreTable tableForCommit =
+                    table.copy(
+                            Collections.singletonMap(
+                                    BUCKET.key(), String.valueOf(BucketMode.POSTPONE_BUCKET)));
+            return context ->
+                    new StoreCommitter(
+                            tableForCommit,
+                            tableForCommit
+                                    .newCommit(context.commitUser())
+                                    .withOverwrite(overwritePartition)
+                                    .ignoreEmptyCommit(!context.streamingCheckpointEnabled()),
+                            context);
+ }
+ }
+}
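
The committer override above is what makes INSERT OVERWRITE work in this mode: the commit is created against a postpone-bucket copy of the table so that files still sitting in bucket -2 of the overwritten partitions are deleted as well. A short usage sketch, mirroring testOverwriteWithBatchWriteFixedBucket below (tEnv and the partitioned table T are assumed to exist as in that test):

    // Rows written to the postpone bucket are not readable yet ...
    tEnv.executeSql(
                    "INSERT INTO T /*+ OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
                            + "VALUES (2, 10, 210), (2, 20, 220)")
            .await();

    // ... but overwriting the partition also removes them, so only the new rows survive.
    tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30, 230)").await();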
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 96ab1be510..4a9b64f7b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -137,12 +137,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
     public void processElement(StreamRecord<InternalRow> element) throws Exception {
         sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;
- SinkRecord record;
- try {
- record = write.write(element.getValue());
- } catch (Exception e) {
- throw new IOException(e);
- }
+ SinkRecord record = write(element.getValue());
if (record != null
&& logSinkFunction != null
@@ -153,6 +148,15 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
}
}
+ @Nullable
+ protected SinkRecord write(InternalRow row) throws Exception {
+ try {
+ return write.write(row);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
new file mode 100644
index 0000000000..3c8af5e629
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import javax.annotation.Nullable;
+
+/** Stateless writer used for unaware-bucket append tables and postpone bucket tables. */
+public class StatelessRowDataStoreWriteOperator extends RowDataStoreWriteOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ public StatelessRowDataStoreWriteOperator(
+ StreamOperatorParameters<Committable> parameters,
+ FileStoreTable table,
+ @Nullable LogSinkFunction logSinkFunction,
+ StoreSinkWrite.Provider storeSinkWriteProvider,
+ String initialCommitUser) {
+        super(parameters, table, logSinkFunction, storeSinkWriteProvider, initialCommitUser);
+ }
+
+ @Override
+ protected StoreSinkWriteState createState(
+ int subtaskId,
+ StateInitializationContext context,
+ StoreSinkWriteState.StateValueFilter stateFilter) {
+        // No conflicts will occur in append only unaware bucket writer, so no state is needed.
+ return new NoopStoreSinkWriteState(subtaskId);
+ }
+
+ @Override
+    protected String getCommitUser(StateInitializationContext context) throws Exception {
+ // No conflicts will occur in append only unaware bucket writer, so
+ // commitUser does not matter.
+ return commitUser == null ? initialCommitUser : commitUser;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 2d3742a95a..73c49be1e0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
@@ -189,7 +188,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
write.restore((List) states);
}
- @VisibleForTesting
public TableWriteImpl<?> getWrite() {
return write;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
index 98747a31d6..ddad7c5e62 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CrossPartitionTableITCase.java
@@ -198,7 +198,7 @@ public class CrossPartitionTableITCase extends CatalogITCaseBase {
             throws ExecutionException, InterruptedException {
         sql(
                 "create table cross_postpone (pt int, k int, v int, primary key (k) not enforced) "
-                        + "partitioned by (pt) with ('bucket' = '-2')");
+                        + "partitioned by (pt) with ('bucket' = '-2', 'postpone.batch-write-fixed-bucket' = 'false')");
         sql("insert into cross_postpone values (1, 1, 1)");
         sql("insert into cross_postpone values (2, 2, 2)");
         tEnv.executeSql("CALL sys.compact(`table` => 'default.cross_postpone')").await();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index be19cdff42..c6b832eebe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink;
import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
@@ -30,11 +32,17 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for postpone bucket tables. */
@@ -71,9 +79,22 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " 'merge-engine' = 'partial-update',\n"
+ " 'rowkind.field' = 'row_kind_col'\n"
+ ")");
- assertThatThrownBy(() -> tEnv.executeSql("INSERT INTO T VALUES (1, 1,
1, '-D')").await())
- .rootCause()
- .hasMessageContaining("By default, Partial update can not
accept delete records");
+
+ boolean writeFixedBucket = ThreadLocalRandom.current().nextBoolean();
+ tEnv.executeSql(
+ String.format(
+ "ALTER TABLE T SET
('postpone.batch-write-fixed-bucket' = '%s')",
+ writeFixedBucket));
+ if (writeFixedBucket) {
+ assertThatCode(() -> tEnv.executeSql("INSERT INTO T VALUES (1, 1,
1, '-U')").await())
+ .doesNotThrowAnyException();
+ } else {
+ assertThatThrownBy(
+ () -> tEnv.executeSql("INSERT INTO T VALUES (1, 1,
1, '-D')").await())
+ .rootCause()
+ .hasMessageContaining(
+ "By default, Partial update can not accept delete
records");
+ }
}
@Test
@@ -100,7 +121,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v INT,\n"
+ " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
int numPartitions = 3;
@@ -159,7 +181,7 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
}
@Test
- public void testOverwrite() throws Exception {
+ public void testOverwriteWithoutBatchWriteFixedBucket() throws Exception {
String warehouse = getTempDirPath();
TableEnvironment tEnv =
tableEnvironmentBuilder()
@@ -182,7 +204,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v INT,\n"
+ " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
tEnv.executeSql(
@@ -211,6 +234,52 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
"+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]",
"+I[30, 230, 2]");
}
+ @Test
+ public void testOverwriteWithBatchWriteFixedBucket() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+        // write to the postpone bucket for partitions 1 and 2
+ tEnv.executeSql(
+ "INSERT INTO T /*+ OPTIONS
('postpone.batch-write-fixed-bucket' = 'false') */ "
+ + "VALUES (1, 10, 110), (1, 20, 120), (2, 10,
210), (2, 20, 220)")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM
T"))).isEmpty();
+
+        // batch overwrite partition 2 and the new data can be read
+ tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30,
230)").await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder("+I[20, 221, 2]", "+I[30, 230, 2]");
+
+        // compact, then partition 1 can be read as well
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT k, v, pt FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]",
"+I[30, 230, 2]");
+ }
+
@Timeout(TIMEOUT)
@Test
public void testLookupChangelogProducer() throws Exception {
@@ -238,7 +307,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH (\n"
+ " 'bucket' = '-2',\n"
- + " 'changelog-producer' = 'lookup'\n"
+ + " 'changelog-producer' = 'lookup',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
TableEnvironment sEnv =
@@ -305,7 +375,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH (\n"
+ " 'bucket' = '-2',\n"
- + " 'postpone.default-bucket-num' = '2'\n"
+ + " 'postpone.default-bucket-num' = '2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
int numKeys = 100;
@@ -461,6 +532,11 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " 'snapshot.num-retained.max' = '3'\n"
+ ")");
+ tEnv.executeSql(
+ String.format(
+ "ALTER TABLE T SET
('postpone.batch-write-fixed-bucket' = '%s')",
+ ThreadLocalRandom.current().nextBoolean()));
+
for (int i = 0; i < 5; i++) {
tEnv.executeSql(String.format("INSERT INTO T VALUES (%d, 0, 0)",
i)).await();
}
@@ -494,7 +570,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v INT,\n"
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())");
@@ -557,7 +634,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v INT,\n"
+ " PRIMARY KEY (k, pt) NOT ENFORCED\n"
+ ") PARTITIONED BY (pt) WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS
PROCTIME())");
@@ -618,6 +696,7 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false',\n"
+ " 'deletion-vectors.enabled' = 'true'\n"
+ ")");
@@ -662,7 +741,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v INT,\n"
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
tEnv.executeSql(
@@ -696,7 +776,8 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " v TIMESTAMP(9),\n"
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
- + " 'bucket' = '-2'\n"
+ + " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false'\n"
+ ")");
tEnv.executeSql(
@@ -733,6 +814,7 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'bucket' = '-2',\n"
+ + " 'postpone.batch-write-fixed-bucket' = 'false',\n"
+ " 'changelog-producer' = 'none',\n"
+ " 'scan.remove-normalize' = 'true',\n"
+ " 'continuous.discovery-interval' = '1ms'\n"
@@ -773,6 +855,342 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
"+I[5, 53]");
}
+ @Test
+ public void testWriteFixedBucketWithDifferentBucketNumber() throws
Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " pt STRING,"
+ + " PRIMARY KEY (k, pt) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ // use sink.parallelism
+ List<String> values1 = new ArrayList<>();
+ List<String> expected1 = new ArrayList<>();
+ for (int i = 1; i <= 8; i++) {
+ values1.add(String.format("(%d, '%c', 'pt1')", i, (i - 1 + 'a')));
+ values1.add(String.format("(%d, '%c', 'pt2')", i, (i - 1 + 'a')));
+ expected1.add(String.format("+I[%d, %c, pt1]", i, (i - 1 + 'a')));
+ expected1.add(String.format("+I[%d, %c, pt2]", i, (i - 1 + 'a')));
+ }
+ tEnv.executeSql(
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4')
*/ VALUES "
+ + String.join(",", values1))
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrderElementsOf(expected1);
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(0, 1, 2, 3);
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level > 0"))).isEmpty();
+
+ // test bucket number strategy: pt1 = 3, pt2 = 4, pt3 = 5 (runtime)
+ tEnv.executeSql(
+ "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 3,
`partition` => 'pt=pt1')");
+
+ List<String> values2 = new ArrayList<>();
+ List<String> expected2 = new ArrayList<>();
+ for (int i = 1; i <= 8; i++) {
+ values2.add(String.format("(%d, '%c', 'pt1')", i, (i - 1 + 'A')));
+ values2.add(String.format("(%d, '%c', 'pt2')", i, (i - 1 + 'A')));
+ values2.add(String.format("(%d, '%c', 'pt3')", i, (i - 1 + 'A')));
+ expected2.add(String.format("+I[%d, %c, pt1]", i, (i - 1 + 'A')));
+ expected2.add(String.format("+I[%d, %c, pt2]", i, (i - 1 + 'A')));
+ expected2.add(String.format("+I[%d, %c, pt3]", i, (i - 1 + 'A')));
+ }
+ for (int i = 9; i <= 16; i++) {
+ values2.add(String.format("(%d, '%d', 'pt1')", i, i));
+ values2.add(String.format("(%d, '%d', 'pt2')", i, i));
+ values2.add(String.format("(%d, '%d', 'pt3')", i, i));
+ expected2.add(String.format("+I[%d, %d, pt1]", i, i));
+ expected2.add(String.format("+I[%d, %d, pt2]", i, i));
+ expected2.add(String.format("+I[%d, %d, pt3]", i, i));
+ }
+
+ tEnv.executeSql(
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '5')
*/ VALUES "
+ + String.join(",", values2))
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrderElementsOf(expected2);
+ Map<String, Set<Integer>> partitionBuckets = new HashMap<>();
+ for (Row row : collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`"))) {
+ partitionBuckets
+ .computeIfAbsent((String) row.getField(0), p -> new
HashSet<>())
+ .add((Integer) row.getField(1));
+ }
+ assertThat(partitionBuckets).hasSize(3);
+ assertThat(partitionBuckets.get("{pt1}")).containsExactly(0, 1, 2);
+ assertThat(partitionBuckets.get("{pt2}")).containsExactly(0, 1, 2, 3);
+ assertThat(partitionBuckets.get("{pt3}")).containsExactly(0, 1, 2, 3,
4);
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level > 0"))).isEmpty();
+ }
+
+ @Test
+ public void testWriteFixedBucketThenWritePostponeBucket() throws Exception
{
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ // use sink.parallelism
+ tEnv.executeSql(
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4')
*/ "
+ + "VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4,
'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h');")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, a]",
+ "+I[2, b]",
+ "+I[3, c]",
+ "+I[4, d]",
+ "+I[5, e]",
+ "+I[6, f]",
+ "+I[7, g]",
+ "+I[8, h]");
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(0, 1, 2, 3);
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level > 0"))).isEmpty();
+
+        // write to the postpone bucket; new records cannot be read before compaction
+ tEnv.executeSql(
+ "INSERT INTO T /*+
OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
+ + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4,
'D'), (5, 'E'), (6, 'F'), (7, 'G'), (8, 'H'), "
+ + "(9, '9'), (10, '10'), (11, '11'), (12,
'12'), (13, '13'), (14, '14'), (15, '15'), (16, '16');")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, a]",
+ "+I[2, b]",
+ "+I[3, c]",
+ "+I[4, d]",
+ "+I[5, e]",
+ "+I[6, f]",
+ "+I[7, g]",
+ "+I[8, h]");
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(-2, 0, 1, 2, 3);
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level > 0"))).isEmpty();
+
+ // compact and check result again
+ boolean forceUpLevel0 = ThreadLocalRandom.current().nextBoolean();
+ if (forceUpLevel0) {
+ tEnv.executeSql("ALTER TABLE T set ('compaction.force-up-level-0'
= 'true')").await();
+ }
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, A]",
+ "+I[2, B]",
+ "+I[3, C]",
+ "+I[4, D]",
+ "+I[5, E]",
+ "+I[6, F]",
+ "+I[7, G]",
+ "+I[8, H]",
+ "+I[9, 9]",
+ "+I[10, 10]",
+ "+I[11, 11]",
+ "+I[12, 12]",
+ "+I[13, 13]",
+ "+I[14, 14]",
+ "+I[15, 15]",
+ "+I[16, 16]");
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(0, 1, 2, 3);
+ if (forceUpLevel0) {
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level = 0")))
+ .isEmpty();
+ }
+ }
+
+ @Test
+    public void testWritePostponeBucketThenWriteFixedBucket() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ tEnv.executeSql(
+ "INSERT INTO T /*+
OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ "
+ + "VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4,
'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h');")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+ // use sink.parallelism
+ tEnv.executeSql(
+ "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '4')
*/ "
+ + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4,
'D'), (5, 'E'), (6, 'F'), (7, 'G'), (8, 'H'), "
+ + "(9, '9'), (10, '10'), (11, '11'), (12,
'12'), (13, '13'), (14, '14'), (15, '15'), (16, '16');")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, A]",
+ "+I[2, B]",
+ "+I[3, C]",
+ "+I[4, D]",
+ "+I[5, E]",
+ "+I[6, F]",
+ "+I[7, G]",
+ "+I[8, H]",
+ "+I[9, 9]",
+ "+I[10, 10]",
+ "+I[11, 11]",
+ "+I[12, 12]",
+ "+I[13, 13]",
+ "+I[14, 14]",
+ "+I[15, 15]",
+ "+I[16, 16]");
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(-2, 0, 1, 2, 3);
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level > 0"))).isEmpty();
+
+ // compact and check result again
+ boolean forceUpLevel0 = ThreadLocalRandom.current().nextBoolean();
+ if (forceUpLevel0) {
+ tEnv.executeSql("ALTER TABLE T set ('compaction.force-up-level-0'
= 'true')").await();
+ }
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder(
+ "+I[1, a]",
+ "+I[2, b]",
+ "+I[3, c]",
+ "+I[4, d]",
+ "+I[5, e]",
+ "+I[6, f]",
+ "+I[7, g]",
+ "+I[8, h]",
+ "+I[9, 9]",
+ "+I[10, 10]",
+ "+I[11, 11]",
+ "+I[12, 12]",
+ "+I[13, 13]",
+ "+I[14, 14]",
+ "+I[15, 15]",
+ "+I[16, 16]");
+ assertThat(
+ collectRow(tEnv.executeSql("SELECT * FROM
`T$buckets`")).stream()
+ .map(row -> (Integer) row.getField(1))
+ .collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(0, 1, 2, 3);
+ if (forceUpLevel0) {
+ assertThat(collect(tEnv.executeSql("SELECT * FROM `T$files` WHERE
level = 0")))
+ .isEmpty();
+ }
+ }
+
+ @Test
+ public void testCompactPostponeThenWriteFixedBucket() throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '-2'\n"
+ + ")");
+
+ tEnv.executeSql(
+ "INSERT INTO T /*+
OPTIONS('postpone.batch-write-fixed-bucket' = 'false') */ VALUES (1, 'a')")
+ .await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
+
+ tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder("+I[1, a]");
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'A')").await();
+ assertThat(collect(tEnv.executeSql("SELECT * FROM T")))
+ .containsExactlyInAnyOrder("+I[1, A]");
+ }
+
private List<String> collect(TableResult result) throws Exception {
List<String> ret = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
@@ -812,4 +1230,12 @@ public class PostponeBucketTableITCase extends
AbstractTestBase {
timerThread.join();
return ret;
}
+
+ private List<Row> collectRow(TableResult result) {
+ try (CloseableIterator<Row> iter = result.collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}