morrySnow commented on code in PR #40225:
URL: https://github.com/apache/doris/pull/40225#discussion_r1751559686
##########
fe/fe-core/pom.xml:
##########
@@ -453,6 +454,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.aliyun.odps</groupId>
+ <artifactId>odps-sdk-table-api</artifactId>
+ <version>0.48.8-public</version>
Review Comment:
Better to define this version as a Maven property (e.g. in `<properties>`) and reference it here, so all ODPS dependencies are always kept on the same version.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java:
##########
@@ -75,11 +99,290 @@ private void setScanParams(TFileRangeDesc rangeDesc,
MaxComputeSplit maxComputeS
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
- if (maxComputeSplit.getPartitionSpec().isPresent()) {
-
fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get());
- }
+ fileDesc.setPartitionSpec("deprecated");
+ fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
+ fileDesc.setSessionId(maxComputeSplit.getSessionId());
tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " +
maxComputeSplit.getLength() + " ]");
+ rangeDesc.setStartOffset(maxComputeSplit.getStart());
+ rangeDesc.setSize(maxComputeSplit.getLength());
+ }
+
+ void createTableBatchReadSession() throws UserException {
+ Predicate filterPredicate = convertPredicate();
+
+
+ List<String> requiredPartitionColumns = new ArrayList<>();
+ List<String> orderedRequiredDataColumns = new ArrayList<>();
+
+ Set<String> requiredSlots =
+ desc.getSlots().stream().map(e ->
e.getColumn().getName()).collect(Collectors.toSet());
+
+ Set<String> partitionColumns =
+
table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());
+
+ for (Column column : table.getColumns()) {
+ String columnName = column.getName();
+ if (!requiredSlots.contains(columnName)) {
+ continue;
+ }
+ if (partitionColumns.contains(columnName)) {
+ requiredPartitionColumns.add(columnName);
+ } else {
+ orderedRequiredDataColumns.add(columnName);
+ }
+ }
+
+
+
+ MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog)
table.getCatalog();
+
+ try {
+ TableReadSessionBuilder scanBuilder = new
TableReadSessionBuilder();
+ tableBatchReadSession =
+
scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
+ .withSettings(mcCatalog.getSettings())
+ .withSplitOptions(mcCatalog.getSplitOption())
+ .requiredPartitionColumns(requiredPartitionColumns)
+ .requiredDataColumns(orderedRequiredDataColumns)
+ .withArrowOptions(
+ ArrowOptions.newBuilder()
+
.withDatetimeUnit(TimestampUnit.MILLI)
+
.withTimestampUnit(TimestampUnit.NANO)
+ .build()
+ )
+ .withFilterPredicate(filterPredicate)
+ .buildBatchReadSession();
+ } catch (java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ protected Predicate convertPredicate() {
+ if (conjuncts.isEmpty()) {
+ return Predicate.NO_PREDICATE;
+ }
+
+ if (conjuncts.size() == 1) {
+ try {
+ return convertExprToOdpsPredicate(conjuncts.get(0));
+ } catch (AnalysisException e) {
+ Log.info("Failed to convert predicate " + conjuncts.get(0) + "
to odps predicate");
+ Log.info("Reason: " + e.getMessage());
+ return Predicate.NO_PREDICATE;
+ }
+ }
+
+ com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
+ filterPredicate = new
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
+
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND
+ );
+
+ for (Expr predicate : conjuncts) {
+ try {
+
filterPredicate.addPredicate(convertExprToOdpsPredicate(predicate));
+ } catch (AnalysisException e) {
+ Log.info("Failed to convert predicate " + predicate);
+ Log.info("Reason: " + e.getMessage());
+ return Predicate.NO_PREDICATE;
+ }
+ }
+ return filterPredicate;
+ }
+
+ private Predicate convertExprToOdpsPredicate(Expr expr) throws
AnalysisException {
+ Predicate odpsPredicate = null;
+ if (expr instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+
+
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator odpsOp;
+ switch (compoundPredicate.getOp()) {
+ case AND:
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND;
+ break;
+ case OR:
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.OR;
+ break;
+ case NOT:
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.NOT;
+ break;
+ default:
+ throw new AnalysisException("Unknown operator: " +
compoundPredicate.getOp());
+ }
+
+ List<Predicate> odpsPredicates = new ArrayList<>();
+
+ odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(0)));
+
+ if (compoundPredicate.getOp() != Operator.NOT) {
+
odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(1)));
+ }
+ odpsPredicate = new
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(odpsOp,
odpsPredicates);
+
+ } else if (expr instanceof InPredicate) {
+
+ InPredicate inPredicate = (InPredicate) expr;
+ if (inPredicate.getChildren().size() > 2) {
+ return Predicate.NO_PREDICATE;
+ }
+ com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator
odpsOp =
+ inPredicate.isNotIn()
+ ?
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN
+ :
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.NOT_IN;
+
+ String columnName = convertSlotRefToColumnName(expr.getChild(0));
+ com.aliyun.odps.OdpsType odpsType =
table.getColumnNameToOdpsColumn().get(columnName).getType();
+
+ StringBuilder stringBuilder = new StringBuilder();
+
+
+ stringBuilder.append(columnName);
+ stringBuilder.append(" ");
+ stringBuilder.append(odpsOp.getDescription());
+ stringBuilder.append(" (");
+
+ for (int i = 1; i < inPredicate.getChildren().size(); i++) {
+ stringBuilder.append(convertLiteralToOdpsValues(odpsType,
expr.getChild(i)));
+ if (i < inPredicate.getChildren().size() - 1) {
+ stringBuilder.append(", ");
+ }
+ }
+ stringBuilder.append(" )");
+
+ odpsPredicate = new
com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
+
+ } else if (expr instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+
+
+ com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator
odpsOp;
+ switch (binaryPredicate.getOp()) {
+ case EQ: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.EQUALS;
+ break;
+ }
+ case NE: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.NOT_EQUALS;
+ break;
+ }
+ case GE: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN_OR_EQUAL;
+ break;
+ }
+ case LE: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN_OR_EQUAL;
+ break;
+ }
+ case LT: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN;
+ break;
+ }
+ case GT: {
+ odpsOp =
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN;
+ break;
+ }
+ default: {
+ odpsOp = null;
+ break;
+ }
+ }
+
+ if (odpsOp != null) {
+ String columnName =
convertSlotRefToColumnName(expr.getChild(0));
+ com.aliyun.odps.OdpsType odpsType =
table.getColumnNameToOdpsColumn().get(columnName).getType();
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(columnName);
+ stringBuilder.append(" ");
+ stringBuilder.append(odpsOp.getDescription());
+ stringBuilder.append(" ");
+ stringBuilder.append(convertLiteralToOdpsValues(odpsType,
expr.getChild(1)));
+
+ odpsPredicate = new
com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
+ }
+ } else if (expr instanceof IsNullPredicate) {
+ IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+ com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator
odpsOp =
+ isNullPredicate.isNotNull()
+ ?
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.NOT_NULL
+ :
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.IS_NULL;
+
+ odpsPredicate = new
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate(odpsOp,
+ new com.aliyun.odps.table.optimizer.predicate.Attribute(
+ convertSlotRefToColumnName(expr.getChild(0))
+ )
+ );
+ }
+
+
+ if (odpsPredicate == null) {
+ throw new AnalysisException("Do not support convert ["
+ + expr.getExprName() + "] in convertExprToOdpsPredicate.");
+ }
+ return odpsPredicate;
+ }
+
+ private String convertSlotRefToColumnName(Expr expr) throws
AnalysisException {
+ if (expr instanceof SlotRef) {
+ return ((SlotRef) expr).getColumnName();
+ } else if (expr instanceof CastExpr) {
+ if (expr.getChild(0) instanceof SlotRef) {
+ return ((SlotRef) expr.getChild(0)).getColumnName();
+ }
+ }
+
+ throw new AnalysisException("Do not support convert ["
+ + expr.getExprName() + "] in convertSlotRefToAttribute.");
+
+
+ }
+
+ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr)
throws AnalysisException {
+ if (!(expr instanceof LiteralExpr)) {
+ throw new AnalysisException("Do not support convert ["
+ + expr.getExprName() + "] in convertSlotRefToAttribute.");
+ }
+ LiteralExpr literalExpr = (LiteralExpr) expr;
+
+ literalExpr.toString();
Review Comment:
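Why call this here? The return value of `literalExpr.toString()` is discarded, so this line looks like dead code and can probably be removed.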
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]