This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f28ccaab794 [fix](paimon) Set the target size of the split for 3.0 (#50405)
f28ccaab794 is described below
commit f28ccaab794e83fc331d271579fd739c6068aaa9
Author: wuwenchi <[email protected]>
AuthorDate: Tue Apr 29 15:17:48 2025 +0800
[fix](paimon) Set the target size of the split for 3.0 (#50405)
### What problem does this PR solve?
Backport (bp) of #50083.
---
.../datasource/paimon/source/PaimonScanNode.java | 44 ++++--
.../datasource/paimon/source/PaimonSource.java | 8 ++
.../paimon/source/PaimonScanNodeTest.java | 157 +++++++++++++++++++++
3 files changed, 195 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 07a49a7aaea..a7367f3e274 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
@@ -123,6 +124,11 @@ public class PaimonScanNode extends FileQueryScanNode {
Preconditions.checkNotNull(source);
}
+    /**
+     * Replaces the {@link PaimonSource} used by this scan node.
+     *
+     * <p>Only intended for tests, which may build the node directly and inject a source
+     * instead of going through the normal initialization path.
+     *
+     * @param source the source to use; must not be {@code null}
+     * @throws NullPointerException if {@code source} is {@code null}
+     */
+    @VisibleForTesting
+    public void setSource(PaimonSource source) {
+        // Keep the same non-null invariant that this class already checks via
+        // Preconditions.checkNotNull(source) after normal initialization.
+        this.source = Preconditions.checkNotNull(source, "source must not be null");
+    }
+
@Override
protected void convertPredicate() {
PaimonPredicateConverter paimonPredicateConverter = new
PaimonPredicateConverter(
@@ -211,22 +217,12 @@ public class PaimonScanNode extends FileQueryScanNode {
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType
.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
- int[] projected = desc.getSlots().stream().mapToInt(
- slot -> source.getPaimonTable().rowType()
- .getFieldNames()
- .stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList())
- .indexOf(slot.getColumn().getName()))
- .toArray();
- ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
- List<org.apache.paimon.table.source.Split> paimonSplits =
readBuilder.withFilter(predicates)
- .withProjection(projected)
- .newScan().plan().splits();
+ List<org.apache.paimon.table.source.Split> paimonSplits =
getPaimonSplitFromAPI();
boolean applyCountPushdown = getPushDownAggNoGroupingOp() ==
TPushAggOp.COUNT;
// Just for counting the number of selected partitions for this paimon
table
Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
+ long realFileSplitSize = getRealFileSplitSize(0);
for (org.apache.paimon.table.source.Split split : paimonSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(split.rowCount());
@@ -254,7 +250,7 @@ public class PaimonScanNode extends FileQueryScanNode {
try {
List<Split> dorisSplits =
FileSplitter.splitFile(
locationPath,
- getRealFileSplitSize(0),
+ realFileSplitSize,
null,
file.length(),
-1,
@@ -294,11 +290,30 @@ public class PaimonScanNode extends FileQueryScanNode {
splitStats.add(splitStat);
}
+ // We need to set the target size for all splits so that we can
calculate the proportion of each split later.
+ splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
return splits;
}
+    /**
+     * Plans a Paimon scan for this node and returns the raw Paimon splits.
+     *
+     * <p>The projection is built by looking up each slot's column name among the table's
+     * lower-cased field names. The node's {@code predicates} are passed to the read builder
+     * as the filter. The method is public (annotated {@code @VisibleForTesting}) so tests can
+     * replace the Paimon planning step.
+     *
+     * @return the splits returned by {@code newScan().plan().splits()}
+     */
+    @VisibleForTesting
+    public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
+        // Lower-case the table's field names once, rather than once per slot.
+        List<String> lowerCaseFieldNames = source.getPaimonTable().rowType()
+                .getFieldNames()
+                .stream()
+                .map(String::toLowerCase)
+                .collect(Collectors.toList());
+        // NOTE(review): indexOf yields -1 when a slot's column name is not found; this assumes
+        // slot column names are already lower-case — confirm against the catalog.
+        int[] projected = desc.getSlots().stream()
+                .mapToInt(slot -> lowerCaseFieldNames.indexOf(slot.getColumn().getName()))
+                .toArray();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        return readBuilder.withFilter(predicates)
+                .withProjection(projected)
+                .newScan().plan().splits();
+    }
+
private void createRawFileSplits(List<RawFile> rawFiles, List<Split>
splits, long blockSize) throws UserException {
for (RawFile file : rawFiles) {
LocationPath locationPath = new LocationPath(file.path(),
@@ -325,7 +340,8 @@ public class PaimonScanNode extends FileQueryScanNode {
return
FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
}
- private boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
+ @VisibleForTesting
+ public boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
if (!optRawFiles.isPresent()) {
return false;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index a8bb814f1d3..1c6b88b16ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.thrift.TFileAttributes;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.table.Table;
@@ -34,6 +35,13 @@ public class PaimonSource {
private final Table originTable;
private final TupleDescriptor desc;
+    /**
+     * Creates a placeholder source whose descriptor, external table and Paimon table are all
+     * {@code null}.
+     *
+     * <p>Test-only: nothing here is backed by a real table, so any accessor that dereferences
+     * these fields may fail unless it is stubbed by the test.
+     */
+    @VisibleForTesting
+    public PaimonSource() {
+        this.originTable = null;
+        this.paimonExtTable = null;
+        this.desc = null;
+    }
+
public PaimonSource(TupleDescriptor desc) {
this.desc = desc;
this.paimonExtTable = (PaimonExternalTable) desc.getTable();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
new file mode 100644
index 00000000000..4a2d61f2c7d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class PaimonScanNodeTest {
+
+    @Mocked
+    private SessionVariable sv;
+
+    @Mocked
+    private PaimonFileExternalCatalog paimonFileExternalCatalog;
+
+    /**
+     * Verifies the split weights produced by {@link PaimonScanNode#getSplits(int)} for both
+     * the native-reader path and the JNI-reader path.
+     *
+     * <p>The Paimon planning step is mocked to return two raw-convertible data splits: a
+     * 64 MB file and a 32 MB file. Their expected raw split weights are 100 and 50.
+     */
+    @Test
+    public void testSplitWeight() throws UserException {
+
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
+        PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), desc, false, sv);
+
+        paimonScanNode.setSource(new PaimonSource());
+
+        DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64 * 1024 * 1024, 1,
+                SimpleStats.EMPTY_STATS, 1, 1, 1, Collections.emptyList(), null, null, null, null);
+        BinaryRow binaryRow1 = BinaryRow.singleColumn(1);
+        DataSplit ds1 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow1)
+                .withBucket(1)
+                .withBucketPath("b1")
+                .withDataFiles(Collections.singletonList(dfm1))
+                .build();
+
+        DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32 * 1024 * 1024, 2,
+                SimpleStats.EMPTY_STATS, 1, 1, 1, Collections.emptyList(), null, null, null, null);
+        BinaryRow binaryRow2 = BinaryRow.singleColumn(1);
+        DataSplit ds2 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow2)
+                .withBucket(1)
+                .withBucketPath("b1")
+                .withDataFiles(Collections.singletonList(dfm2))
+                .build();
+
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
+                // Plain list instead of double-brace initialization, which would create an
+                // anonymous ArrayList subclass capturing the enclosing instance.
+                List<org.apache.paimon.table.source.Split> paimonSplits = new ArrayList<>();
+                paimonSplits.add(ds1);
+                paimonSplits.add(ds2);
+                return paimonSplits;
+            }
+        };
+
+        new MockUp<PaimonSource>() {
+            @Mock
+            public ExternalCatalog getCatalog() {
+                return paimonFileExternalCatalog;
+            }
+        };
+
+        new MockUp<ExternalCatalog>() {
+            @Mock
+            public Map<String, String> getProperties() {
+                return Collections.emptyMap();
+            }
+        };
+
+        new Expectations() {{
+                sv.isForceJniScanner();
+                result = false;
+
+                sv.getIgnoreSplitType();
+                result = "NONE";
+            }};
+
+        // native reader path: no underlying Paimon split is attached to the result
+        mockNativeReader();
+        List<org.apache.doris.spi.Split> s1 = paimonScanNode.getSplits(1);
+        // Assert the size before indexing, so a short result fails with a clear assertion
+        // message instead of an IndexOutOfBoundsException from get(1).
+        Assert.assertEquals(2, s1.size());
+        PaimonSplit s11 = (PaimonSplit) s1.get(0);
+        PaimonSplit s12 = (PaimonSplit) s1.get(1);
+        Assert.assertEquals(100, s11.getSplitWeight().getRawValue());
+        Assert.assertNull(s11.getSplit());
+        Assert.assertEquals(50, s12.getSplitWeight().getRawValue());
+        Assert.assertNull(s12.getSplit());
+
+        // jni reader path: each result keeps its underlying Paimon split
+        mockJniReader();
+        List<org.apache.doris.spi.Split> s2 = paimonScanNode.getSplits(1);
+        Assert.assertEquals(2, s2.size());
+        PaimonSplit s21 = (PaimonSplit) s2.get(0);
+        PaimonSplit s22 = (PaimonSplit) s2.get(1);
+        Assert.assertNotNull(s21.getSplit());
+        Assert.assertNotNull(s22.getSplit());
+        Assert.assertEquals(100, s21.getSplitWeight().getRawValue());
+        Assert.assertEquals(50, s22.getSplitWeight().getRawValue());
+    }
+
+    /** Forces {@code supportNativeReader} to return {@code false} (JNI reader path). */
+    private void mockJniReader() {
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
+                return false;
+            }
+        };
+    }
+
+    /** Forces {@code supportNativeReader} to return {@code true} (native reader path). */
+    private void mockNativeReader() {
+        new MockUp<PaimonScanNode>() {
+            @Mock
+            public boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
+                return true;
+            }
+        };
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]