This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a133f7dafd [core] Refactor ChainSplit to remove bucket and totalBucket
a133f7dafd is described below
commit a133f7dafd77eb87c48271c86f60251facdbfc58
Author: JingsongLi <[email protected]>
AuthorDate: Wed Dec 17 22:09:43 2025 +0800
[core] Refactor ChainSplit to remove bucket and totalBucket
---
.../paimon/io/ChainKeyValueFileReaderFactory.java | 42 +++++++++
.../paimon/io/KeyValueFileReaderFactory.java | 100 ++++++++-------------
.../paimon/operation/MergeFileSplitRead.java | 27 ++----
.../org/apache/paimon/table/source/ChainSplit.java | 57 ++----------
.../apache/paimon/table/source/ChainSplitTest.java | 19 ++--
5 files changed, 97 insertions(+), 148 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 66ec25b4ca..0fa9ae73ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -20,14 +20,19 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FormatReaderMapping;
+import javax.annotation.Nullable;
+
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** A specific implementation about {@link KeyValueFileReaderFactory} for
chain read. */
@@ -85,4 +90,41 @@ public class ChainKeyValueFileReaderFactory extends KeyValueFileReaderFactory {
protected BinaryRow getLogicalPartition() {
return chainReadContext.logicalPartition();
}
+
+ /**
+ * Wraps a plain {@link KeyValueFileReaderFactory.Builder} so that its configuration (file IO,
+ * schema manager, schema, key/value read types, path factory and options) can be reused to
+ * build {@link ChainKeyValueFileReaderFactory} instances.
+ *
+ * @param wrapped the builder whose configuration is reused
+ * @return a new chain-read builder wrapping {@code wrapped}
+ */
+ public static Builder newBuilder(KeyValueFileReaderFactory.Builder
wrapped) {
+ return new Builder(wrapped);
+ }
+
+ /** Builder to build {@link ChainKeyValueFileReaderFactory}. */
+ public static class Builder {
+
+ /** Supplies every shared setting; its protected fields are read directly when building. */
+ private final KeyValueFileReaderFactory.Builder wrapped;
+
+ public Builder(KeyValueFileReaderFactory.Builder wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ /**
+ * Builds a chain-read file reader factory. Unlike the plain builder's {@code build}, no
+ * bucket is taken: the data file path factory is derived from {@code chainReadContext}
+ * instead of from (partition, bucket).
+ *
+ * @param partition partition handed to the created factory
+ * @param dvFactory deletion vector factory handed to the created factory
+ * @param projectKeys if true, read the projected key type ({@code readKeyType}); otherwise
+ *     the full key type
+ * @param filters optional predicates forwarded to the format reader mapping
+ * @param variantAccess optional variant access info forwarded to the format reader mapping
+ * @param chainReadContext context used to create the chain-read data file path factory
+ * @return the chain-read file reader factory
+ */
+ public ChainKeyValueFileReaderFactory build(
+ BinaryRow partition,
+ DeletionVector.Factory dvFactory,
+ boolean projectKeys,
+ @Nullable List<Predicate> filters,
+ @Nullable VariantAccessInfo[] variantAccess,
+ // NOTE(review): annotated @Nullable, but it is passed straight to
+ // createChainReadDataFilePathFactory and getLogicalPartition() dereferences the
+ // factory's chainReadContext without a null check — confirm null is really allowed.
+ @Nullable ChainReadContext chainReadContext) {
+ FormatReaderMapping.Builder builder =
+ wrapped.formatReaderMappingBuilder(projectKeys, filters,
variantAccess);
+ return new ChainKeyValueFileReaderFactory(
+ wrapped.fileIO,
+ wrapped.schemaManager,
+ wrapped.schema,
+ // Same key-type choice formatReaderMappingBuilder makes for projectKeys; keep in sync.
+ projectKeys ? wrapped.readKeyType : wrapped.keyType,
+ wrapped.readValueType,
+ builder,
+
wrapped.pathFactory.createChainReadDataFilePathFactory(chainReadContext),
+ wrapped.options.fileReaderAsyncThreshold().getBytes(),
+ partition,
+ dvFactory,
+ chainReadContext);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 5864a8b963..7f48f24602 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -198,18 +198,18 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
/** Builder for {@link KeyValueFileReaderFactory}. */
public static class Builder {
- private final FileIO fileIO;
- private final SchemaManager schemaManager;
- private final TableSchema schema;
- private final RowType keyType;
- private final RowType valueType;
- private final FileFormatDiscover formatDiscover;
- private final FileStorePathFactory pathFactory;
- private final KeyValueFieldsExtractor extractor;
- private final CoreOptions options;
-
- private RowType readKeyType;
- private RowType readValueType;
+ protected final FileIO fileIO;
+ protected final SchemaManager schemaManager;
+ protected final TableSchema schema;
+ protected final RowType keyType;
+ protected final RowType valueType;
+ protected final FileFormatDiscover formatDiscover;
+ protected final FileStorePathFactory pathFactory;
+ protected final KeyValueFieldsExtractor extractor;
+ protected final CoreOptions options;
+
+ protected RowType readKeyType;
+ protected RowType readValueType;
private Builder(
FileIO fileIO,
@@ -278,67 +278,43 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
boolean projectKeys,
@Nullable List<Predicate> filters,
@Nullable VariantAccessInfo[] variantAccess) {
- return build(
- partition, bucket, dvFactory, projectKeys, filters,
variantAccess, null, null);
+ FormatReaderMapping.Builder builder =
+ formatReaderMappingBuilder(projectKeys, filters,
variantAccess);
+ return new KeyValueFileReaderFactory(
+ fileIO,
+ schemaManager,
+ schema,
+ projectKeys ? this.readKeyType : keyType,
+ readValueType,
+ builder,
+ pathFactory.createDataFilePathFactory(partition, bucket),
+ options.fileReaderAsyncThreshold().getBytes(),
+ partition,
+ dvFactory);
}
- public KeyValueFileReaderFactory build(
- BinaryRow partition,
- int bucket,
- DeletionVector.Factory dvFactory,
+ protected FormatReaderMapping.Builder formatReaderMappingBuilder(
boolean projectKeys,
@Nullable List<Predicate> filters,
- @Nullable VariantAccessInfo[] variantAccess,
- @Nullable KeyValueFileReaderFactoryType factoryType,
- @Nullable ChainReadContext chainReadContext) {
+ @Nullable VariantAccessInfo[] variantAccess) {
RowType finalReadKeyType = projectKeys ? this.readKeyType :
keyType;
+ List<DataField> readTableFields =
+ KeyValue.createKeyValueFields(
+ finalReadKeyType.getFields(),
readValueType.getFields());
Function<TableSchema, List<DataField>> fieldsExtractor =
schema -> {
List<DataField> dataKeyFields =
extractor.keyFields(schema);
List<DataField> dataValueFields =
extractor.valueFields(schema);
return KeyValue.createKeyValueFields(dataKeyFields,
dataValueFields);
};
- List<DataField> readTableFields =
- KeyValue.createKeyValueFields(
- finalReadKeyType.getFields(),
readValueType.getFields());
- FormatReaderMapping.Builder builder =
- new FormatReaderMapping.Builder(
- formatDiscover,
- readTableFields,
- fieldsExtractor,
- filters,
- null,
- null,
- variantAccess);
-
- if (factoryType == null || factoryType ==
KeyValueFileReaderFactoryType.DEFAULT) {
- return new KeyValueFileReaderFactory(
- fileIO,
- schemaManager,
- schema,
- finalReadKeyType,
- readValueType,
- builder,
- pathFactory.createDataFilePathFactory(partition,
bucket),
- options.fileReaderAsyncThreshold().getBytes(),
- partition,
- dvFactory);
- } else if (factoryType == KeyValueFileReaderFactoryType.CHAIN) {
- return new ChainKeyValueFileReaderFactory(
- fileIO,
- schemaManager,
- schema,
- finalReadKeyType,
- readValueType,
- builder,
-
pathFactory.createChainReadDataFilePathFactory(chainReadContext),
- options.fileReaderAsyncThreshold().getBytes(),
- partition,
- dvFactory,
- chainReadContext);
- } else {
- throw new IllegalArgumentException("Unsupported factory type:
" + factoryType);
- }
+ return new FormatReaderMapping.Builder(
+ formatDiscover,
+ readTableFields,
+ fieldsExtractor,
+ filters,
+ null,
+ null,
+ variantAccess);
}
public FileIO fileIO() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 5383355db6..2cd3a5322e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -31,7 +31,6 @@ import org.apache.paimon.io.ChainKeyValueFileReaderFactory;
import org.apache.paimon.io.ChainReadContext;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
-import
org.apache.paimon.io.KeyValueFileReaderFactory.KeyValueFileReaderFactoryType;
import org.apache.paimon.mergetree.DropDeleteReader;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
@@ -301,28 +300,14 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
.build();
DeletionVector.Factory dvFactory =
DeletionVector.factory(fileIO, files,
chainSplit.deletionFiles().orElse(null));
+ ChainKeyValueFileReaderFactory.Builder builder =
+
ChainKeyValueFileReaderFactory.newBuilder(readerFactoryBuilder);
ChainKeyValueFileReaderFactory overlappedSectionFactory =
- (ChainKeyValueFileReaderFactory)
- readerFactoryBuilder.build(
- null,
- chainSplit.bucket(),
- dvFactory,
- false,
- filtersForKeys,
- variantAccess,
- KeyValueFileReaderFactoryType.CHAIN,
- chainReadContext);
+ builder.build(
+ null, dvFactory, false, filtersForKeys, variantAccess,
chainReadContext);
ChainKeyValueFileReaderFactory nonOverlappedSectionFactory =
- (ChainKeyValueFileReaderFactory)
- readerFactoryBuilder.build(
- null,
- chainSplit.bucket(),
- dvFactory,
- false,
- filtersForAll,
- variantAccess,
- KeyValueFileReaderFactoryType.CHAIN,
- chainReadContext);
+ builder.build(
+ null, dvFactory, false, filtersForAll, variantAccess,
chainReadContext);
return createMergeReader(
files, overlappedSectionFactory, nonOverlappedSectionFactory,
forceKeepDelete);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 8b12520b01..30733c93e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -27,8 +27,6 @@ import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.utils.SerializationUtils;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -46,28 +44,19 @@ public class ChainSplit implements Split {
private static final long serialVersionUID = 1L;
- private static final long MAGIC = -6740198283268612946L;
private static final int VERSION = 1;
- /** The logical partition of this split. */
private BinaryRow logicalPartition;
-
- private int bucket;
- private int totalBuckets;
private List<DataFileMeta> dataFiles;
private Map<String, String> fileBranchMapping;
private Map<String, String> fileBucketPathMapping;
public ChainSplit(
BinaryRow logicalPartition,
- int bucket,
- int totalBuckets,
List<DataFileMeta> dataFiles,
Map<String, String> fileBranchMapping,
Map<String, String> fileBucketPathMapping) {
this.logicalPartition = logicalPartition;
- this.bucket = bucket;
- this.totalBuckets = totalBuckets;
this.dataFiles = dataFiles;
this.fileBranchMapping = fileBranchMapping;
this.fileBucketPathMapping = fileBucketPathMapping;
@@ -77,14 +66,6 @@ public class ChainSplit implements Split {
return logicalPartition;
}
- public int bucket() {
- return bucket;
- }
-
- public @Nullable Integer totalBuckets() {
- return totalBuckets;
- }
-
public List<DataFileMeta> dataFiles() {
return dataFiles;
}
@@ -115,27 +96,13 @@ public class ChainSplit implements Split {
return false;
}
ChainSplit that = (ChainSplit) o;
- return bucket == that.bucket
- && totalBuckets == that.totalBuckets
- && Objects.equals(logicalPartition, that.logicalPartition)
+ return Objects.equals(logicalPartition, that.logicalPartition)
&& Objects.equals(dataFiles, that.dataFiles);
}
@Override
public int hashCode() {
- return Objects.hash(logicalPartition, bucket, totalBuckets, dataFiles);
- }
-
- @Override
- public String toString() {
- return "{"
- + "partition=hash-"
- + (logicalPartition == null ? 0 : logicalPartition.hashCode())
- + ", bucket="
- + bucket
- + '}'
- + "@"
- + Integer.toHexString(hashCode());
+ return Objects.hash(logicalPartition, dataFiles);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -148,18 +115,15 @@ public class ChainSplit implements Split {
protected void assign(ChainSplit other) {
this.logicalPartition = other.logicalPartition;
- this.bucket = other.bucket;
- this.totalBuckets = other.totalBuckets;
this.dataFiles = other.dataFiles;
+ this.fileBranchMapping = other.fileBranchMapping;
+ this.fileBucketPathMapping = other.fileBucketPathMapping;
}
public void serialize(DataOutputView out) throws IOException {
- out.writeLong(MAGIC);
out.writeInt(VERSION);
SerializationUtils.serializeBinaryRow(logicalPartition, out);
- out.writeInt(bucket);
- out.writeInt(totalBuckets);
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
int size = dataFiles == null ? 0 : dataFiles.size();
@@ -183,18 +147,12 @@ public class ChainSplit implements Split {
}
public static ChainSplit deserialize(DataInputView in) throws IOException {
- long magic = in.readLong();
- if (magic != MAGIC) {
- throw new UnsupportedOperationException("Unsupported magic: " +
magic);
- }
int version = in.readInt();
if (version != VERSION) {
throw new UnsupportedOperationException("Unsupported version: " +
version);
}
BinaryRow logicalPartition =
SerializationUtils.deserializeBinaryRow(in);
- int bucket = in.readInt();
- int totalBuckets = in.readInt();
int n = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(n);
@@ -219,11 +177,6 @@ public class ChainSplit implements Split {
}
return new ChainSplit(
- logicalPartition,
- bucket,
- totalBuckets,
- dataFiles,
- fileBucketPathMapping,
- fileBranchMapping);
+ logicalPartition, dataFiles, fileBucketPathMapping,
fileBranchMapping);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
index eebed5499b..c042f6a371 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
@@ -23,12 +23,10 @@ import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestDataGenerator;
-import org.apache.paimon.io.DataInputDeserializer;
-import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.utils.InstantiationUtil;
import org.junit.jupiter.api.Test;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -41,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ChainSplitTest {
@Test
- public void testChainSplitSerde() throws IOException {
+ public void testChainSplitSerde() throws IOException,
ClassNotFoundException {
BinaryRow logicalPartition = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(logicalPartition);
writer.writeString(0, BinaryString.fromString("20251202"));
@@ -58,15 +56,10 @@ public class ChainSplitTest {
}
ChainSplit split =
new ChainSplit(
- logicalPartition,
- 0,
- 2,
- dataFiles,
- fileBucketPathMapping,
- fileBranchMapping);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- split.serialize(new DataOutputViewStreamWrapper(out));
- ChainSplit newSplit = ChainSplit.deserialize(new
DataInputDeserializer(out.toByteArray()));
+ logicalPartition, dataFiles, fileBucketPathMapping,
fileBranchMapping);
+ byte[] bytes = InstantiationUtil.serializeObject(split);
+ ChainSplit newSplit =
+ InstantiationUtil.deserializeObject(bytes,
ChainSplit.class.getClassLoader());
assertThat(fileBucketPathMapping).isEqualTo(newSplit.fileBucketPathMapping());
assertThat(fileBranchMapping).isEqualTo(newSplit.fileBranchMapping());
assertThat(newSplit).isEqualTo(split);