This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b6350e8 [core] remove Dynamic Bucket Row (#2541)
86b6350e8 is described below

commit 86b6350e8fa86ef9a09c8b1bbca45196074c8739
Author: YeJunHao <[email protected]>
AuthorDate: Wed Dec 20 17:14:55 2023 +0800

    [core] remove Dynamic Bucket Row (#2541)
---
 .../apache/paimon/table/sink/DynamicBucketRow.java | 133 ---------------------
 .../table/sink/DynamicBucketRowKeyExtractor.java   |  10 +-
 .../org/apache/paimon/table/sink/TableWrite.java   |   3 +
 .../apache/paimon/table/sink/TableWriteImpl.java   |  20 ++++
 .../paimon/crosspartition/IndexBootstrapTest.java  |   6 +-
 .../paimon/index/HashIndexMaintainerTest.java      |  28 +++--
 .../paimon/table/DynamicBucketTableTest.java       |  12 +-
 .../paimon/table/IndexFileExpireTableTest.java     |  25 ++--
 .../org/apache/paimon/table/TableTestBase.java     |  12 ++
 .../sink/cdc/CdcDynamicBucketWriteOperator.java    |   3 +-
 .../flink/sink/DynamicBucketRowWriteOperator.java  |   4 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |   2 +
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |   5 +
 .../SortCompactActionForDynamicBucketITCase.java   |   9 +-
 .../flink/sink/StoreCompactOperatorTest.java       |   5 +
 .../spark/commands/WriteIntoPaimonTable.scala      |   4 +-
 16 files changed, 101 insertions(+), 180 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
deleted file mode 100644
index 73394a9f5..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.sink;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.RowKind;
-
-/** An {@link InternalRow} wraps another {@link InternalRow} with bucket. */
-public class DynamicBucketRow implements InternalRow {
-
-    private final InternalRow row;
-    private final int bucket;
-
-    public DynamicBucketRow(InternalRow row, int bucket) {
-        this.row = row;
-        this.bucket = bucket;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    @Override
-    public int getFieldCount() {
-        return row.getFieldCount();
-    }
-
-    @Override
-    public RowKind getRowKind() {
-        return row.getRowKind();
-    }
-
-    @Override
-    public void setRowKind(RowKind kind) {
-        row.setRowKind(kind);
-    }
-
-    @Override
-    public boolean isNullAt(int pos) {
-        return row.isNullAt(pos);
-    }
-
-    @Override
-    public boolean getBoolean(int pos) {
-        return row.getBoolean(pos);
-    }
-
-    @Override
-    public byte getByte(int pos) {
-        return row.getByte(pos);
-    }
-
-    @Override
-    public short getShort(int pos) {
-        return row.getShort(pos);
-    }
-
-    @Override
-    public int getInt(int pos) {
-        return row.getInt(pos);
-    }
-
-    @Override
-    public long getLong(int pos) {
-        return row.getLong(pos);
-    }
-
-    @Override
-    public float getFloat(int pos) {
-        return row.getFloat(pos);
-    }
-
-    @Override
-    public double getDouble(int pos) {
-        return row.getDouble(pos);
-    }
-
-    @Override
-    public BinaryString getString(int pos) {
-        return row.getString(pos);
-    }
-
-    @Override
-    public Decimal getDecimal(int pos, int precision, int scale) {
-        return row.getDecimal(pos, precision, scale);
-    }
-
-    @Override
-    public Timestamp getTimestamp(int pos, int precision) {
-        return row.getTimestamp(pos, precision);
-    }
-
-    @Override
-    public byte[] getBinary(int pos) {
-        return row.getBinary(pos);
-    }
-
-    @Override
-    public InternalArray getArray(int pos) {
-        return row.getArray(pos);
-    }
-
-    @Override
-    public InternalMap getMap(int pos) {
-        return row.getMap(pos);
-    }
-
-    @Override
-    public InternalRow getRow(int pos, int numFields) {
-        return row.getRow(pos, numFields);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
index 6ae14b5f7..5f4734bf5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
@@ -25,8 +25,8 @@ import org.apache.paimon.schema.TableSchema;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
- * {@link KeyAndBucketExtractor} for {@link InternalRow} with extracting 
bucket from {@link
- * DynamicBucketRow}.
+ * {@link KeyAndBucketExtractor} for {@link InternalRow}, just throws error 
when extract bucket from
+ * dynamic row.
  */
 public class DynamicBucketRowKeyExtractor extends RowKeyExtractor {
 
@@ -41,10 +41,6 @@ public class DynamicBucketRowKeyExtractor extends 
RowKeyExtractor {
 
     @Override
     public int bucket() {
-        if (record instanceof DynamicBucketRow) {
-            return ((DynamicBucketRow) record).bucket();
-        }
-        throw new IllegalArgumentException(
-                "Only supports DynamicBucketRow, illegal record type: " + 
record.getClass());
+        throw new IllegalArgumentException("Can't extract bucket from row in 
dynamic bucket mode.");
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index bf61cc57b..0f25f8219 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -49,6 +49,9 @@ public interface TableWrite extends AutoCloseable {
     /** Write a row to the writer. */
     void write(InternalRow row) throws Exception;
 
+    /** Write a row with bucket. */
+    void write(InternalRow row, int bucket) throws Exception;
+
     /**
      * Compact a bucket of a partition. By default, it will determine whether 
to perform the
      * compaction according to the 'num-sorted-run.compaction-trigger' option. 
If fullCompaction is
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 25c29be9f..1c859bf03 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -116,12 +116,23 @@ public class TableWriteImpl<T> implements 
InnerTableWrite, Restorable<List<State
         writeAndReturn(row);
     }
 
+    @Override
+    public void write(InternalRow row, int bucket) throws Exception {
+        writeAndReturn(row, bucket);
+    }
+
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
         SinkRecord record = toSinkRecord(row);
         write.write(record.partition(), record.bucket(), 
recordExtractor.extract(record));
         return record;
     }
 
+    public SinkRecord writeAndReturn(InternalRow row, int bucket) throws 
Exception {
+        SinkRecord record = toSinkRecord(row, bucket);
+        write.write(record.partition(), bucket, 
recordExtractor.extract(record));
+        return record;
+    }
+
     @VisibleForTesting
     public T writeAndReturnData(InternalRow row) throws Exception {
         SinkRecord record = toSinkRecord(row);
@@ -139,6 +150,15 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
                 row);
     }
 
+    private SinkRecord toSinkRecord(InternalRow row, int bucket) {
+        keyAndBucketExtractor.setRecord(row);
+        return new SinkRecord(
+                keyAndBucketExtractor.partition(),
+                bucket,
+                keyAndBucketExtractor.trimmedPrimaryKey(),
+                row);
+    }
+
     public SinkRecord toLogRecord(SinkRecord record) {
         keyAndBucketExtractor.setRecord(record.row());
         return new SinkRecord(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 976466464..cd9568d46 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -28,9 +28,9 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
 
@@ -135,8 +135,8 @@ public class IndexBootstrapTest extends TableTestBase {
                                 .toLocalDateTime()));
     }
 
-    private DynamicBucketRow row(int pt, int col, int pk, int bucket) {
+    private Pair<InternalRow, Integer> row(int pt, int col, int pk, int 
bucket) {
         GenericRow row = GenericRow.of(pt, col, pk);
-        return new DynamicBucketRow(row, bucket);
+        return Pair.of(row, bucket);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
index af8bc7205..043e3c727 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
@@ -22,13 +22,14 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -64,8 +65,8 @@ public class HashIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
         return options;
     }
 
-    private DynamicBucketRow createRow(int partition, int bucket, int key, int 
value) {
-        return new DynamicBucketRow(GenericRow.of(partition, key, value), 
bucket);
+    private Pair<InternalRow, Integer> createRow(int partition, int bucket, 
int key, int value) {
+        return Pair.of(GenericRow.of(partition, key, value), bucket);
     }
 
     private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> 
messages) {
@@ -89,11 +90,11 @@ public class HashIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
     @Test
     public void testAssignBucket() throws Exception {
         assertThatThrownBy(() -> write.write(GenericRow.of(1, 1, 1)))
-                .hasMessageContaining("Only supports DynamicBucketRow");
+                .hasMessageContaining("Can't extract bucket from row in 
dynamic bucket mode.");
 
         // commit two partitions
-        write.write(createRow(1, 1, 1, 1));
-        write.write(createRow(2, 2, 2, 2));
+        write(write, createRow(1, 1, 1, 1));
+        write(write, createRow(2, 2, 2, 2));
         List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
         Map<BinaryRow, Map<Integer, int[]>> index = readIndex(commitMessages);
         assertThat(index).containsOnlyKeys(row(1), row(2));
@@ -103,7 +104,7 @@ public class HashIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(0, commitMessages);
 
         // only one partition
-        write.write(createRow(1, 1, 2, 2));
+        write(write, createRow(1, 1, 2, 2));
         commitMessages = write.prepareCommit(true, 1);
         index = readIndex(commitMessages);
         assertThat(index).containsOnlyKeys(row(1));
@@ -113,7 +114,7 @@ public class HashIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
 
         // restore
         write = writeBuilder.newWrite();
-        write.write(createRow(1, 1, 3, 3));
+        write(write, createRow(1, 1, 3, 3));
         commitMessages = write.prepareCommit(true, 2);
         index = readIndex(commitMessages);
         assertThat(index).containsOnlyKeys(row(1));
@@ -128,16 +129,21 @@ public class HashIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
     @Test
     public void testNotCreateNewFile() throws Exception {
         // commit two partitions
-        write.write(createRow(1, 1, 1, 1));
-        write.write(createRow(2, 2, 2, 2));
+        write(write, createRow(1, 1, 1, 1));
+        write(write, createRow(2, 2, 2, 2));
         commit.commit(0, write.prepareCommit(true, 0));
 
         // same record
-        write.write(createRow(1, 1, 1, 1));
+        write(write, createRow(1, 1, 1, 1));
         List<CommitMessage> commitMessages = write.prepareCommit(true, 1);
         assertThat(readIndex(commitMessages)).isEmpty();
 
         write.close();
         commit.close();
     }
+
+    private void write(StreamTableWrite write, Pair<InternalRow, Integer> 
rowWithBucket)
+            throws Exception {
+        write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 0f0be3937..544d24970 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -29,9 +29,9 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.BatchWriteBuilderImpl;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -57,7 +57,8 @@ public class DynamicBucketTableTest extends TableTestBase {
                                 .indexMaintainer;
 
         Assertions.assertThat(indexMaintainer.isEmpty()).isTrue();
-        batchTableWrite.write(data(0));
+        Pair<InternalRow, Integer> rowWithBucket = data(0);
+        batchTableWrite.write(rowWithBucket.getKey(), 
rowWithBucket.getValue());
         Assertions.assertThat(
                         ((CommitMessageImpl) 
batchTableWrite.prepareCommit().get(0))
                                 .indexIncrement()
@@ -74,7 +75,8 @@ public class DynamicBucketTableTest extends TableTestBase {
         try (BatchTableWrite batchTableWrite = builder.newWrite()) {
             for (int i = 0; i < times; i++) {
                 for (int j = 0; j < size; j++) {
-                    batchTableWrite.write(data(i));
+                    Pair<InternalRow, Integer> rowWithBucket = data(i);
+                    batchTableWrite.write(rowWithBucket.getKey(), 
rowWithBucket.getValue());
                 }
             }
             messages = batchTableWrite.prepareCommit();
@@ -97,13 +99,13 @@ public class DynamicBucketTableTest extends TableTestBase {
         return schemaBuilder.build();
     }
 
-    private static InternalRow data(int bucket) {
+    private static Pair<InternalRow, Integer> data(int bucket) {
         GenericRow row =
                 GenericRow.of(
                         RANDOM.nextLong(),
                         (long) RANDOM.nextInt(10000),
                         (long) RANDOM.nextInt(10000),
                         (long) RANDOM.nextInt(10000));
-        return new DynamicBucketRow(row, bucket);
+        return Pair.of(row, bucket);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index 7fc7c76ca..e4e890850 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -28,10 +28,10 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.Test;
@@ -53,8 +53,8 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
         return options;
     }
 
-    private DynamicBucketRow createRow(int partition, int bucket, int key, int 
value) {
-        return new DynamicBucketRow(GenericRow.of(partition, key, value), 
bucket);
+    private Pair<GenericRow, Integer> createRow(int partition, int bucket, int 
key, int value) {
+        return Pair.of(GenericRow.of(partition, key, value), bucket);
     }
 
     @Test
@@ -197,13 +197,13 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
         StreamTableCommit commit = writeBuilder.newCommit();
 
         // commit bucket 1,2,3
-        write.write(createRow(1, 1, 1, 1));
-        write.write(createRow(2, 2, 2, 2));
-        write.write(createRow(3, 3, 3, 3));
+        write(write, createRow(1, 1, 1, 1));
+        write(write, createRow(2, 2, 2, 2));
+        write(write, createRow(3, 3, 3, 3));
         commit.commit(0, write.prepareCommit(true, 0));
 
         // commit bucket 1 only
-        write.write(createRow(1, 1, 2, 2));
+        write(write, createRow(1, 1, 2, 2));
         commit.commit(1, write.prepareCommit(true, 1));
 
         // compact only
@@ -211,15 +211,15 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
         commit.commit(2, write.prepareCommit(true, 2));
 
         // commit bucket 2 only
-        write.write(createRow(2, 2, 3, 3));
+        write(write, createRow(2, 2, 3, 3));
         commit.commit(3, write.prepareCommit(true, 3));
 
         // commit bucket 2 only
-        write.write(createRow(2, 2, 4, 4));
+        write(write, createRow(2, 2, 4, 4));
         commit.commit(4, write.prepareCommit(true, 4));
 
         // commit bucket 2 only
-        write.write(createRow(2, 2, 5, 5));
+        write(write, createRow(2, 2, 5, 5));
         commit.commit(5, write.prepareCommit(true, 5));
 
         write.close();
@@ -250,4 +250,9 @@ public class IndexFileExpireTableTest extends 
PrimaryKeyTableTestBase {
                 .filter(s -> s.getPath().getName().startsWith("index-"))
                 .count();
     }
+
+    private void write(StreamTableWrite write, Pair<GenericRow, Integer> 
rowWithBucket)
+            throws Exception {
+        write.write(rowWithBucket.getKey(), rowWithBucket.getValue());
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 5465d5c4b..16e5db03f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -97,6 +97,18 @@ public abstract class TableTestBase {
         return identifier(DEFAULT_TABLE_NAME);
     }
 
+    @SafeVarargs
+    protected final void write(Table table, Pair<InternalRow, Integer>... 
rows) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (Pair<InternalRow, Integer> row : rows) {
+                write.write(row.getKey(), row.getValue());
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
     protected void write(Table table, InternalRow... rows) throws Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite write = writeBuilder.newWrite();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
index 64fb65910..b2fbdc3e9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.TableWriteOperator;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -81,7 +80,7 @@ public class CdcDynamicBucketWriteOperator extends 
TableWriteOperator<Tuple2<Cdc
         }
 
         try {
-            write.write(new DynamicBucketRow(optionalConverted.get(), 
record.f1));
+            write.write(optionalConverted.get(), record.f1);
         } catch (Exception e) {
             throw new IOException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
index 2a2d1fe20..53b9be457 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,7 +47,6 @@ public class DynamicBucketRowWriteOperator
     @Override
     public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> 
element)
             throws Exception {
-        DynamicBucketRow row = new DynamicBucketRow(element.getValue().f0, 
element.getValue().f1);
-        write.write(row);
+        write.write(element.getValue().f0, element.getValue().f1);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 28a82a6c4..7278e1f21 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -41,6 +41,8 @@ public interface StoreSinkWrite {
 
     SinkRecord write(InternalRow rowData) throws Exception;
 
+    SinkRecord write(InternalRow rowData, int bucket) throws Exception;
+
     SinkRecord toLogRecord(SinkRecord record);
 
     void compact(BinaryRow partition, int bucket, boolean fullCompaction) 
throws Exception;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 968ace78a..c70f6038e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -174,6 +174,11 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         return write.writeAndReturn(rowData);
     }
 
+    @Override
+    public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
+        return write.writeAndReturn(rowData, bucket);
+    }
+
     @Override
     public SinkRecord toLogRecord(SinkRecord record) {
         return write.toLogRecord(record);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 45dbfcd9d..adc19eb4f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -34,8 +34,8 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.DynamicBucketRow;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -217,7 +217,8 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
         try (BatchTableWrite batchTableWrite = builder.newWrite()) {
             for (int i = 0; i < size; i++) {
                 for (int j = 0; j < 100; j++) {
-                    batchTableWrite.write(data(i));
+                    Pair<InternalRow, Integer> rowWithBucket = data(i);
+                    batchTableWrite.write(rowWithBucket.getKey(), 
rowWithBucket.getValue());
                 }
             }
             messages = batchTableWrite.prepareCommit();
@@ -245,7 +246,7 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
         return Identifier.create(database, tableName);
     }
 
-    private static InternalRow data(int bucket) {
+    private static Pair<InternalRow, Integer> data(int bucket) {
         String in = String.valueOf(Math.abs(RANDOM.nextInt(10000)));
         int count = 4 - in.length();
         for (int i = 0; i < count; i++) {
@@ -259,6 +260,6 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
                         (long) RANDOM.nextInt(10000),
                         (long) RANDOM.nextInt(10000),
                         BinaryString.fromString("00000000" + in));
-        return new DynamicBucketRow(row, bucket);
+        return Pair.of(row, bucket);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index c65190fd4..9cd2a7392 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -107,6 +107,11 @@ public class StoreCompactOperatorTest extends 
TableTestBase {
             return null;
         }
 
+        @Override
+        public SinkRecord write(InternalRow rowData, int bucket) {
+            return null;
+        }
+
         @Override
         public SinkRecord toLogRecord(SinkRecord record) {
             return null;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 2d52000db..fe2aeede1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -27,7 +27,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
 import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
-import org.apache.paimon.table.sink.{BatchWriteBuilder, 
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, 
CommitMessageSerializer, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.TaskContext
@@ -149,7 +149,7 @@ case class WriteIntoPaimonTable(
                   rowType,
                   bucketColDropped,
                   SparkRowUtils.getRowKind(row, rowkindColIdx))
-                write.write(new DynamicBucketRow(sparkRow, bucket))
+                write.write(sparkRow, bucket)
             }
             val serializer = new CommitMessageSerializer
             write.prepareCommit().asScala.map(serializer.serialize).toIterator

Reply via email to