morningman commented on code in PR #27784:
URL: https://github.com/apache/doris/pull/27784#discussion_r1413288385
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
columns = tmpSchema;
}
initPartitionColumns(columns);
+ initBucketingColumns(columns);
return columns;
}
+ private void initBucketingColumns(List<Column> columns) {
+ List<String> bucketCols = new ArrayList<>(5);
+ int numBuckets = getBucketColums(bucketCols);
Review Comment:
```suggestion
int numBuckets = getBucketColumns(bucketCols);
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
columns = tmpSchema;
}
initPartitionColumns(columns);
+ initBucketingColumns(columns);
return columns;
}
+ private void initBucketingColumns(List<Column> columns) {
+ List<String> bucketCols = new ArrayList<>(5);
+ int numBuckets = getBucketColums(bucketCols);
+ if (bucketCols.isEmpty()) {
+ bucketColumns = ImmutableList.of();
+ distributionInfo = new RandomDistributionInfo(1, true);
+ return;
+ }
+
+ int bucketingVersion =
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+ "2"));
+ ImmutableList.Builder<Column> bucketColBuilder =
ImmutableList.builder();
+ for (String colName : bucketCols) {
+ // do not use "getColum()", which will cause dead loop
Review Comment:
```suggestion
// do not use "getColumn()", which will cause dead loop
```
##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java:
##########
@@ -421,4 +428,37 @@ protected TFileCompressType getFileCompressType(FileSplit
fileSplit) throws User
}
return compressType;
}
+
+ @Override
+ public DataPartition constructInputPartitionByDistributionInfo() {
+ if (hmsTable.isBucketedTable()) {
+ DistributionInfo distributionInfo =
hmsTable.getDefaultDistributionInfo();
+ if (!(distributionInfo instanceof HashDistributionInfo)) {
+ return DataPartition.RANDOM;
+ }
+ List<Column> distributeColumns = ((HiveExternalDistributionInfo)
distributionInfo).getDistributionColumns();
+ List<Expr> dataDistributeExprs = Lists.newArrayList();
+ for (Column column : distributeColumns) {
+ SlotRef slotRef = new SlotRef(desc.getRef().getName(),
column.getName());
+ dataDistributeExprs.add(slotRef);
+ }
+ return DataPartition.hashPartitioned(dataDistributeExprs,
THashType.SPARK_MURMUR32);
+ }
+
+ return DataPartition.RANDOM;
+ }
+
+ public HMSExternalTable getHiveTable() {
+ return hmsTable;
+ }
+
+ @Override
+ public THashType getHashType() {
+ if (hmsTable.isBucketedTable()
Review Comment:
Do we need to check whether this table was created by Spark?
##########
fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java:
##########
@@ -76,10 +83,15 @@ public DataPartition(TPartitionType type) {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type
== TPartitionType.RANDOM);
this.type = type;
this.partitionExprs = ImmutableList.of();
+ this.hashType = THashType.CRC32;
+ }
+
+ public static DataPartition hashPartitioned(List<Expr> exprs, THashType
hashType) {
+ return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs,
hashType);
}
public static DataPartition hashPartitioned(List<Expr> exprs) {
- return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
+ return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs,
THashType.XXHASH64);
Review Comment:
Why using `THashType.XXHASH64` for this method?
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
columns = tmpSchema;
}
initPartitionColumns(columns);
+ initBucketingColumns(columns);
return columns;
}
+ private void initBucketingColumns(List<Column> columns) {
+ List<String> bucketCols = new ArrayList<>(5);
+ int numBuckets = getBucketColums(bucketCols);
+ if (bucketCols.isEmpty()) {
+ bucketColumns = ImmutableList.of();
+ distributionInfo = new RandomDistributionInfo(1, true);
+ return;
+ }
+
+ int bucketingVersion =
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+ "2"));
+ ImmutableList.Builder<Column> bucketColBuilder =
ImmutableList.builder();
+ for (String colName : bucketCols) {
+ // do not use "getColum()", which will cause dead loop
+ for (Column column : columns) {
+ if (colName.equals(column.getName())) {
+ // For partition column, if it is string type, change it
to varchar(65535)
+ // to be same as doris managed table.
+ // This is to avoid some unexpected behavior such as
different partition pruning result
+ // between doris managed table and external table.
+ if (column.getType().getPrimitiveType() ==
PrimitiveType.STRING) {
+
column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
+ }
+ bucketColBuilder.add(column);
+ break;
+ }
+ }
+ }
+
+ bucketColumns = bucketColBuilder.build();
+ distributionInfo = new HiveExternalDistributionInfo(numBuckets,
bucketColumns, bucketingVersion);
+ LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(),
name);
+ }
+
+ private int getBucketColums(List<String> bucketCols) {
+ StorageDescriptor descriptor = remoteTable.getSd();
+ int numBuckets = -1;
+ if (descriptor.isSetBucketCols() &&
!descriptor.getBucketCols().isEmpty()) {
+ /* Hive Bucketed Table */
+ bucketCols.addAll(descriptor.getBucketCols());
+ numBuckets = descriptor.getNumBuckets();
+ } else if (remoteTable.isSetParameters()
+ && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES,
remoteTable.getParameters().keySet())) {
+ Map<String, String> parameters = remoteTable.getParameters();
+ for (String key : SUPPORTED_BUCKET_PROPERTIES) {
+ if (parameters.containsKey(key)) {
+ switch (key) {
+ case SPARK_BUCKET + "0":
+ bucketCols.add(0, parameters.get(key));
+ break;
+ case SPARK_BUCKET + "1":
+ bucketCols.add(1, parameters.get(key));
+ break;
+ case SPARK_BUCKET + "2":
+ bucketCols.add(2, parameters.get(key));
+ break;
+ case SPARK_BUCKET + "3":
+ bucketCols.add(3, parameters.get(key));
+ break;
+ case SPARK_BUCKET + "4":
Review Comment:
Can Spark tables have at most 5 bucket columns?
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
columns = tmpSchema;
}
initPartitionColumns(columns);
+ initBucketingColumns(columns);
return columns;
}
+ private void initBucketingColumns(List<Column> columns) {
+ List<String> bucketCols = new ArrayList<>(5);
+ int numBuckets = getBucketColums(bucketCols);
+ if (bucketCols.isEmpty()) {
+ bucketColumns = ImmutableList.of();
+ distributionInfo = new RandomDistributionInfo(1, true);
+ return;
+ }
+
+ int bucketingVersion =
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+ "2"));
+ ImmutableList.Builder<Column> bucketColBuilder =
ImmutableList.builder();
+ for (String colName : bucketCols) {
+ // do not use "getColum()", which will cause dead loop
+ for (Column column : columns) {
+ if (colName.equals(column.getName())) {
Review Comment:
```suggestion
if (colName.equalsIgnoreCase(column.getName())) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
columns = tmpSchema;
}
initPartitionColumns(columns);
+ initBucketingColumns(columns);
return columns;
}
+ private void initBucketingColumns(List<Column> columns) {
+ List<String> bucketCols = new ArrayList<>(5);
+ int numBuckets = getBucketColums(bucketCols);
+ if (bucketCols.isEmpty()) {
+ bucketColumns = ImmutableList.of();
+ distributionInfo = new RandomDistributionInfo(1, true);
+ return;
+ }
+
+ int bucketingVersion =
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+ "2"));
+ ImmutableList.Builder<Column> bucketColBuilder =
ImmutableList.builder();
+ for (String colName : bucketCols) {
+ // do not use "getColum()", which will cause dead loop
+ for (Column column : columns) {
+ if (colName.equals(column.getName())) {
+ // For partition column, if it is string type, change it
to varchar(65535)
Review Comment:
```suggestion
// For partition/bucket column, if it is string type,
change it to varchar(65535)
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -715,14 +835,23 @@ public long getDataSize(boolean singleReplica) {
@Override
public boolean isDistributionColumn(String columnName) {
- return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
- .collect(Collectors.toSet()).contains(columnName.toLowerCase());
+ Set<String> distributeColumns = getDistributionColumnNames()
+ .stream().map(String::toLowerCase).collect(Collectors.toSet());
+ return distributeColumns.contains(columnName.toLowerCase());
}
@Override
public Set<String> getDistributionColumnNames() {
- return
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
- .collect(Collectors.toSet());
+ Set<String> distributionColumnNames = Sets.newHashSet();
+ if (distributionInfo instanceof RandomDistributionInfo) {
+ return distributionColumnNames;
+ }
+ HashDistributionInfo hashDistributionInfo = (HashDistributionInfo)
distributionInfo;
+ List<Column> partitionColumns =
hashDistributionInfo.getDistributionColumns();
Review Comment:
```suggestion
List<Column> distColumns =
hashDistributionInfo.getDistributionColumns();
```
##########
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java:
##########
@@ -626,17 +639,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node,
PlanFragment leftChildFr
leftRoot = leftRoot.getChild(0);
}
if (leftRoot instanceof OlapScanNode) {
- return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+ return canBucketShuffleJoin(node, (OlapScanNode) leftRoot,
rhsHashExprs);
+ } else if (leftRoot instanceof HiveScanNode) {
+ return canBucketShuffleJoin(node, (HiveScanNode) leftRoot,
rhsHashExprs, hashType);
}
}
return false;
}
+ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode
leftScanNode,
+ List<Expr> rhsJoinExprs,
Ref<THashType> hashType) {
+ HMSExternalTable leftTable = leftScanNode.getHiveTable();
+
+ DistributionInfo leftDistribution =
leftTable.getDefaultDistributionInfo();
+ if (leftDistribution == null || !(leftDistribution instanceof
HiveExternalDistributionInfo)) {
+ return false;
+ }
+
+ HiveExternalDistributionInfo hiveDistributionInfo =
(HiveExternalDistributionInfo) leftDistribution;
+
+ List<Column> leftDistributeColumns =
hiveDistributionInfo.getDistributionColumns();
+ List<String> leftDistributeColumnNames = leftDistributeColumns.stream()
+ .map(col -> leftTable.getName() + "." +
col.getName().toLowerCase()).collect(Collectors.toList());
+
+ List<String> leftJoinColumnNames = new ArrayList<>();
+ List<Expr> rightExprs = new ArrayList<>();
+ List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+ for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+ Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+ Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+ if (lhsJoinExpr.unwrapSlotRef() == null ||
rhsJoinExpr.unwrapSlotRef() == null) {
+ continue;
+ }
+
+ SlotRef leftSlot =
node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef());
+ if (leftSlot.getTable() instanceof HMSExternalTable
+ &&
leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) {
+ // table name in SlotRef is not the really name. `select *
from test as t`
+ // table name in SlotRef is `t`, but here we need is `test`.
+ leftJoinColumnNames.add(leftSlot.getTable().getName() + "."
+ + leftSlot.getColumnName().toLowerCase());
+ rightExprs.add(rhsJoinExpr);
+ }
+ }
+
+ //2 the join columns should contains all left table distribute columns
to enable bucket shuffle join
+ for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
+ String distributeColumnName = leftDistributeColumnNames.get(i);
+ boolean findRhsExprs = false;
+ // check the join column name is same as distribute column name and
+ // check the rhs join expr type is same as distribute column
+ for (int j = 0; j < leftJoinColumnNames.size(); j++) {
+ if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
+ // varchar and string type don't need to check the length
property
+ if ((rightExprs.get(j).getType().isVarcharOrStringType()
+ &&
leftDistributeColumns.get(i).getType().isVarcharOrStringType())
+ ||
(rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) {
+ rhsJoinExprs.add(rightExprs.get(j));
+ findRhsExprs = true;
+ break;
+ }
+ }
+ }
+
+ if (!findRhsExprs) {
+ return false;
+ }
+ }
+
+ hashType.value = THashType.SPARK_MURMUR32;
Review Comment:
Do we need to check whether this table was created by Spark?
##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -2216,8 +2218,13 @@ private void computeScanRangeAssignment() throws
Exception {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode,
assignedBytesPerHost, replicaNumPerHost);
}
if (fragmentContainsBucketShuffleJoin) {
-
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode)
scanNode,
- idToBackend, addressToBackendID, replicaNumPerHost);
+ if (scanNode instanceof OlapScanNode) {
+
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode)
scanNode,
+ idToBackend, addressToBackendID,
replicaNumPerHost);
+ } else {
Review Comment:
This should be `else if (scanNode instanceof HiveScanNode)` to handle other scan node types explicitly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]