This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 86b6350e8 [core] remove Dynamic Bucket Row (#2541)
86b6350e8 is described below
commit 86b6350e8fa86ef9a09c8b1bbca45196074c8739
Author: YeJunHao <[email protected]>
AuthorDate: Wed Dec 20 17:14:55 2023 +0800
[core] remove Dynamic Bucket Row (#2541)
---
.../apache/paimon/table/sink/DynamicBucketRow.java | 133 ---------------------
.../table/sink/DynamicBucketRowKeyExtractor.java | 10 +-
.../org/apache/paimon/table/sink/TableWrite.java | 3 +
.../apache/paimon/table/sink/TableWriteImpl.java | 20 ++++
.../paimon/crosspartition/IndexBootstrapTest.java | 6 +-
.../paimon/index/HashIndexMaintainerTest.java | 28 +++--
.../paimon/table/DynamicBucketTableTest.java | 12 +-
.../paimon/table/IndexFileExpireTableTest.java | 25 ++--
.../org/apache/paimon/table/TableTestBase.java | 12 ++
.../sink/cdc/CdcDynamicBucketWriteOperator.java | 3 +-
.../flink/sink/DynamicBucketRowWriteOperator.java | 4 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 2 +
.../paimon/flink/sink/StoreSinkWriteImpl.java | 5 +
.../SortCompactActionForDynamicBucketITCase.java | 9 +-
.../flink/sink/StoreCompactOperatorTest.java | 5 +
.../spark/commands/WriteIntoPaimonTable.scala | 4 +-
16 files changed, 101 insertions(+), 180 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
deleted file mode 100644
index 73394a9f5..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.RowKind;
-
-/** An {@link InternalRow} wraps another {@link InternalRow} with bucket. */
-public class DynamicBucketRow implements InternalRow {
-
- private final InternalRow row;
- private final int bucket;
-
- public DynamicBucketRow(InternalRow row, int bucket) {
- this.row = row;
- this.bucket = bucket;
- }
-
- public int bucket() {
- return bucket;
- }
-
- @Override
- public int getFieldCount() {
- return row.getFieldCount();
- }
-
- @Override
- public RowKind getRowKind() {
- return row.getRowKind();
- }
-
- @Override
- public void setRowKind(RowKind kind) {
- row.setRowKind(kind);
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return row.isNullAt(pos);
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return row.getBoolean(pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return row.getByte(pos);
- }
-
- @Override
- public short getShort(int pos) {
- return row.getShort(pos);
- }
-
- @Override
- public int getInt(int pos) {
- return row.getInt(pos);
- }
-
- @Override
- public long getLong(int pos) {
- return row.getLong(pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return row.getFloat(pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return row.getDouble(pos);
- }
-
- @Override
- public BinaryString getString(int pos) {
- return row.getString(pos);
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
- return row.getDecimal(pos, precision, scale);
- }
-
- @Override
- public Timestamp getTimestamp(int pos, int precision) {
- return row.getTimestamp(pos, precision);
- }
-
- @Override
- public byte[] getBinary(int pos) {
- return row.getBinary(pos);
- }
-
- @Override
- public InternalArray getArray(int pos) {
- return row.getArray(pos);
- }
-
- @Override
- public InternalMap getMap(int pos) {
- return row.getMap(pos);
- }
-
- @Override
- public InternalRow getRow(int pos, int numFields) {
- return row.getRow(pos, numFields);
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
index 6ae14b5f7..5f4734bf5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
@@ -25,8 +25,8 @@ import org.apache.paimon.schema.TableSchema;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
- * {@link KeyAndBucketExtractor} for {@link InternalRow} with extracting
bucket from {@link
- * DynamicBucketRow}.
+ * {@link KeyAndBucketExtractor} for {@link InternalRow}, just throws error
when extract bucket from
+ * dynamic row.
*/
public class DynamicBucketRowKeyExtractor extends RowKeyExtractor {
@@ -41,10 +41,6 @@ public class DynamicBucketRowKeyExtractor extends
RowKeyExtractor {
@Override
public int bucket() {
- if (record instanceof DynamicBucketRow) {
- return ((DynamicBucketRow) record).bucket();
- }
- throw new IllegalArgumentException(
- "Only supports DynamicBucketRow, illegal record type: " +
record.getClass());
+ throw new IllegalArgumentException("Can't extract bucket from row in
dynamic bucket mode.");
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index bf61cc57b..0f25f8219 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -49,6 +49,9 @@ public interface TableWrite extends AutoCloseable {
/** Write a row to the writer. */
void write(InternalRow row) throws Exception;
+ /** Write a row with bucket. */
+ void write(InternalRow row, int bucket) throws Exception;
+
/**
* Compact a bucket of a partition. By default, it will determine whether
to perform the
* compaction according to the 'num-sorted-run.compaction-trigger' option.
If fullCompaction is
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 25c29be9f..1c859bf03 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -116,12 +116,23 @@ public class TableWriteImpl<T> implements
InnerTableWrite, Restorable<List<State
writeAndReturn(row);
}
+ @Override
+ public void write(InternalRow row, int bucket) throws Exception {
+ writeAndReturn(row, bucket);
+ }
+
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(),
recordExtractor.extract(record));
return record;
}
+ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws
Exception {
+ SinkRecord record = toSinkRecord(row, bucket);
+ write.write(record.partition(), bucket,
recordExtractor.extract(record));
+ return record;
+ }
+
@VisibleForTesting
public T writeAndReturnData(InternalRow row) throws Exception {
SinkRecord record = toSinkRecord(row);
@@ -139,6 +150,15 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
row);
}
+ private SinkRecord toSinkRecord(InternalRow row, int bucket) {
+ keyAndBucketExtractor.setRecord(row);
+ return new SinkRecord(
+ keyAndBucketExtractor.partition(),
+ bucket,
+ keyAndBucketExtractor.trimmedPrimaryKey(),
+ row);
+ }
+
public SinkRecord toLogRecord(SinkRecord record) {
keyAndBucketExtractor.setRecord(record.row());
return new SinkRecord(
diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 976466464..cd9568d46 100644
--- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -28,9 +28,9 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
@@ -135,8 +135,8 @@ public class IndexBootstrapTest extends TableTestBase {
.toLocalDateTime()));
}
- private DynamicBucketRow row(int pt, int col, int pk, int bucket) {
+ private Pair<InternalRow, Integer> row(int pt, int col, int pk, int
bucket) {
GenericRow row = GenericRow.of(pt, col, pk);
- return new DynamicBucketRow(row, bucket);
+ return Pair.of(row, bucket);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
index af8bc7205..043e3c727 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
@@ -22,13 +22,14 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,8 +65,8 @@ public class HashIndexMaintainerTest extends
PrimaryKeyTableTestBase {
return options;
}
- private DynamicBucketRow createRow(int partition, int bucket, int key, int
value) {
- return new DynamicBucketRow(GenericRow.of(partition, key, value),
bucket);
+ private Pair<InternalRow, Integer> createRow(int partition, int bucket,
int key, int value) {
+ return Pair.of(GenericRow.of(partition, key, value), bucket);
}
private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage>
messages) {
@@ -89,11 +90,11 @@ public class HashIndexMaintainerTest extends
PrimaryKeyTableTestBase {
@Test
public void testAssignBucket() throws Exception {
assertThatThrownBy(() -> write.write(GenericRow.of(1, 1, 1)))
- .hasMessageContaining("Only supports DynamicBucketRow");
+ .hasMessageContaining("Can't extract bucket from row in
dynamic bucket mode.");
// commit two partitions
- write.write(createRow(1, 1, 1, 1));
- write.write(createRow(2, 2, 2, 2));
+ write(write, createRow(1, 1, 1, 1));
+ write(write, createRow(2, 2, 2, 2));
List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
Map<BinaryRow, Map<Integer, int[]>> index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1), row(2));
@@ -103,7 +104,7 @@ public class HashIndexMaintainerTest extends
PrimaryKeyTableTestBase {
commit.commit(0, commitMessages);
// only one partition
- write.write(createRow(1, 1, 2, 2));
+ write(write, createRow(1, 1, 2, 2));
commitMessages = write.prepareCommit(true, 1);
index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1));
@@ -113,7 +114,7 @@ public class HashIndexMaintainerTest extends
PrimaryKeyTableTestBase {
// restore
write = writeBuilder.newWrite();
- write.write(createRow(1, 1, 3, 3));
+ write(write, createRow(1, 1, 3, 3));
commitMessages = write.prepareCommit(true, 2);
index = readIndex(commitMessages);
assertThat(index).containsOnlyKeys(row(1));
@@ -128,16 +129,21 @@ public class HashIndexMaintainerTest extends
PrimaryKeyTableTestBase {
@Test
public void testNotCreateNewFile() throws Exception {
// commit two partitions
- write.write(createRow(1, 1, 1, 1));
- write.write(createRow(2, 2, 2, 2));
+ write(write, createRow(1, 1, 1, 1));
+ write(write, createRow(2, 2, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
// same record
- write.write(createRow(1, 1, 1, 1));
+ write(write, createRow(1, 1, 1, 1));
List<CommitMessage> commitMessages = write.prepareCommit(true, 1);
assertThat(readIndex(commitMessages)).isEmpty();
write.close();
commit.close();
}
+
+ private void write(StreamTableWrite write, Pair<InternalRow, Integer>
rowWithBucket)
+ throws Exception {
+ write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 0f0be3937..544d24970 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -29,9 +29,9 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.BatchWriteBuilderImpl;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -57,7 +57,8 @@ public class DynamicBucketTableTest extends TableTestBase {
.indexMaintainer;
Assertions.assertThat(indexMaintainer.isEmpty()).isTrue();
- batchTableWrite.write(data(0));
+ Pair<InternalRow, Integer> rowWithBucket = data(0);
+ batchTableWrite.write(rowWithBucket.getKey(),
rowWithBucket.getValue());
Assertions.assertThat(
((CommitMessageImpl)
batchTableWrite.prepareCommit().get(0))
.indexIncrement()
@@ -74,7 +75,8 @@ public class DynamicBucketTableTest extends TableTestBase {
try (BatchTableWrite batchTableWrite = builder.newWrite()) {
for (int i = 0; i < times; i++) {
for (int j = 0; j < size; j++) {
- batchTableWrite.write(data(i));
+ Pair<InternalRow, Integer> rowWithBucket = data(i);
+ batchTableWrite.write(rowWithBucket.getKey(),
rowWithBucket.getValue());
}
}
messages = batchTableWrite.prepareCommit();
@@ -97,13 +99,13 @@ public class DynamicBucketTableTest extends TableTestBase {
return schemaBuilder.build();
}
- private static InternalRow data(int bucket) {
+ private static Pair<InternalRow, Integer> data(int bucket) {
GenericRow row =
GenericRow.of(
RANDOM.nextLong(),
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000));
- return new DynamicBucketRow(row, bucket);
+ return Pair.of(row, bucket);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 7fc7c76ca..e4e890850 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -28,10 +28,10 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.Test;
@@ -53,8 +53,8 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
return options;
}
- private DynamicBucketRow createRow(int partition, int bucket, int key, int
value) {
- return new DynamicBucketRow(GenericRow.of(partition, key, value),
bucket);
+ private Pair<GenericRow, Integer> createRow(int partition, int bucket, int
key, int value) {
+ return Pair.of(GenericRow.of(partition, key, value), bucket);
}
@Test
@@ -197,13 +197,13 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
StreamTableCommit commit = writeBuilder.newCommit();
// commit bucket 1,2,3
- write.write(createRow(1, 1, 1, 1));
- write.write(createRow(2, 2, 2, 2));
- write.write(createRow(3, 3, 3, 3));
+ write(write, createRow(1, 1, 1, 1));
+ write(write, createRow(2, 2, 2, 2));
+ write(write, createRow(3, 3, 3, 3));
commit.commit(0, write.prepareCommit(true, 0));
// commit bucket 1 only
- write.write(createRow(1, 1, 2, 2));
+ write(write, createRow(1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
// compact only
@@ -211,15 +211,15 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
commit.commit(2, write.prepareCommit(true, 2));
// commit bucket 2 only
- write.write(createRow(2, 2, 3, 3));
+ write(write, createRow(2, 2, 3, 3));
commit.commit(3, write.prepareCommit(true, 3));
// commit bucket 2 only
- write.write(createRow(2, 2, 4, 4));
+ write(write, createRow(2, 2, 4, 4));
commit.commit(4, write.prepareCommit(true, 4));
// commit bucket 2 only
- write.write(createRow(2, 2, 5, 5));
+ write(write, createRow(2, 2, 5, 5));
commit.commit(5, write.prepareCommit(true, 5));
write.close();
@@ -250,4 +250,9 @@ public class IndexFileExpireTableTest extends
PrimaryKeyTableTestBase {
.filter(s -> s.getPath().getName().startsWith("index-"))
.count();
}
+
+ private void write(StreamTableWrite write, Pair<GenericRow, Integer>
rowWithBucket)
+ throws Exception {
+ write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 5465d5c4b..16e5db03f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -97,6 +97,18 @@ public abstract class TableTestBase {
return identifier(DEFAULT_TABLE_NAME);
}
+ @SafeVarargs
+ protected final void write(Table table, Pair<InternalRow, Integer>...
rows) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (Pair<InternalRow, Integer> row : rows) {
+ write.write(row.getKey(), row.getValue());
+ }
+ commit.commit(write.prepareCommit());
+ }
+ }
+
protected void write(Table table, InternalRow... rows) throws Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index 64fb65910..b2fbdc3e9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -81,7 +80,7 @@ public class CdcDynamicBucketWriteOperator extends
TableWriteOperator<Tuple2<Cdc
}
try {
- write.write(new DynamicBucketRow(optionalConverted.get(),
record.f1));
+ write.write(optionalConverted.get(), record.f1);
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
index 2a2d1fe20..53b9be457 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,7 +47,6 @@ public class DynamicBucketRowWriteOperator
@Override
public void processElement(StreamRecord<Tuple2<InternalRow, Integer>>
element)
throws Exception {
- DynamicBucketRow row = new DynamicBucketRow(element.getValue().f0,
element.getValue().f1);
- write.write(row);
+ write.write(element.getValue().f0, element.getValue().f1);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 28a82a6c4..7278e1f21 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -41,6 +41,8 @@ public interface StoreSinkWrite {
SinkRecord write(InternalRow rowData) throws Exception;
+ SinkRecord write(InternalRow rowData, int bucket) throws Exception;
+
SinkRecord toLogRecord(SinkRecord record);
void compact(BinaryRow partition, int bucket, boolean fullCompaction)
throws Exception;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 968ace78a..c70f6038e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -174,6 +174,11 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
return write.writeAndReturn(rowData);
}
+ @Override
+ public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
+ return write.writeAndReturn(rowData, bucket);
+ }
+
@Override
public SinkRecord toLogRecord(SinkRecord record) {
return write.toLogRecord(record);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 45dbfcd9d..adc19eb4f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -34,8 +34,8 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.DynamicBucketRow;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -217,7 +217,8 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
try (BatchTableWrite batchTableWrite = builder.newWrite()) {
for (int i = 0; i < size; i++) {
for (int j = 0; j < 100; j++) {
- batchTableWrite.write(data(i));
+ Pair<InternalRow, Integer> rowWithBucket = data(i);
+ batchTableWrite.write(rowWithBucket.getKey(),
rowWithBucket.getValue());
}
}
messages = batchTableWrite.prepareCommit();
@@ -245,7 +246,7 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
return Identifier.create(database, tableName);
}
- private static InternalRow data(int bucket) {
+ private static Pair<InternalRow, Integer> data(int bucket) {
String in = String.valueOf(Math.abs(RANDOM.nextInt(10000)));
int count = 4 - in.length();
for (int i = 0; i < count; i++) {
@@ -259,6 +260,6 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
(long) RANDOM.nextInt(10000),
(long) RANDOM.nextInt(10000),
BinaryString.fromString("00000000" + in));
- return new DynamicBucketRow(row, bucket);
+ return Pair.of(row, bucket);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index c65190fd4..9cd2a7392 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -107,6 +107,11 @@ public class StoreCompactOperatorTest extends
TableTestBase {
return null;
}
+ @Override
+ public SinkRecord write(InternalRow rowData, int bucket) {
+ return null;
+ }
+
@Override
public SinkRecord toLogRecord(SinkRecord record) {
return null;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 2d52000db..fe2aeede1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -27,7 +27,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL,
ROW_KIND_COL}
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
-import org.apache.paimon.table.sink.{BatchWriteBuilder,
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
+import org.apache.paimon.table.sink.{BatchWriteBuilder,
CommitMessageSerializer, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
import org.apache.spark.TaskContext
@@ -149,7 +149,7 @@ case class WriteIntoPaimonTable(
rowType,
bucketColDropped,
SparkRowUtils.getRowKind(row, rowkindColIdx))
- write.write(new DynamicBucketRow(sparkRow, bucket))
+ write.write(sparkRow, bucket)
}
val serializer = new CommitMessageSerializer
write.prepareCommit().asScala.map(serializer.serialize).toIterator