This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 13fc252b60 [flink] Allow ignoring bucket configuration during read. 
(#5838)
13fc252b60 is described below

commit 13fc252b60ef1dbf8c78a39917d33b5131ab08ff
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Fri Jul 11 11:22:02 2025 +0800

    [flink] Allow ignoring bucket configuration during read. (#5838)
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |   7 ++
 .../org/apache/paimon/schema/SchemaValidation.java |   2 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |  15 ++-
 .../apache/paimon/flink/AppendOnlyTableITCase.java |   2 +-
 .../flink/source/FlinkSourceBuilderTest.java       | 117 +++++++++++++++++++++
 6 files changed, 144 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index f39ffdbcd8..0db11d7707 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,6 +68,12 @@ under the License.
             <td>String</td>
             <td>Specify the paimon distribution policy. Data is assigned to 
each bucket according to the hash value of bucket-key.<br />If you specify 
multiple fields, delimiter is ','.<br />If not specified, the primary key will 
be used; if there is no primary key, the full row will be used.</td>
         </tr>
+        <tr>
+            <td><h5>bucket-append-ordered</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to preserve the bucket order when reading data from an 
append-only table. If false, the bucket configuration is ignored during read.</td>
+        </tr>
         <tr>
             <td><h5>cache-page-size</h5></td>
             <td style="word-wrap: break-word;">64 kb</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7fa95afcd5..fe6b9005e5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -123,6 +123,13 @@ public class CoreOptions implements Serializable {
                                                     + "if there is no primary 
key, the full row will be used.")
                                     .build());
 
+    public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERD =
+            key("bucket-append-ordered")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to preserve the bucket order when reading 
data from an append-only table. If false, the bucket configuration is ignored during read.");
+
     @Immutable
     public static final ConfigOption<BucketFunctionType> BUCKET_FUNCTION_TYPE =
             key("bucket-function.type")
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ddde77cbb1..6e5d03015d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -542,7 +542,7 @@ public class SchemaValidation {
         if (bucket == -1) {
             if (options.toMap().get(BUCKET_KEY.key()) != null) {
                 throw new RuntimeException(
-                        "Cannot define 'bucket-key' with bucket -1, please 
specify a bucket number.");
+                        "Cannot define 'bucket-key' with bucket = -1, please 
remove the 'bucket-key' setting or specify a bucket number.");
             }
 
             if (schema.primaryKeys().isEmpty()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 44b5790472..846f5f645e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.StartupMode;
 import org.apache.paimon.CoreOptions.StreamingReadMode;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.Projection;
@@ -97,11 +98,14 @@ public class FlinkSourceBuilder {
 
     public FlinkSourceBuilder(Table table) {
         this.table = table;
-        this.unawareBucket =
-                table instanceof FileStoreTable
-                        && ((FileStoreTable) table).bucketMode() == 
BucketMode.BUCKET_UNAWARE;
         this.sourceName = table.name();
         this.conf = Options.fromMap(table.options());
+        this.unawareBucket =
+                (table instanceof FileStoreTable
+                                && ((FileStoreTable) table).bucketMode()
+                                        == BucketMode.BUCKET_UNAWARE)
+                        || (table.primaryKeys().isEmpty()
+                                && 
!this.conf.get(CoreOptions.BUCKET_APPEND_ORDERD));
     }
 
     public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
@@ -266,6 +270,11 @@ public class FlinkSourceBuilder {
                 .orElse(null);
     }
 
+    @VisibleForTesting
+    public boolean isUnawareBucket() {
+        return unawareBucket;
+    }
+
     /** Build source {@link DataStream} with {@link RowData}. */
     public DataStream<Row> buildForRow() {
         DataType rowType = 
fromLogicalToDataType(toLogicalType(table.rowType()));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 882071a44e..d3a06a09ae 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -57,7 +57,7 @@ public class AppendOnlyTableITCase extends CatalogITCaseBase {
                                                 + "WITH ('bucket' = '-1', 
'bucket-key' = 'id')"))
                 .hasRootCauseInstanceOf(RuntimeException.class)
                 .hasRootCauseMessage(
-                        "Cannot define 'bucket-key' with bucket -1, please 
specify a bucket number.");
+                        "Cannot define 'bucket-key' with bucket = -1, please 
remove the 'bucket-key' setting or specify a bucket number.");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
new file mode 100644
index 0000000000..b19a1a330c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FlinkSourceBuilder}. */
+public class FlinkSourceBuilderTest {
+
+    @TempDir Path tempDir;
+    private Catalog catalog;
+
+    @BeforeEach
+    public void setUp() {
+        try {
+            initCatalog();
+        } catch (Exception e) {
+            throw new RuntimeException("Catalog initialization failed", e);
+        }
+    }
+
+    private void initCatalog() throws Exception {
+        if (catalog == null) {
+            catalog =
+                    CatalogFactory.createCatalog(
+                            CatalogContext.create(new 
org.apache.paimon.fs.Path(tempDir.toUri())));
+            catalog.createDatabase("default", false);
+        }
+    }
+
+    private Table createTable(
+            String tableName, boolean hasPrimaryKey, int bucketNum, boolean 
bucketAppendOrdered)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .option("bucket", bucketNum + "")
+                        .option("bucket-append-ordered", 
String.valueOf(bucketAppendOrdered));
+
+        if (hasPrimaryKey) {
+            schemaBuilder.primaryKey("a");
+        }
+
+        if (bucketNum != -1) {
+            schemaBuilder.option("bucket-key", "a");
+        }
+
+        Schema schema = schemaBuilder.build();
+        Identifier identifier = Identifier.create("default", tableName);
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
+    @Test
+    public void testUnawareBucket() throws Exception {
+        // pk table && bucket-append-ordered is true
+        Table table = createTable("t1", true, 2, true);
+        FlinkSourceBuilder builder = new FlinkSourceBuilder(table);
+        assertFalse(builder.isUnawareBucket());
+
+        // pk table && bucket-append-ordered is false
+        table = createTable("t2", true, 2, false);
+        builder = new FlinkSourceBuilder(table);
+        assertFalse(builder.isUnawareBucket());
+
+        // pk table && bucket num == -1 && bucket-append-ordered is false
+        table = createTable("t3", true, -1, false);
+        builder = new FlinkSourceBuilder(table);
+        assertFalse(builder.isUnawareBucket());
+
+        // append table && bucket num != -1 && bucket-append-ordered is true
+        table = createTable("t4", false, 2, true);
+        builder = new FlinkSourceBuilder(table);
+        assertFalse(builder.isUnawareBucket());
+
+        // append table && bucket num == -1
+        table = createTable("t5", false, -1, true);
+        builder = new FlinkSourceBuilder(table);
+        assertTrue(builder.isUnawareBucket());
+
+        // append table && bucket-append-ordered is false
+        table = createTable("t6", false, 2, false);
+        builder = new FlinkSourceBuilder(table);
+        assertTrue(builder.isUnawareBucket());
+    }
+}

Reply via email to