This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d324f0ab9 [core] fix: FormatReadBuilder serialization fail and
partition bug (#6190)
2d324f0ab9 is described below
commit 2d324f0ab9ffb7b560a50727a32a1088c1659a15
Author: jerry <[email protected]>
AuthorDate: Wed Sep 3 16:55:12 2025 +0800
[core] fix: FormatReadBuilder serialization fail and partition bug (#6190)
---
.../paimon/table/format/FormatReadBuilder.java | 14 +--
.../paimon/table/format/FormatTableScan.java | 2 +-
.../org/apache/paimon/catalog/CatalogTestBase.java | 10 +-
.../paimon/table/format/FormatReadBuilderTest.java | 125 +++++++++++++++++++++
4 files changed, 139 insertions(+), 12 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 60eb8e1312..4c882ae4a6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.format;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
@@ -56,9 +55,8 @@ public class FormatReadBuilder implements ReadBuilder {
private static final long serialVersionUID = 1L;
private final FormatTable table;
- private final FileFormat fileFormat;
-
private RowType readType;
+ private final CoreOptions options;
@Nullable private Predicate filter;
@Nullable private PartitionPredicate partitionFilter;
@Nullable private Integer limit;
@@ -66,8 +64,7 @@ public class FormatReadBuilder implements ReadBuilder {
public FormatReadBuilder(FormatTable table) {
this.table = table;
this.readType = this.table.rowType();
- CoreOptions options = new CoreOptions(table.options());
- this.fileFormat =
FileFormatDiscover.of(options).discover(options.formatType());
+ this.options = new CoreOptions(table.options());
}
@Override
@@ -144,10 +141,11 @@ public class FormatReadBuilder implements ReadBuilder {
Path filePath = dataSplit.dataPath();
FormatReaderContext formatReaderContext =
new FormatReaderContext(table.fileIO(), filePath,
dataSplit.length(), null);
-
FormatReaderFactory readerFactory =
- fileFormat.createReaderFactory(
- table.rowType(), readType(),
PredicateBuilder.splitAnd(filter));
+ FileFormatDiscover.of(options)
+ .discover(options.formatType())
+ .createReaderFactory(
+ table.rowType(), readType(),
PredicateBuilder.splitAnd(filter));
Pair<int[], RowType> partitionMapping =
PartitionUtils.getPartitionMapping(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 3555f76836..1850bc4c5c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -142,7 +142,7 @@ public class FormatTableScan implements InnerTableScan {
partition2Paths) {
LinkedHashMap<String, String> partitionSpec =
partition2Path.getKey();
BinaryRow partitionRow =
createPartitionRow(partitionSpec);
- if (partitionFilter != null &&
partitionFilter.test(partitionRow)) {
+ if (partitionFilter == null ||
partitionFilter.test(partitionRow)) {
splits.addAll(
getSplits(fileIO,
partition2Path.getValue(), partitionRow));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 192e8f8171..082d2bc638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -661,6 +661,7 @@ public abstract class CatalogTestBase {
diffPartitionPathFactory.newPath(),
compressionType.value(),
dataWithDiffPartition);
+ size = size + 1;
partitionSpec = new HashMap<>();
partitionSpec.put("dt", "" + partitionValue);
} else {
@@ -675,12 +676,15 @@ public abstract class CatalogTestBase {
null);
write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
}
- List<InternalRow> readData = read(table, predicate, projection,
partitionSpec, null);
- Integer limit = checkSize - 1;
+ List<InternalRow> readFilterData =
+ read(table, predicate, projection, partitionSpec, null);
+ assertThat(readFilterData).containsExactlyInAnyOrder(checkDatas);
+ List<InternalRow> readAllData = read(table, null, null, null,
null);
+ assertThat(readAllData).hasSize(size);
+ int limit = checkSize - 1;
List<InternalRow> readLimitData =
read(table, predicate, projection, partitionSpec, limit);
assertThat(readLimitData).hasSize(limit);
- assertThat(readData).containsExactlyInAnyOrder(checkDatas);
catalog.dropTable(Identifier.create(dbName, format), true);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
new file mode 100644
index 0000000000..45cff37f3e
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/**
+ * Tests Java serialization of {@link FormatReadBuilder}: the builder must survive a serialize /
+ * deserialize round trip both in its freshly constructed state and after filters, a partition
+ * filter, a projection and a limit have been applied.
+ */
+public class FormatReadBuilderTest {
+
+ // JUnit-managed temporary directory used as the table location; the test never writes data
+ // files into it.
+ @TempDir java.nio.file.Path tempPath;
+
+ /**
+ * Round-trips a {@link FormatReadBuilder} over a partitioned ORC {@link FormatTable} through
+ * {@link InstantiationUtil}, then checks that the table name and projected read type survive
+ * and that the deserialized builder can still create a scan and a read.
+ */
+ @Test
+ public void testSerializeAndDeserialize() throws IOException,
ClassNotFoundException {
+ RowType rowType =
+ RowType.builder()
+ .field("partition_key", DataTypes.STRING())
+ .field("id", DataTypes.BIGINT())
+ .field("data", DataTypes.STRING())
+ .field("timestamp", DataTypes.TIMESTAMP())
+ .build();
+
+ Map<String, String> options = new HashMap<>();
+ options.put("file.format", "orc");
+ options.put("file.compression", "zstd");
+
+ Path tablePath = new Path(tempPath.toUri());
+ FormatTable table =
+ FormatTable.builder()
+ .fileIO(LocalFileIO.create())
+ .identifier(Identifier.create("test_db",
"complex_table"))
+ .rowType(rowType)
+ .partitionKeys(Arrays.asList("partition_key"))
+ .location(tablePath.toString())
+ .format(FormatTable.Format.ORC)
+ .options(options)
+ .build();
+
+ FormatReadBuilder readBuilder = new FormatReadBuilder(table);
+
+ // Regression check for this fix: a freshly constructed builder (no filter, projection or
+ // limit set yet) must serialize and deserialize without throwing.
+ assertThatNoException().isThrownBy(() ->
InstantiationUtil.serializeObject(readBuilder));
+ assertThatNoException()
+ .isThrownBy(
+ () ->
+ InstantiationUtil.deserializeObject(
+
InstantiationUtil.serializeObject(readBuilder),
+ getClass().getClassLoader()));
+
+ // Add multiple filters; field indexes refer to the full rowType declared above.
+ // NOTE(review): whether the second withFilter call is combined with or replaces the first
+ // depends on FormatReadBuilder.withFilter, which is not shown here — confirm.
+ PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+ Predicate filter1 = predicateBuilder.greaterThan(1, 1000L); // id >
1000
+ Predicate filter2 = predicateBuilder.isNotNull(2); // data is not null
+ readBuilder.withFilter(filter1);
+ readBuilder.withFilter(filter2);
+
+ // Add partition filter. The partition value does not need to exist on disk: only the
+ // builder's state is serialized below, and no data is written by this test.
+ Map<String, String> partitionSpec = new HashMap<>();
+ partitionSpec.put("partition_key", "test_partition");
+ readBuilder.withPartitionFilter(partitionSpec);
+
+ // Add projection and limit
+ int[] projection = {1, 2, 3}; // project id, data, timestamp
+ readBuilder.withProjection(projection);
+ readBuilder.withLimit(500);
+
+ // Round-trip the fully configured builder through Java serialization.
+ byte[] serialized = InstantiationUtil.serializeObject(readBuilder);
+ FormatReadBuilder deserialized =
+ InstantiationUtil.deserializeObject(serialized,
getClass().getClassLoader());
+
+ // Verify the deserialized builder. Only the table name and the projected read type are
+ // asserted directly; the filters, partition filter and limit are covered only in the sense
+ // that they were serialized and deserialized without error.
+ assertThat(deserialized.tableName()).isEqualTo(table.name());
+
+ RowType expectedProjectedType =
+ RowType.builder()
+ .field("id", DataTypes.BIGINT())
+ .field("data", DataTypes.STRING())
+ .field("timestamp", DataTypes.TIMESTAMP())
+ .build();
+ // NOTE(review): compared via count, names and types rather than RowType equality,
+ // presumably so that field ids/metadata of the projected type do not matter — confirm.
+ assertThat(deserialized.readType().getFieldCount())
+ .isEqualTo(expectedProjectedType.getFieldCount());
+ assertThat(deserialized.readType().getFieldNames())
+ .isEqualTo(expectedProjectedType.getFieldNames());
+ assertThat(deserialized.readType().getFieldTypes())
+ .isEqualTo(expectedProjectedType.getFieldTypes());
+
+ // Verify that scan and read can be created (smoke check only: no splits are planned and
+ // no data is read here).
+ assertThat(deserialized.newScan()).isNotNull();
+ assertThat(deserialized.newRead()).isNotNull();
+ }
+}