This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a133f7dafd [core] Refactor ChainSplit to remove bucket and totalBucket
a133f7dafd is described below

commit a133f7dafd77eb87c48271c86f60251facdbfc58
Author: JingsongLi <[email protected]>
AuthorDate: Wed Dec 17 22:09:43 2025 +0800

    [core] Refactor ChainSplit to remove bucket and totalBucket
---
 .../paimon/io/ChainKeyValueFileReaderFactory.java  |  42 +++++++++
 .../paimon/io/KeyValueFileReaderFactory.java       | 100 ++++++++-------------
 .../paimon/operation/MergeFileSplitRead.java       |  27 ++----
 .../org/apache/paimon/table/source/ChainSplit.java |  57 ++----------
 .../apache/paimon/table/source/ChainSplitTest.java |  19 ++--
 5 files changed, 97 insertions(+), 148 deletions(-)
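
A minimal sketch of the new call pattern (mirroring the MergeFileSplitRead change below;
readerFactoryBuilder, dvFactory, filters, variantAccess and chainReadContext are assumed
to already exist on the caller side):

    // Wrap an existing KeyValueFileReaderFactory.Builder for chain reads.
    ChainKeyValueFileReaderFactory.Builder chainBuilder =
            ChainKeyValueFileReaderFactory.newBuilder(readerFactoryBuilder);

    // Partition is passed as null; the logical partition now comes from ChainReadContext,
    // and bucket/totalBuckets are no longer required.
    ChainKeyValueFileReaderFactory factory =
            chainBuilder.build(null, dvFactory, false, filters, variantAccess, chainReadContext);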

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 66ec25b4ca..0fa9ae73ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -20,14 +20,19 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FormatReaderMapping;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** A specific implementation about {@link KeyValueFileReaderFactory} for chain read. */
@@ -85,4 +90,41 @@ public class ChainKeyValueFileReaderFactory extends KeyValueFileReaderFactory {
     protected BinaryRow getLogicalPartition() {
         return chainReadContext.logicalPartition();
     }
+
+    public static Builder newBuilder(KeyValueFileReaderFactory.Builder wrapped) {
+        return new Builder(wrapped);
+    }
+
+    /** Builder to build {@link ChainKeyValueFileReaderFactory}. */
+    public static class Builder {
+
+        private final KeyValueFileReaderFactory.Builder wrapped;
+
+        public Builder(KeyValueFileReaderFactory.Builder wrapped) {
+            this.wrapped = wrapped;
+        }
+
+        public ChainKeyValueFileReaderFactory build(
+                BinaryRow partition,
+                DeletionVector.Factory dvFactory,
+                boolean projectKeys,
+                @Nullable List<Predicate> filters,
+                @Nullable VariantAccessInfo[] variantAccess,
+                @Nullable ChainReadContext chainReadContext) {
+            FormatReaderMapping.Builder builder =
+                    wrapped.formatReaderMappingBuilder(projectKeys, filters, variantAccess);
+            return new ChainKeyValueFileReaderFactory(
+                    wrapped.fileIO,
+                    wrapped.schemaManager,
+                    wrapped.schema,
+                    projectKeys ? wrapped.readKeyType : wrapped.keyType,
+                    wrapped.readValueType,
+                    builder,
+                    wrapped.pathFactory.createChainReadDataFilePathFactory(chainReadContext),
+                    wrapped.options.fileReaderAsyncThreshold().getBytes(),
+                    partition,
+                    dvFactory,
+                    chainReadContext);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 5864a8b963..7f48f24602 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -198,18 +198,18 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
     /** Builder for {@link KeyValueFileReaderFactory}. */
     public static class Builder {
 
-        private final FileIO fileIO;
-        private final SchemaManager schemaManager;
-        private final TableSchema schema;
-        private final RowType keyType;
-        private final RowType valueType;
-        private final FileFormatDiscover formatDiscover;
-        private final FileStorePathFactory pathFactory;
-        private final KeyValueFieldsExtractor extractor;
-        private final CoreOptions options;
-
-        private RowType readKeyType;
-        private RowType readValueType;
+        protected final FileIO fileIO;
+        protected final SchemaManager schemaManager;
+        protected final TableSchema schema;
+        protected final RowType keyType;
+        protected final RowType valueType;
+        protected final FileFormatDiscover formatDiscover;
+        protected final FileStorePathFactory pathFactory;
+        protected final KeyValueFieldsExtractor extractor;
+        protected final CoreOptions options;
+
+        protected RowType readKeyType;
+        protected RowType readValueType;
 
         private Builder(
                 FileIO fileIO,
@@ -278,67 +278,43 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
                 boolean projectKeys,
                 @Nullable List<Predicate> filters,
                 @Nullable VariantAccessInfo[] variantAccess) {
-            return build(
-                    partition, bucket, dvFactory, projectKeys, filters, variantAccess, null, null);
+            FormatReaderMapping.Builder builder =
+                    formatReaderMappingBuilder(projectKeys, filters, variantAccess);
+            return new KeyValueFileReaderFactory(
+                    fileIO,
+                    schemaManager,
+                    schema,
+                    projectKeys ? this.readKeyType : keyType,
+                    readValueType,
+                    builder,
+                    pathFactory.createDataFilePathFactory(partition, bucket),
+                    options.fileReaderAsyncThreshold().getBytes(),
+                    partition,
+                    dvFactory);
         }
 
-        public KeyValueFileReaderFactory build(
-                BinaryRow partition,
-                int bucket,
-                DeletionVector.Factory dvFactory,
+        protected FormatReaderMapping.Builder formatReaderMappingBuilder(
                 boolean projectKeys,
                 @Nullable List<Predicate> filters,
-                @Nullable VariantAccessInfo[] variantAccess,
-                @Nullable KeyValueFileReaderFactoryType factoryType,
-                @Nullable ChainReadContext chainReadContext) {
+                @Nullable VariantAccessInfo[] variantAccess) {
             RowType finalReadKeyType = projectKeys ? this.readKeyType : keyType;
+            List<DataField> readTableFields =
+                    KeyValue.createKeyValueFields(
+                            finalReadKeyType.getFields(), readValueType.getFields());
             Function<TableSchema, List<DataField>> fieldsExtractor =
                     schema -> {
                         List<DataField> dataKeyFields = extractor.keyFields(schema);
                         List<DataField> dataValueFields = extractor.valueFields(schema);
                         return KeyValue.createKeyValueFields(dataKeyFields, dataValueFields);
                     };
-            List<DataField> readTableFields =
-                    KeyValue.createKeyValueFields(
-                            finalReadKeyType.getFields(), readValueType.getFields());
-            FormatReaderMapping.Builder builder =
-                    new FormatReaderMapping.Builder(
-                            formatDiscover,
-                            readTableFields,
-                            fieldsExtractor,
-                            filters,
-                            null,
-                            null,
-                            variantAccess);
-
-            if (factoryType == null || factoryType == KeyValueFileReaderFactoryType.DEFAULT) {
-                return new KeyValueFileReaderFactory(
-                        fileIO,
-                        schemaManager,
-                        schema,
-                        finalReadKeyType,
-                        readValueType,
-                        builder,
-                        pathFactory.createDataFilePathFactory(partition, bucket),
-                        options.fileReaderAsyncThreshold().getBytes(),
-                        partition,
-                        dvFactory);
-            } else if (factoryType == KeyValueFileReaderFactoryType.CHAIN) {
-                return new ChainKeyValueFileReaderFactory(
-                        fileIO,
-                        schemaManager,
-                        schema,
-                        finalReadKeyType,
-                        readValueType,
-                        builder,
-                        pathFactory.createChainReadDataFilePathFactory(chainReadContext),
-                        options.fileReaderAsyncThreshold().getBytes(),
-                        partition,
-                        dvFactory,
-                        chainReadContext);
-            } else {
-                throw new IllegalArgumentException("Unsupported factory type: " + factoryType);
-            }
+            return new FormatReaderMapping.Builder(
+                    formatDiscover,
+                    readTableFields,
+                    fieldsExtractor,
+                    filters,
+                    null,
+                    null,
+                    variantAccess);
         }
 
         public FileIO fileIO() {
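
With the factoryType and chainReadContext parameters removed, the remaining build method
keeps only its original arguments; a minimal sketch of the default (non-chain) path,
with the caller-side variables assumed:

    KeyValueFileReaderFactory factory =
            readerFactoryBuilder.build(
                    partition, bucket, dvFactory, projectKeys, filters, variantAccess);
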
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 5383355db6..2cd3a5322e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -31,7 +31,6 @@ import org.apache.paimon.io.ChainKeyValueFileReaderFactory;
 import org.apache.paimon.io.ChainReadContext;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
-import org.apache.paimon.io.KeyValueFileReaderFactory.KeyValueFileReaderFactoryType;
 import org.apache.paimon.mergetree.DropDeleteReader;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
@@ -301,28 +300,14 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
                         .build();
         DeletionVector.Factory dvFactory =
                 DeletionVector.factory(fileIO, files, chainSplit.deletionFiles().orElse(null));
+        ChainKeyValueFileReaderFactory.Builder builder =
+                ChainKeyValueFileReaderFactory.newBuilder(readerFactoryBuilder);
         ChainKeyValueFileReaderFactory overlappedSectionFactory =
-                (ChainKeyValueFileReaderFactory)
-                        readerFactoryBuilder.build(
-                                null,
-                                chainSplit.bucket(),
-                                dvFactory,
-                                false,
-                                filtersForKeys,
-                                variantAccess,
-                                KeyValueFileReaderFactoryType.CHAIN,
-                                chainReadContext);
+                builder.build(
+                        null, dvFactory, false, filtersForKeys, variantAccess, chainReadContext);
         ChainKeyValueFileReaderFactory nonOverlappedSectionFactory =
-                (ChainKeyValueFileReaderFactory)
-                        readerFactoryBuilder.build(
-                                null,
-                                chainSplit.bucket(),
-                                dvFactory,
-                                false,
-                                filtersForAll,
-                                variantAccess,
-                                KeyValueFileReaderFactoryType.CHAIN,
-                                chainReadContext);
+                builder.build(
+                        null, dvFactory, false, filtersForAll, variantAccess, chainReadContext);
         return createMergeReader(
                 files, overlappedSectionFactory, nonOverlappedSectionFactory, forceKeepDelete);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 8b12520b01..30733c93e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -27,8 +27,6 @@ import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.utils.SerializationUtils;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -46,28 +44,19 @@ public class ChainSplit implements Split {
 
     private static final long serialVersionUID = 1L;
 
-    private static final long MAGIC = -6740198283268612946L;
     private static final int VERSION = 1;
 
-    /** The logical partition of this split. */
     private BinaryRow logicalPartition;
-
-    private int bucket;
-    private int totalBuckets;
     private List<DataFileMeta> dataFiles;
     private Map<String, String> fileBranchMapping;
     private Map<String, String> fileBucketPathMapping;
 
     public ChainSplit(
             BinaryRow logicalPartition,
-            int bucket,
-            int totalBuckets,
             List<DataFileMeta> dataFiles,
             Map<String, String> fileBranchMapping,
             Map<String, String> fileBucketPathMapping) {
         this.logicalPartition = logicalPartition;
-        this.bucket = bucket;
-        this.totalBuckets = totalBuckets;
         this.dataFiles = dataFiles;
         this.fileBranchMapping = fileBranchMapping;
         this.fileBucketPathMapping = fileBucketPathMapping;
@@ -77,14 +66,6 @@ public class ChainSplit implements Split {
         return logicalPartition;
     }
 
-    public int bucket() {
-        return bucket;
-    }
-
-    public @Nullable Integer totalBuckets() {
-        return totalBuckets;
-    }
-
     public List<DataFileMeta> dataFiles() {
         return dataFiles;
     }
@@ -115,27 +96,13 @@ public class ChainSplit implements Split {
             return false;
         }
         ChainSplit that = (ChainSplit) o;
-        return bucket == that.bucket
-                && totalBuckets == that.totalBuckets
-                && Objects.equals(logicalPartition, that.logicalPartition)
+        return Objects.equals(logicalPartition, that.logicalPartition)
                 && Objects.equals(dataFiles, that.dataFiles);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(logicalPartition, bucket, totalBuckets, dataFiles);
-    }
-
-    @Override
-    public String toString() {
-        return "{"
-                + "partition=hash-"
-                + (logicalPartition == null ? 0 : logicalPartition.hashCode())
-                + ", bucket="
-                + bucket
-                + '}'
-                + "@"
-                + Integer.toHexString(hashCode());
+        return Objects.hash(logicalPartition, dataFiles);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -148,18 +115,15 @@ public class ChainSplit implements Split {
 
     protected void assign(ChainSplit other) {
         this.logicalPartition = other.logicalPartition;
-        this.bucket = other.bucket;
-        this.totalBuckets = other.totalBuckets;
         this.dataFiles = other.dataFiles;
+        this.fileBranchMapping = other.fileBranchMapping;
+        this.fileBucketPathMapping = other.fileBucketPathMapping;
     }
 
     public void serialize(DataOutputView out) throws IOException {
-        out.writeLong(MAGIC);
         out.writeInt(VERSION);
 
         SerializationUtils.serializeBinaryRow(logicalPartition, out);
-        out.writeInt(bucket);
-        out.writeInt(totalBuckets);
 
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
         int size = dataFiles == null ? 0 : dataFiles.size();
@@ -183,18 +147,12 @@ public class ChainSplit implements Split {
     }
 
     public static ChainSplit deserialize(DataInputView in) throws IOException {
-        long magic = in.readLong();
-        if (magic != MAGIC) {
-            throw new UnsupportedOperationException("Unsupported magic: " + magic);
-        }
         int version = in.readInt();
         if (version != VERSION) {
             throw new UnsupportedOperationException("Unsupported version: " + version);
         }
 
        BinaryRow logicalPartition = SerializationUtils.deserializeBinaryRow(in);
-        int bucket = in.readInt();
-        int totalBuckets = in.readInt();
 
         int n = in.readInt();
         List<DataFileMeta> dataFiles = new ArrayList<>(n);
@@ -219,11 +177,6 @@ public class ChainSplit implements Split {
         }
 
         return new ChainSplit(
-                logicalPartition,
-                bucket,
-                totalBuckets,
-                dataFiles,
-                fileBucketPathMapping,
-                fileBranchMapping);
+                logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
     }
 }
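
ChainSplit now carries only the logical partition, the data files and the two mapping
tables; a minimal round-trip sketch using the split's own serializer (the input
collections are assumed, and DataOutputViewStreamWrapper / DataInputDeserializer are the
same helpers the old test used):

    ChainSplit split =
            new ChainSplit(logicalPartition, dataFiles, fileBranchMapping, fileBucketPathMapping);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    split.serialize(new DataOutputViewStreamWrapper(out));
    ChainSplit copy = ChainSplit.deserialize(new DataInputDeserializer(out.toByteArray()));
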
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
index eebed5499b..c042f6a371 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
@@ -23,12 +23,10 @@ import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileTestDataGenerator;
-import org.apache.paimon.io.DataInputDeserializer;
-import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.junit.jupiter.api.Test;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -41,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ChainSplitTest {
 
     @Test
-    public void testChainSplitSerde() throws IOException {
+    public void testChainSplitSerde() throws IOException, ClassNotFoundException {
         BinaryRow logicalPartition = new BinaryRow(1);
         BinaryRowWriter writer = new BinaryRowWriter(logicalPartition);
         writer.writeString(0, BinaryString.fromString("20251202"));
@@ -58,15 +56,10 @@ public class ChainSplitTest {
         }
         ChainSplit split =
                 new ChainSplit(
-                        logicalPartition,
-                        0,
-                        2,
-                        dataFiles,
-                        fileBucketPathMapping,
-                        fileBranchMapping);
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        split.serialize(new DataOutputViewStreamWrapper(out));
-        ChainSplit newSplit = ChainSplit.deserialize(new DataInputDeserializer(out.toByteArray()));
+                        logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
+        byte[] bytes = InstantiationUtil.serializeObject(split);
+        ChainSplit newSplit =
+                InstantiationUtil.deserializeObject(bytes, ChainSplit.class.getClassLoader());
         assertThat(fileBucketPathMapping).isEqualTo(newSplit.fileBucketPathMapping());
         assertThat(fileBranchMapping).isEqualTo(newSplit.fileBranchMapping());
         assertThat(newSplit).isEqualTo(split);
