This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 13fc252b60 [flink] Allow ignoring bucket configuration during read. (#5838)
13fc252b60 is described below
commit 13fc252b60ef1dbf8c78a39917d33b5131ab08ff
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Fri Jul 11 11:22:02 2025 +0800
[flink] Allow ignoring bucket configuration during read. (#5838)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 7 ++
.../org/apache/paimon/schema/SchemaValidation.java | 2 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 15 ++-
.../apache/paimon/flink/AppendOnlyTableITCase.java | 2 +-
.../flink/source/FlinkSourceBuilderTest.java | 117 +++++++++++++++++++++
6 files changed, 144 insertions(+), 5 deletions(-)
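
For context, 'bucket-append-ordered' is a per-table option. Below is a minimal
sketch of declaring an append table that opts out of bucket-ordered reads,
using only the Schema API exercised by the new test further down (the class
and column names are illustrative, not part of this commit):

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    class BucketAppendOrderedExample {
        // An append table (no primary key) with a fixed bucket count for
        // writes, whose reads may ignore bucket order.
        static Schema schema() {
            return Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .option("bucket", "2")
                    .option("bucket-append-ordered", "false")
                    .build();
        }
    }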
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f39ffdbcd8..0db11d7707 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,6 +68,12 @@ under the License.
<td>String</td>
<td>Specify the paimon distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.<br />If you specify multiple fields, delimiter is ','.<br />If not specified, the primary key will be used; if there is no primary key, the full row will be used.</td>
</tr>
+ <tr>
+ <td><h5>bucket-append-ordered</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to preserve the order of the buckets when reading data from an append-only table. Set this to false to ignore the bucket configuration during read.</td>
+ </tr>
<tr>
<td><h5>cache-page-size</h5></td>
<td style="word-wrap: break-word;">64 kb</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7fa95afcd5..fe6b9005e5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -123,6 +123,13 @@ public class CoreOptions implements Serializable {
+ "if there is no primary
key, the full row will be used.")
.build());
+ public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERD =
+ key("bucket-append-ordered")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to ignore the order of the buckets when
reading data from an append-only table.");
+
@Immutable
public static final ConfigOption<BucketFunctionType> BUCKET_FUNCTION_TYPE =
key("bucket-function.type")
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ddde77cbb1..6e5d03015d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -542,7 +542,7 @@ public class SchemaValidation {
if (bucket == -1) {
if (options.toMap().get(BUCKET_KEY.key()) != null) {
throw new RuntimeException(
- "Cannot define 'bucket-key' with bucket -1, please
specify a bucket number.");
+ "Cannot define 'bucket-key' with bucket = -1, please
remove the 'bucket-key' setting or specify a bucket number.");
}
if (schema.primaryKeys().isEmpty()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 44b5790472..846f5f645e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.CoreOptions.StreamingReadMode;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.Projection;
@@ -97,11 +98,14 @@ public class FlinkSourceBuilder {
public FlinkSourceBuilder(Table table) {
this.table = table;
- this.unawareBucket =
- table instanceof FileStoreTable
- && ((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
+ this.unawareBucket =
+ (table instanceof FileStoreTable
+ && ((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE)
+ || (table.primaryKeys().isEmpty()
+ && !this.conf.get(CoreOptions.BUCKET_APPEND_ORDERD));
}
public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
@@ -266,6 +270,11 @@ public class FlinkSourceBuilder {
.orElse(null);
}
+ @VisibleForTesting
+ public boolean isUnawareBucket() {
+ return unawareBucket;
+ }
+
/** Build source {@link DataStream} with {@link RowData}. */
public DataStream<Row> buildForRow() {
DataType rowType =
fromLogicalToDataType(toLogicalType(table.rowType()));
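
In effect, the constructor now treats an append table (no primary keys) with
'bucket-append-ordered' set to false the same as a bucket-unaware table. A
minimal sketch of the resulting predicate, restated outside the constructor
(illustrative only; it uses just the types referenced in this diff):

    static boolean readsWithoutBucketOrder(Table table, Options conf) {
        // Bucket-unaware append tables were already read without bucket order.
        boolean bucketUnaware =
                table instanceof FileStoreTable
                        && ((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE;
        // New: an append table can also opt out of bucket order explicitly.
        boolean orderIgnored =
                table.primaryKeys().isEmpty() && !conf.get(CoreOptions.BUCKET_APPEND_ORDERD);
        return bucketUnaware || orderIgnored;
    }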
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 882071a44e..d3a06a09ae 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -57,7 +57,7 @@ public class AppendOnlyTableITCase extends CatalogITCaseBase {
+ "WITH ('bucket' = '-1',
'bucket-key' = 'id')"))
.hasRootCauseInstanceOf(RuntimeException.class)
.hasRootCauseMessage(
- "Cannot define 'bucket-key' with bucket -1, please
specify a bucket number.");
+ "Cannot define 'bucket-key' with bucket = -1, please
remove the 'bucket-key' setting or specify a bucket number.");
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
new file mode 100644
index 0000000000..b19a1a330c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FlinkSourceBuilder}. */
+public class FlinkSourceBuilderTest {
+
+ @TempDir Path tempDir;
+ private Catalog catalog;
+
+ @BeforeEach
+ public void setUp() {
+ try {
+ initCatalog();
+ } catch (Exception e) {
+ throw new RuntimeException("Catalog initialization failed", e);
+ }
+ }
+
+ private void initCatalog() throws Exception {
+ if (catalog == null) {
+ catalog =
+ CatalogFactory.createCatalog(
+ CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
+ catalog.createDatabase("default", false);
+ }
+ }
+
+ private Table createTable(
+ String tableName, boolean hasPrimaryKey, int bucketNum, boolean bucketAppendOrdered)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .option("bucket", bucketNum + "")
+ .option("bucket-append-ordered",
String.valueOf(bucketAppendOrdered));
+
+ if (hasPrimaryKey) {
+ schemaBuilder.primaryKey("a");
+ }
+
+ if (bucketNum != -1) {
+ schemaBuilder.option("bucket-key", "a");
+ }
+
+ Schema schema = schemaBuilder.build();
+ Identifier identifier = Identifier.create("default", tableName);
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
+ @Test
+ public void testUnawareBucket() throws Exception {
+ // pk table && bucket-append-ordered is true
+ Table table = createTable("t1", true, 2, true);
+ FlinkSourceBuilder builder = new FlinkSourceBuilder(table);
+ assertFalse(builder.isUnawareBucket());
+
+ // pk table && bucket-append-ordered is false
+ table = createTable("t2", true, 2, false);
+ builder = new FlinkSourceBuilder(table);
+ assertFalse(builder.isUnawareBucket());
+
+ // pk table && bucket num == -1 && bucket-append-ordered is false
+ table = createTable("t3", true, -1, false);
+ builder = new FlinkSourceBuilder(table);
+ assertFalse(builder.isUnawareBucket());
+
+ // append table && bucket num != -1 && bucket-append-ordered is true
+ table = createTable("t4", false, 2, true);
+ builder = new FlinkSourceBuilder(table);
+ assertFalse(builder.isUnawareBucket());
+
+ // append table && bucket num == -1
+ table = createTable("t5", false, -1, true);
+ builder = new FlinkSourceBuilder(table);
+ assertTrue(builder.isUnawareBucket());
+
+ // append table && bucket-append-ordered is false
+ table = createTable("t6", false, 2, false);
+ builder = new FlinkSourceBuilder(table);
+ assertTrue(builder.isUnawareBucket());
+ }
+}
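
As a usage sketch (hedged: the catalog and the "t6" identifier are assumed to
exist as in the test above, and only the public FlinkSourceBuilder API shown
in this diff is used), a source built over such a table reports itself as
bucket-unaware:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.flink.source.FlinkSourceBuilder;
    import org.apache.paimon.table.Table;

    // Append table created with 'bucket-append-ordered' = 'false'.
    Table table = catalog.getTable(Identifier.create("default", "t6"));
    FlinkSourceBuilder builder =
            new FlinkSourceBuilder(table)
                    .env(StreamExecutionEnvironment.getExecutionEnvironment());
    // The builder now treats the source as bucket-unaware.
    assert builder.isUnawareBucket();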