This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f28ccaab794 [fix](paimon)Set the target size of the split for 3.0 (#50405)
f28ccaab794 is described below

commit f28ccaab794e83fc331d271579fd739c6068aaa9
Author: wuwenchi <[email protected]>
AuthorDate: Tue Apr 29 15:17:48 2025 +0800

    [fix](paimon)Set the target size of the split for 3.0 (#50405)
    
    ### What problem does this PR solve?
    
    bp: #50083
---
 .../datasource/paimon/source/PaimonScanNode.java   |  44 ++++--
 .../datasource/paimon/source/PaimonSource.java     |   8 ++
 .../paimon/source/PaimonScanNodeTest.java          | 157 +++++++++++++++++++++
 3 files changed, 195 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 07a49a7aaea..a7367f3e274 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPaimonFileDesc;
 import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
@@ -123,6 +124,11 @@ public class PaimonScanNode extends FileQueryScanNode {
         Preconditions.checkNotNull(source);
     }
 
+    @VisibleForTesting
+    public void setSource(PaimonSource source) {
+        this.source = source;
+    }
+
     @Override
     protected void convertPredicate() {
         PaimonPredicateConverter paimonPredicateConverter = new 
PaimonPredicateConverter(
@@ -211,22 +217,12 @@ public class PaimonScanNode extends FileQueryScanNode {
         SessionVariable.IgnoreSplitType ignoreSplitType = 
SessionVariable.IgnoreSplitType
                 .valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
-        int[] projected = desc.getSlots().stream().mapToInt(
-                slot -> source.getPaimonTable().rowType()
-                        .getFieldNames()
-                        .stream()
-                        .map(String::toLowerCase)
-                        .collect(Collectors.toList())
-                        .indexOf(slot.getColumn().getName()))
-                .toArray();
-        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
-        List<org.apache.paimon.table.source.Split> paimonSplits = 
readBuilder.withFilter(predicates)
-                .withProjection(projected)
-                .newScan().plan().splits();
 
+        List<org.apache.paimon.table.source.Split> paimonSplits = 
getPaimonSplitFromAPI();
         boolean applyCountPushdown = getPushDownAggNoGroupingOp() == 
TPushAggOp.COUNT;
         // Just for counting the number of selected partitions for this paimon table
         Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
+        long realFileSplitSize = getRealFileSplitSize(0);
         for (org.apache.paimon.table.source.Split split : paimonSplits) {
             SplitStat splitStat = new SplitStat();
             splitStat.setRowCount(split.rowCount());
@@ -254,7 +250,7 @@ public class PaimonScanNode extends FileQueryScanNode {
                             try {
                                 List<Split> dorisSplits = 
FileSplitter.splitFile(
                                         locationPath,
-                                        getRealFileSplitSize(0),
+                                        realFileSplitSize,
                                         null,
                                         file.length(),
                                         -1,
@@ -294,11 +290,30 @@ public class PaimonScanNode extends FileQueryScanNode {
             splitStats.add(splitStat);
         }
 
+        // We need to set the target size for all splits so that we can calculate the proportion of each split later.
+        splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
         return splits;
     }
 
+    @VisibleForTesting
+    public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
+        int[] projected = desc.getSlots().stream().mapToInt(
+                slot -> source.getPaimonTable().rowType()
+                    .getFieldNames()
+                    .stream()
+                    .map(String::toLowerCase)
+                    .collect(Collectors.toList())
+                    .indexOf(slot.getColumn().getName()))
+            .toArray();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        return readBuilder.withFilter(predicates)
+            .withProjection(projected)
+            .newScan().plan().splits();
+    }
+
     private void createRawFileSplits(List<RawFile> rawFiles, List<Split> 
splits, long blockSize) throws UserException {
         for (RawFile file : rawFiles) {
             LocationPath locationPath = new LocationPath(file.path(),
@@ -325,7 +340,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         return 
FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
     }
 
-    private boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
+    @VisibleForTesting
+    public boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
         if (!optRawFiles.isPresent()) {
             return false;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index a8bb814f1d3..1c6b88b16ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.property.constants.PaimonProperties;
 import org.apache.doris.thrift.TFileAttributes;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.paimon.table.Table;
 
 
@@ -34,6 +35,13 @@ public class PaimonSource {
     private final Table originTable;
     private final TupleDescriptor desc;
 
+    @VisibleForTesting
+    public PaimonSource() {
+        this.desc = null;
+        this.paimonExtTable = null;
+        this.originTable = null;
+    }
+
     public PaimonSource(TupleDescriptor desc) {
         this.desc = desc;
         this.paimonExtTable = (PaimonExternalTable) desc.getTable();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
new file mode 100644
index 00000000000..4a2d61f2c7d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class PaimonScanNodeTest {
+
+    @Mocked
+    private SessionVariable sv;
+
+    @Mocked
+    private PaimonFileExternalCatalog paimonFileExternalCatalog;
+
+    @Test
+    public void testSplitWeight() throws UserException {
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
+        PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), 
desc, false, sv);
+
+        paimonScanNode.setSource(new PaimonSource());
+
+        DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64 * 1024 * 
1024, 1, SimpleStats.EMPTY_STATS, 1, 1, 1,
+                Collections.emptyList(), null, null, null, null);
+        BinaryRow binaryRow1 = BinaryRow.singleColumn(1);
+        DataSplit ds1 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow1)
+                .withBucket(1)
+                .withBucketPath("b1")
+                .withDataFiles(Collections.singletonList(dfm1))
+                .build();
+
+        DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32 * 1024 * 
1024, 2, SimpleStats.EMPTY_STATS, 1, 1, 1,
+                Collections.emptyList(), null, null, null, null);
+        BinaryRow binaryRow2 = BinaryRow.singleColumn(1);
+        DataSplit ds2 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow2)
+                .withBucket(1)
+                .withBucketPath("b1")
+                .withDataFiles(Collections.singletonList(dfm2))
+                .build();
+
+
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public List<org.apache.paimon.table.source.Split> 
getPaimonSplitFromAPI() {
+                return new ArrayList<org.apache.paimon.table.source.Split>() {{
+                        add(ds1);
+                        add(ds2);
+                    }};
+            }
+        };
+
+        new MockUp<PaimonSource>() {
+            @Mock
+            public ExternalCatalog getCatalog() {
+                return paimonFileExternalCatalog;
+            }
+        };
+
+        new MockUp<ExternalCatalog>() {
+            @Mock
+            public Map<String, String> getProperties() {
+                return Collections.emptyMap();
+            }
+        };
+
+        new Expectations() {{
+                sv.isForceJniScanner();
+                result = false;
+
+                sv.getIgnoreSplitType();
+                result = "NONE";
+            }};
+
+        // native
+        mockNativeReader();
+        List<org.apache.doris.spi.Split> s1 = paimonScanNode.getSplits(1);
+        PaimonSplit s11 = (PaimonSplit) s1.get(0);
+        PaimonSplit s12 = (PaimonSplit) s1.get(1);
+        Assert.assertEquals(2, s1.size());
+        Assert.assertEquals(100, s11.getSplitWeight().getRawValue());
+        Assert.assertNull(s11.getSplit());
+        Assert.assertEquals(50, s12.getSplitWeight().getRawValue());
+        Assert.assertNull(s12.getSplit());
+
+        // jni
+        mockJniReader();
+        List<org.apache.doris.spi.Split> s2 = paimonScanNode.getSplits(1);
+        PaimonSplit s21 = (PaimonSplit) s2.get(0);
+        PaimonSplit s22 = (PaimonSplit) s2.get(1);
+        Assert.assertEquals(2, s2.size());
+        Assert.assertNotNull(s21.getSplit());
+        Assert.assertNotNull(s22.getSplit());
+        Assert.assertEquals(100, s21.getSplitWeight().getRawValue());
+        Assert.assertEquals(50, s22.getSplitWeight().getRawValue());
+    }
+
+    private void mockJniReader() {
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public boolean supportNativeReader(Optional<List<RawFile>> 
optRawFiles) {
+                return false;
+            }
+        };
+    }
+
+    private void mockNativeReader() {
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public boolean supportNativeReader(Optional<List<RawFile>> 
optRawFiles) {
+                return true;
+            }
+        };
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to