This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1cc16bffe37 [improvement](external)add some improvements for external
scan #38946 (#43310)
1cc16bffe37 is described below
commit 1cc16bffe3705d99a33f47908b0f1ab84a9dc864
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Nov 6 16:53:07 2024 +0800
[improvement](external)add some improvements for external scan #38946
(#43310)
cherry pick from #38946
Co-authored-by: wuwenchi <[email protected]>
---
.../org/apache/doris/paimon/PaimonJniScanner.java | 2 +-
.../apache/doris/datasource/ExternalScanNode.java | 3 +-
.../doris/datasource/FederationBackendPolicy.java | 2 +-
.../org/apache/doris/datasource/FileSplit.java | 19 +++++
.../iceberg/source/IcebergDeleteFileFilter.java | 20 +++--
.../datasource/iceberg/source/IcebergScanNode.java | 7 +-
.../datasource/iceberg/source/IcebergSplit.java | 6 ++
.../datasource/paimon/source/PaimonScanNode.java | 25 +++++-
.../datasource/paimon/source/PaimonSplit.java | 22 +++++
.../glue/translator/PhysicalPlanTranslator.java | 3 +-
.../apache/doris/planner/SingleNodePlanner.java | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 36 ++++++++
.../src/main/java/org/apache/doris/spi/Split.java | 5 ++
.../doris/planner/FederationBackendPolicyTest.java | 95 ++++++++++++++++++++++
14 files changed, 228 insertions(+), 20 deletions(-)
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index f229134e9d8..7bd9fa631c8 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -125,7 +125,7 @@ public class PaimonJniScanner extends JniScanner {
int[] projected = getProjected();
readBuilder.withProjection(projected);
readBuilder.withFilter(getPredicates());
- reader = readBuilder.newRead().createReader(getSplit());
+ reader =
readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i ->
table.rowType().getTypeAt(i)).collect(Collectors.toList());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index e85fed8b62a..0d67a9e44b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode {
protected boolean needCheckColumnPriv;
protected final FederationBackendPolicy backendPolicy =
(ConnectContext.get() != null
- && ConnectContext.get().getSessionVariable().enableFileCache)
+ && (ConnectContext.get().getSessionVariable().enableFileCache
+ ||
ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
? new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index a2b902fd744..1e1787c1f64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -497,7 +497,7 @@ public class FederationBackendPolicy {
private static class SplitHash implements Funnel<Split> {
@Override
public void funnel(Split split, PrimitiveSink primitiveSink) {
-
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
+
primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putLong(split.getStart());
primitiveSink.putLong(split.getLength());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 7eaa87b74aa..1ebb390e904 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -47,6 +47,9 @@ public class FileSplit implements Split {
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;
+ public Long selfSplitWeight;
+ public Long targetSplitSize;
+
public FileSplit(LocationPath path, long start, long length, long
fileLength,
long modificationTime, String[] hosts, List<String>
partitionValues) {
this.path = path;
@@ -89,4 +92,20 @@ public class FileSplit implements Split {
return new FileSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues);
}
}
+
+ @Override
+ public void setTargetSplitSize(Long targetSplitSize) {
+ this.targetSplitSize = targetSplitSize;
+ }
+
+ @Override
+ public SplitWeight getSplitWeight() {
+ if (selfSplitWeight != null && targetSplitSize != null) {
+ double computedWeight = selfSplitWeight * 1.0 / targetSplitSize;
+ // Clamp the value to be between the minimum weight and 1.0 (standard
weight)
+ return
SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0));
+ } else {
+ return SplitWeight.standard();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
index 394bc849a56..b876732ff3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
@@ -25,23 +25,25 @@ import java.util.OptionalLong;
@Data
public class IcebergDeleteFileFilter {
private String deleteFilePath;
+ private long filesize;
- public IcebergDeleteFileFilter(String deleteFilePath) {
+ public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
this.deleteFilePath = deleteFilePath;
+ this.filesize = filesize;
}
public static PositionDelete createPositionDelete(String deleteFilePath,
Long positionLowerBound,
- Long positionUpperBound)
{
- return new PositionDelete(deleteFilePath, positionLowerBound,
positionUpperBound);
+ Long positionUpperBound,
long filesize) {
+ return new PositionDelete(deleteFilePath, positionLowerBound,
positionUpperBound, filesize);
}
- public static EqualityDelete createEqualityDelete(String deleteFilePath,
List<Integer> fieldIds) {
+ public static EqualityDelete createEqualityDelete(String deleteFilePath,
List<Integer> fieldIds, long fileSize) {
// todo:
// Schema deleteSchema = TypeUtil.select(scan.schema(), new
HashSet<>(fieldIds));
// StructLikeSet deleteSet =
StructLikeSet.create(deleteSchema.asStruct());
// pass deleteSet to BE
// compare two StructLike value, if equals, filtered
- return new EqualityDelete(deleteFilePath, fieldIds);
+ return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
}
static class PositionDelete extends IcebergDeleteFileFilter {
@@ -49,8 +51,8 @@ public class IcebergDeleteFileFilter {
private final Long positionUpperBound;
public PositionDelete(String deleteFilePath, Long positionLowerBound,
- Long positionUpperBound) {
- super(deleteFilePath);
+ Long positionUpperBound, long fileSize) {
+ super(deleteFilePath, fileSize);
this.positionLowerBound = positionLowerBound;
this.positionUpperBound = positionUpperBound;
}
@@ -67,8 +69,8 @@ public class IcebergDeleteFileFilter {
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;
- public EqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
- super(deleteFilePath);
+ public EqualityDelete(String deleteFilePath, List<Integer> fieldIds,
long fileSize) {
+ super(deleteFilePath, fileSize);
this.fieldIds = fieldIds;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index fe6c54cf53b..56dda7b4fe2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -282,7 +282,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
selectedPartitionNum = partitionPathSet.size();
-
+ splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
@@ -315,10 +315,11 @@ public class IcebergScanNode extends FileQueryScanNode {
.map(m ->
m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
.map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
- positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
+ positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L),
+ delete.fileSizeInBytes()));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
- delete.path().toString(), delete.equalityFieldIds()));
+ delete.path().toString(), delete.equalityFieldIds(),
delete.fileSizeInBytes()));
} else {
throw new IllegalStateException("Unknown delete content: " +
delete.content());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 46e8f96ba35..580d3cf1bb2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -47,6 +47,7 @@ public class IcebergSplit extends FileSplit {
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
+ this.selfSplitWeight = length;
}
public long getRowCount() {
@@ -56,4 +57,9 @@ public class IcebergSplit extends FileSplit {
public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}
+
+ public void setDeleteFileFilters(List<IcebergDeleteFileFilter>
deleteFileFilters) {
+ this.deleteFileFilters = deleteFileFilters;
+ this.selfSplitWeight +=
deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 02f831ba37b..cd477cc9b29 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -28,7 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -101,9 +101,14 @@ public class PaimonScanNode extends FileQueryScanNode {
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
+ private SessionVariable sessionVariable;
- public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ public PaimonScanNode(PlanNodeId id,
+ TupleDescriptor desc,
+ boolean needCheckColumnPriv,
+ SessionVariable sessionVariable) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv);
+ this.sessionVariable = sessionVariable;
}
@Override
@@ -176,7 +181,9 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public List<Split> getSplits() throws UserException {
- boolean forceJniScanner =
ConnectContext.get().getSessionVariable().isForceJniScanner();
+ boolean forceJniScanner = sessionVariable.isForceJniScanner();
+ SessionVariable.IgnoreSplitType ignoreSplitType =
+
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot ->
(source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -196,7 +203,11 @@ public class PaimonScanNode extends FileQueryScanNode {
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles =
dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles =
dataSplit.deletionFiles();
+
if (supportNativeReader(optRawFiles)) {
+ if (ignoreSplitType ==
SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
+ continue;
+ }
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
@@ -252,10 +263,16 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
} else {
+ if (ignoreSplitType ==
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
+ continue;
+ }
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
} else {
+ if (ignoreSplitType ==
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
+ continue;
+ }
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
@@ -263,6 +280,8 @@ public class PaimonScanNode extends FileQueryScanNode {
}
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
+ // Set the target split size at the end because fileSplitSize may be
modified in splitFile.
+ splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index ffd063d77e8..3ab38c7db28 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -23,11 +23,14 @@ import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.TableFormatType;
import com.google.common.collect.Maps;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new
LocationPath("/dummyPath", Maps.newHashMap());
@@ -35,11 +38,20 @@ public class PaimonSplit extends FileSplit {
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;
+
public PaimonSplit(Split split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
+
+ if (split instanceof DataSplit) {
+ List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
+ this.path = new LocationPath("/" +
dataFileMetas.get(0).fileName());
+ this.selfSplitWeight =
dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum();
+ } else {
+ this.selfSplitWeight = split.rowCount();
+ }
}
private PaimonSplit(LocationPath file, long start, long length, long
fileLength, long modificationTime,
@@ -47,6 +59,15 @@ public class PaimonSplit extends FileSplit {
super(file, start, length, fileLength, modificationTime, hosts,
partitionList);
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
+ this.selfSplitWeight = length;
+ }
+
+ @Override
+ public String getConsistentHashString() {
+ if (this.path == DUMMY_PATH) {
+ return UUID.randomUUID().toString();
+ }
+ return getPathString();
}
public Split getSplit() {
@@ -66,6 +87,7 @@ public class PaimonSplit extends FileSplit {
}
public void setDeletionFile(DeletionFile deletionFile) {
+ this.selfSplitWeight += deletionFile.length();
this.optDeletionFile = Optional.of(deletionFile);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 4f26f6d6383..4896ceb7170 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -615,7 +615,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof PaimonExternalTable) {
- scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
+ ConnectContext.get().getSessionVariable());
} else if (table instanceof TrinoConnectorExternalTable) {
scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 152bb7cc881..d94ad0a2552 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1986,7 +1986,8 @@ public class SingleNodePlanner {
scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
break;
case PAIMON_EXTERNAL_TABLE:
- scanNode = new PaimonScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new PaimonScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true,
+ ConnectContext.get().getSessionVariable());
break;
case TRINO_CONNECTOR_EXTERNAL_TABLE:
scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6376a5a3623..530a6f3042c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -944,6 +944,26 @@ public class SessionVariable implements Serializable,
Writable {
setter = "setPipelineTaskNum")
public int parallelPipelineTaskNum = 0;
+
+ public enum IgnoreSplitType {
+ NONE,
+ IGNORE_JNI,
+ IGNORE_NATIVE
+ }
+
+ public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
+ @VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
+ checker = "checkIgnoreSplitType",
+ options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
+ description = {"忽略指定类型的split", "Ignore splits of the specified
type"})
+ public String ignoreSplitType = IgnoreSplitType.NONE.toString();
+
+ public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN =
"use_consistent_hash_for_external_scan";
+ @VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN,
+ description = {"对外表采用一致性hash的方式做split的分发",
+ "Use consistent hashing to split the appearance for
external scan"})
+ public boolean useConsistentHashForExternalScan = false;
+
@VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
public int profileLevel = 1;
@@ -4331,6 +4351,22 @@ public class SessionVariable implements Serializable,
Writable {
return forceJniScanner;
}
+ public String getIgnoreSplitType() {
+ return ignoreSplitType;
+ }
+
+ public void checkIgnoreSplitType(String value) {
+ try {
+ IgnoreSplitType.valueOf(value);
+ } catch (Exception e) {
+ throw new UnsupportedOperationException("We only support `NONE`,
`IGNORE_JNI` and `IGNORE_NATIVE`");
+ }
+ }
+
+ public boolean getUseConsistentHashForExternalScan() {
+ return useConsistentHashForExternalScan;
+ }
+
public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index e86b287ac93..412e4b6792f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -48,4 +48,9 @@ public interface Split {
void setAlternativeHosts(List<String> alternativeHosts);
+ default String getConsistentHashString() {
+ return getPathString();
+ }
+
+ void setTargetSplitSize(Long targetSplitSize);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index df2e7dd3932..3b3e2eeedf7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -741,4 +741,99 @@ public class FederationBackendPolicyTest {
return entries1.containsAll(entries2) &&
entries2.containsAll(entries1);
}
+
+ @Test
+ public void testSplitWeight() {
+ FileSplit fileSplit = new FileSplit(new LocationPath("s1"), 0, 1000,
1000, 0, null, Collections.emptyList());
+ fileSplit.setSelfSplitWeight(1000L);
+
+ fileSplit.setTargetSplitSize(10L);
+ Assert.assertEquals(100L, fileSplit.getSplitWeight().getRawValue(),
100L);
+
+ fileSplit.setTargetSplitSize(10000000L);
+ Assert.assertEquals(1L, fileSplit.getSplitWeight().getRawValue());
+
+ fileSplit.setTargetSplitSize(2000L);
+ Assert.assertEquals(50, fileSplit.getSplitWeight().getRawValue());
+ }
+
+ @Test
+ public void testBiggerSplit() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+
+ Backend backend1 = new Backend(1L, "172.30.0.100", 9050);
+ backend1.setAlive(true);
+ service.addBackend(backend1);
+ Backend backend2 = new Backend(2L, "172.30.0.106", 9050);
+ backend2.setAlive(true);
+ service.addBackend(backend2);
+ Backend backend3 = new Backend(3L, "172.30.0.118", 9050);
+ backend3.setAlive(true);
+ service.addBackend(backend3);
+
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ List<Split> splits = new ArrayList<>();
+ splits.add(genFileSplit("s1", 1000000L, 1000L)); // belong 2
+ splits.add(genFileSplit("s2", 100000L, 1000L)); // belong 2
+ splits.add(genFileSplit("s3", 200000L, 1000L)); // belong 2
+ splits.add(genFileSplit("s4", 300000L, 1000L)); // belong 2
+ splits.add(genFileSplit("s5", 800000L, 1000L)); // belong 1
+
+ FederationBackendPolicy policy = new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ // Set these options to ensure that the consistent hash algorithm is
consistent.
+ policy.setEnableSplitsRedistribution(false);
+ Config.split_assigner_min_consistent_hash_candidate_num = 1;
+ policy.init();
+ Multimap<Backend, Split> assignment =
policy.computeScanRangeAssignment(splits);
+ Map<Backend, List<Split>> backendListMap = mergeAssignment(assignment);
+ backendListMap.forEach((k, v) -> {
+ if (k.getId() == 1) {
+ Assert.assertEquals(800000L,
v.stream().mapToLong(Split::getLength).sum());
+ } else if (k.getId() == 2) {
+ Assert.assertEquals(1600000L,
v.stream().mapToLong(Split::getLength).sum());
+ }
+ });
+
+ Config.split_assigner_min_consistent_hash_candidate_num = 1;
+ FederationBackendPolicy policy2 = new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ policy2.init();
+ Multimap<Backend, Split> assignment2 =
policy2.computeScanRangeAssignment(splits);
+ Map<Backend, List<Split>> backendListMap2 =
mergeAssignment(assignment2);
+ backendListMap2.forEach((k, v) -> {
+ if (k.getId() == 1) {
+ Assert.assertEquals(900000L,
v.stream().mapToLong(Split::getLength).sum());
+ } else if (k.getId() == 2) {
+ Assert.assertEquals(500000L,
v.stream().mapToLong(Split::getLength).sum());
+ } else if (k.getId() == 3) {
+ Assert.assertEquals(1000000L,
v.stream().mapToLong(Split::getLength).sum());
+ }
+ });
+ }
+
+ private Map<Backend, List<Split>> mergeAssignment(Multimap<Backend, Split>
ass) {
+ HashMap<Backend, List<Split>> map = new HashMap<>();
+ ass.forEach((k, v) -> {
+ if (map.containsKey(k)) {
+ map.get(k).add(v);
+ } else {
+ ArrayList<Split> splits = new ArrayList<>();
+ splits.add(v);
+ map.put(k, splits);
+ }
+ });
+ return map;
+ }
+
+ private FileSplit genFileSplit(String path, long length, long targetSplit)
{
+ FileSplit s = new FileSplit(new LocationPath(path), 0, length, length,
0, null, Collections.emptyList());
+ s.setSelfSplitWeight(length);
+ s.setTargetSplitSize(targetSplit);
+ return s;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]