This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 677b3424e3 [feature](Load)(step1)support nereids load, add load grammar (#23485)
677b3424e3 is described below
commit 677b3424e3b24793a8cd2af54db9e1cc2f8b933a
Author: slothever <[email protected]>
AuthorDate: Wed Sep 20 21:12:23 2023 +0800
[feature](Load)(step1)support nereids load, add load grammar (#23485)
Support the Nereids load grammar. The broker load statement will be converted into an INSERT INTO clause:
1. Rename broker load to bulk load.
2. Add the load grammar to the Nereids optimizer.
3. Convert it into an INSERT INTO clause with a table-valued function.
https://github.com/apache/doris/issues/24221
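
For illustration (paths and properties below are invented, not part of this patch), a bulk load written as:

    LOAD LABEL label1
    (DATA INFILE("s3://bucket/customer.csv") INTO TABLE customer COLUMNS TERMINATED BY ",")
    WITH S3 ("s3.endpoint" = "http://endpoint.example.com");

should roughly become an insert over a table-valued function:

    INSERT INTO customer
    SELECT * FROM s3("uri" = "s3://bucket/customer.csv", "format" = "csv", "s3.endpoint" = "http://endpoint.example.com");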
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 86 ++-
.../java/org/apache/doris/common/FeConstants.java | 2 +
.../doris/nereids/parser/LogicalPlanBuilder.java | 89 ++-
.../apache/doris/nereids/trees/plans/PlanType.java | 5 +-
.../nereids/trees/plans/commands/LoadCommand.java | 495 ++++++++++++++++
.../plans/commands/info/BulkLoadDataDesc.java | 333 +++++++++++
.../trees/plans/commands/info/BulkStorageDesc.java | 123 ++++
.../trees/plans/visitor/CommandVisitor.java | 5 +
.../ExternalFileTableValuedFunction.java | 12 +-
.../doris/analysis/BulkLoadDataDescTest.java | 658 +++++++++++++++++++++
.../apache/doris/analysis/S3TvfLoadStmtTest.java | 36 +-
11 files changed, 1818 insertions(+), 26 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 335576660f..b0a11bcb28 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -63,6 +63,19 @@ statement
(PARTITION partition=identifierList)?
(USING relation (COMMA relation)*)
whereClause #delete
+ | LOAD LABEL lableName=identifier
+ LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+ (withRemoteStorageSystem)?
+ (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+ (commentSpec)? #load
+ | LOAD LABEL lableName=identifier
+ LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+ resourceDesc
+ (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                             #resourceLoad
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                             #mysqlLoad
| EXPORT TABLE tableName=multipartIdentifier
(PARTITION partition=identifierList)?
(whereClause)?
@@ -71,7 +84,29 @@ statement
(withRemoteStorageSystem)? #export
;
-
+dataDesc
+    : ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN
+ INTO TABLE tableName=multipartIdentifier
+ (PARTITION partition=identifierList)?
+ (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+ (LINES TERMINATED BY separator=STRING_LITERAL)?
+ (FORMAT AS format=identifier)?
+ (columns=identifierList)?
+ (columnsFromPath=colFromPath)?
+ (columnMapping=colMappingList)?
+ (preFilter=preFilterClause)?
+ (where=whereClause)?
+ (deleteOn=deleteOnClause)?
+ (sequenceColumn=sequenceColClause)?
+ (propertyClause)?
+ | ((WITH)? mergeType)? DATA FROM TABLE tableName=multipartIdentifier
+ INTO TABLE tableName=multipartIdentifier
+ (PARTITION partition=identifierList)?
+ (columnMapping=colMappingList)?
+ (where=whereClause)?
+ (deleteOn=deleteOnClause)?
+ (propertyClause)?
+ ;
// -----------------Command accessories-----------------
@@ -101,6 +136,36 @@ planType
| ALL // default type
;
+mergeType
+ : APPEND
+ | DELETE
+ | MERGE
+ ;
+
+preFilterClause
+ : PRECEDING FILTER expression
+ ;
+
+deleteOnClause
+ : DELETE ON expression
+ ;
+
+sequenceColClause
+ : ORDER BY identifier
+ ;
+
+colFromPath
+ : COLUMNS FROM PATH AS identifierList
+ ;
+
+colMappingList
+    : SET LEFT_PAREN mappingSet+=mappingExpr (COMMA mappingSet+=mappingExpr)* RIGHT_PAREN
+ ;
+
+mappingExpr
+ : (mappingCol=identifier EQ expression)
+ ;
+
withRemoteStorageSystem
: WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
@@ -117,6 +182,25 @@ withRemoteStorageSystem
RIGHT_PAREN)?
;
+resourceDesc
+    : WITH RESOURCE resourceName=identifierOrText (LEFT_PAREN propertyItemList RIGHT_PAREN)?
+ ;
+
+mysqlDataDesc
+ : DATA (LOCAL booleanValue)?
+ INFILE filePath=STRING_LITERAL
+ INTO TABLE tableName=multipartIdentifier
+ (PARTITION partition=identifierList)?
+ (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+ (LINES TERMINATED BY separator=STRING_LITERAL)?
+ (skipLines)?
+ (columns=identifierList)?
+ (colMappingList)?
+ (propertyClause)?
+ ;
+
+skipLines : IGNORE lines=INTEGER_VALUE LINES | IGNORE lines=INTEGER_VALUE ROWS ;
+
// -----------------Query-----------------
// add queryOrganization for parse (q1) union (q2) union (q3) order by keys, otherwise 'order' will be recognized to be
// identifier.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 0845d593e2..ecd0c2f4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -49,6 +49,8 @@ public class FeConstants {
// set to true to skip some step when running FE unit test
public static boolean runningUnitTest = false;
+    // used to set some mocked values for FE unit test
+    public static Object unitTestConstant = null;
// set to false to disable internal schema db
public static boolean enableInternalSchemaDb = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a9b50c8bf1..75d7c788c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -20,13 +20,13 @@ package org.apache.doris.nereids.parser;
import org.apache.doris.analysis.ArithmeticExpr.Operator;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.DorisParser;
import org.apache.doris.nereids.DorisParser.AggClauseContext;
import org.apache.doris.nereids.DorisParser.AliasQueryContext;
@@ -291,7 +291,10 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue;
@@ -330,6 +333,7 @@ import org.apache.doris.nereids.types.StructField;
import org.apache.doris.nereids.types.StructType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.policy.FilterType;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
@@ -351,6 +355,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -464,6 +469,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan visitExport(ExportContext ctx) {
+        // TODO: replace old class name like ExportStmt, BrokerDesc, Expr with new nereids class name
        List<String> tableName = visitMultipartIdentifier(ctx.tableName);
        List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
@@ -515,7 +521,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
        } else if (ctx.HDFS() != null) {
            brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, brokerPropertiesMap);
        } else if (ctx.LOCAL() != null) {
-            brokerDesc = new BrokerDesc("HDFS", StorageType.LOCAL, brokerPropertiesMap);
+            brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.LOCAL, brokerPropertiesMap);
        } else if (ctx.BROKER() != null) {
            brokerDesc = new BrokerDesc(visitIdentifierOrText(ctx.brokerName), brokerPropertiesMap);
        }
@@ -541,6 +547,85 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
        return logicalPlans;
    }
+    /**
+     * Visit load-statements.
+     */
+    @Override
+    public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        if (ctx.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =
+                    new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
+            if (ctx.withRemoteStorageSystem().S3() != null) {
+                bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().HDFS() != null) {
+                bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
+                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().BROKER() != null
+                    && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
+                bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
+                        bulkProperties);
+            }
+        }
+        ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new ImmutableList.Builder<>();
+        for (DorisParser.DataDescContext ddc : ctx.dataDescs) {
+            List<String> tableName = RelationUtil.getQualifierName(ConnectContext.get(),
+                    visitMultipartIdentifier(ddc.tableName));
+            List<String> colNames = (ddc.columns == null ? ImmutableList.of() : visitIdentifierList(ddc.columns));
+            List<String> columnsFromPath = (ddc.columnsFromPath == null ? ImmutableList.of()
+                    : visitIdentifierList(ddc.columnsFromPath.identifierList()));
+            List<String> partitions = ddc.partition == null ? ImmutableList.of() : visitIdentifierList(ddc.partition);
+            // TODO: multi location
+            List<String> multiFilePaths = new ArrayList<>();
+            for (Token filePath : ddc.filePaths) {
+                multiFilePaths.add(filePath.getText().substring(1, filePath.getText().length() - 1));
+            }
+            List<String> filePaths = ddc.filePath == null ? ImmutableList.of() : multiFilePaths;
+            Map<String, Expression> colMappings;
+            if (ddc.columnMapping == null) {
+                colMappings = ImmutableMap.of();
+            } else {
+                colMappings = new HashMap<>();
+                for (DorisParser.MappingExprContext mappingExpr : ddc.columnMapping.mappingSet) {
+                    colMappings.put(mappingExpr.mappingCol.getText(), getExpression(mappingExpr.expression()));
+                }
+            }
+
+            LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND
+                    : LoadTask.MergeType.valueOf(ddc.mergeType().getText());
+
+            Optional<String> fileFormat = ddc.format == null ? Optional.empty() : Optional.of(ddc.format.getText());
+            Optional<String> separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText()
+                    .substring(1, ddc.separator.getText().length() - 1));
+            Optional<String> comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText()
+                    .substring(1, ddc.comma.getText().length() - 1));
+            Map<String, String> dataProperties = ddc.propertyClause() == null ? new HashMap<>()
+                    : visitPropertyClause(ddc.propertyClause());
+            dataDescriptions.add(new BulkLoadDataDesc(
+                    tableName,
+                    partitions,
+                    filePaths,
+                    colNames,
+                    columnsFromPath,
+                    colMappings,
+                    new BulkLoadDataDesc.FileFormatDesc(separator, comma, fileFormat),
+                    false,
+                    ddc.preFilter == null ? Optional.empty() : Optional.of(getExpression(ddc.preFilter.expression())),
+                    ddc.where == null ? Optional.empty() : Optional.of(getExpression(ddc.where.booleanExpression())),
+                    mergeType,
+                    ddc.deleteOn == null ? Optional.empty() : Optional.of(getExpression(ddc.deleteOn.expression())),
+                    ddc.sequenceColumn == null ? Optional.empty()
+                            : Optional.of(ddc.sequenceColumn.identifier().getText()), dataProperties));
+        }
+        String labelName = ctx.lableName.getText();
+        Map<String, String> properties = visitPropertyItemList(ctx.properties);
+        String commentSpec = ctx.commentSpec() == null ? "" : ctx.commentSpec().STRING_LITERAL().getText();
+        String comment = escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
+        return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
+    }
+
    /* ********************************************************************************************
     * Plan parsing
     * ******************************************************************************************** */
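
To make the visitor's mapping concrete, a dataDesc such as (all names invented for illustration):

    WITH MERGE DATA INFILE("s3://bucket/f.csv")
    INTO TABLE t1
    COLUMNS TERMINATED BY ","
    (k1, k2, v1)
    SET (v2 = v1 + 1)
    PRECEDING FILTER k1 > 0
    WHERE k2 > 0
    DELETE ON v1 = 1
    ORDER BY k2

is collected into a single BulkLoadDataDesc: file paths and separators are unquoted, the SET pairs become the column-mapping expressions, the merge type becomes LoadTask.MergeType.MERGE, and the PRECEDING FILTER / WHERE / DELETE ON clauses are kept as optional Nereids expressions.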
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 31c3641620..c1ed61ce1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -119,8 +119,9 @@ public enum PlanType {
CREATE_TABLE_COMMAND,
DELETE_COMMAND,
EXPLAIN_COMMAND,
+ EXPORT_COMMAND,
INSERT_INTO_TABLE_COMMAND,
+ LOAD_COMMAND,
SELECT_INTO_OUTFILE_COMMAND,
- UPDATE_COMMAND,
- EXPORT_COMMAND
+ UPDATE_COMMAND
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
new file mode 100644
index 0000000000..f02c5bd2d8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * load OLAP table data from external bulk file
+ */
+public class LoadCommand extends Command implements ForwardWithSync {
+
+ public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+
+ private final String labelName;
+ private final BulkStorageDesc bulkStorageDesc;
+ private final List<BulkLoadDataDesc> sourceInfos;
+ private final Map<String, String> properties;
+ private final String comment;
+ private final List<LogicalPlan> plans = new ArrayList<>();
+ private Profile profile;
+
+    /**
+     * constructor of LoadCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BulkStorageDesc bulkStorageDesc,
+            Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null");
+        this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null");
+        this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not null");
+    }
+
+ /**
+ * for test print
+ *
+ * @param ctx context
+ * @return parsed insert into plan
+ */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws AnalysisException {
+ List<LogicalPlan> plans = new ArrayList<>();
+ for (BulkLoadDataDesc dataDesc : sourceInfos) {
+ plans.add(completeQueryPlan(ctx, dataDesc));
+ }
+ return plans;
+ }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        // TODO: begin txn from multi insert sql
+        /* this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans); */
+        throw new AnalysisException("Fallback to legacy planner temporary.");
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql);
+        }
+        // 1. build source projects plan (select col1,col2... from tvf where prefilter)
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc);
+        LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
+        tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan);
+
+        if (!(tvfLogicalPlan instanceof LogicalProject)) {
+            throw new AnalysisException("Fail to build TVF query, TVF query should be LogicalProject");
+        }
+        List<NamedExpression> tvfProjects = ((LogicalProject<?>) tvfLogicalPlan).getProjects();
+        // tvfProjects may be '*' or 'col1,col2,...'
+        if (tvfProjects.isEmpty()) {
+            throw new AnalysisException("Fail to build TVF query, parsed TVF select list requires not null");
+        }
+        boolean scanAllTvfCol = (tvfProjects.get(0) instanceof UnboundStar);
+
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        List<Column> olapSchema = olapTable.getBaseSchema();
+        // map column index to mapping expr
+        Map<String, Expression> mappingExpressions = dataDesc.getColumnMappings();
+        // 2. build sink where
+        Set<Expression> conjuncts = new HashSet<>();
+        if (dataDesc.getWhereExpr().isPresent()) {
+            Set<Expression> whereParts = ExpressionUtils.extractConjunctionToSet(dataDesc.getWhereExpr().get());
+            for (Expression wherePart : whereParts) {
+                if (!(wherePart instanceof ComparisonPredicate)) {
+                    throw new AnalysisException("WHERE clause must be comparison expression");
+                }
+                ComparisonPredicate comparison = ((ComparisonPredicate) wherePart);
+                if (!(comparison.left() instanceof UnboundSlot)) {
+                    throw new AnalysisException("Invalid predicate column " + comparison.left().toSql());
+                }
+                conjuncts.add(comparison.rewriteUp(e -> {
+                    if (!(e instanceof UnboundSlot)) {
+                        return e;
+                    }
+                    UnboundSlot slot = (UnboundSlot) e;
+                    String colName = getUnquotedName(slot);
+                    return mappingExpressions.getOrDefault(colName, e);
+                }));
+            }
+        }
+
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) && !conjuncts.isEmpty()) {
+            throw new AnalysisException("Required property 'csv_schema' for csv file, "
+                    + "when no column list specified and use WHERE");
+        }
+        tvfLogicalPlan = new LogicalFilter<>(conjuncts, tvfLogicalPlan);
+
+        // 3. build sink project
+        List<String> sinkCols = new ArrayList<>();
+        List<NamedExpression> selectLists = new ArrayList<>();
+        List<String> olapColumns = olapSchema.stream().map(Column::getDisplayName).collect(Collectors.toList());
+        if (!scanAllTvfCol) {
+            int numSinkCol = Math.min(tvfProjects.size(), olapColumns.size());
+            // if not scan all tvf column, try to treat each tvfColumn as olapColumn
+            for (int i = 0; i < numSinkCol; i++) {
+                UnboundSlot sourceCol = (UnboundSlot) tvfProjects.get(i);
+                // check sourceCol is slot and check olapColumn beyond index.
+                String olapColumn = olapColumns.get(i);
+                fillSinkBySourceCols(mappingExpressions, olapColumn, sourceCol, sinkCols, selectLists);
+            }
+            fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, Column.DELETE_SIGN);
+        } else {
+            for (String olapColumn : olapColumns) {
+                if (olapColumn.equalsIgnoreCase(Column.VERSION_COL)
+                        || olapColumn.equalsIgnoreCase(Column.SEQUENCE_COL)) {
+                    continue;
+                }
+                if (olapColumn.equalsIgnoreCase(Column.DELETE_SIGN)) {
+                    fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, olapColumn);
+                    continue;
+                }
+                fillSinkBySourceCols(mappingExpressions, olapColumn, new UnboundSlot(olapColumn),
+                        sinkCols, selectLists);
+            }
+        }
+        if (sinkCols.isEmpty() && selectLists.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        }
+        for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+            sinkCols.add(columnFromPath);
+            // columnFromPath will be parsed by BE, put columns as placeholder.
+            selectLists.add(new UnboundSlot(columnFromPath));
+        }
+
+        tvfLogicalPlan = new LogicalProject<>(selectLists, tvfLogicalPlan);
+        checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+ private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc,
OlapTable olapTable,
+ List<String> sinkCols,
+ List<NamedExpression> selectLists,
+ String olapColumn) throws
AnalysisException {
+ if (olapTable.hasDeleteSign() &&
dataDesc.getDeleteCondition().isPresent()) {
+ checkDeleteOnConditions(dataDesc.getMergeType(),
dataDesc.getDeleteCondition().get());
+ Optional<If> deleteIf = createDeleteOnIfCall(olapTable,
olapColumn, dataDesc);
+ if (deleteIf.isPresent()) {
+ sinkCols.add(olapColumn);
+ selectLists.add(new UnboundAlias(deleteIf.get(), olapColumn));
+ }
+ sinkCols.add(olapColumn);
+ }
+ }
+
+ /**
+ * use to get unquoted column name
+ * @return unquoted slot name
+ */
+ public static String getUnquotedName(NamedExpression slot) {
+ if (slot instanceof UnboundAlias) {
+ return slot.getName();
+ } else if (slot instanceof UnboundSlot) {
+ List<String> slotNameParts = ((UnboundSlot) slot).getNameParts();
+ return slotNameParts.get(slotNameParts.size() - 1);
+ }
+ return slot.getName();
+ }
+
+ private static void fillSinkBySourceCols(Map<String, Expression>
mappingExpressions,
+ String olapColumn, UnboundSlot
tvfColumn,
+ List<String> sinkCols,
List<NamedExpression> selectLists) {
+ sinkCols.add(olapColumn);
+ if (mappingExpressions.containsKey(olapColumn)) {
+ selectLists.add(new
UnboundAlias(mappingExpressions.get(olapColumn), olapColumn));
+ } else {
+ selectLists.add(new UnboundAlias(tvfColumn, olapColumn));
+ }
+ }
+
+ private static boolean isCsvType(Map<String, String> tvfProperties) {
+ return
tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+ }
+
+ /**
+ * fill all column that need to be loaded to sinkCols.
+ * fill the map with sink columns and generated source columns.
+ * sink columns use for 'INSERT INTO'
+ * generated source columns use for 'SELECT'
+ *
+ * @param dataDesc dataDesc
+ * @param tvfProperties generated tvfProperties
+ * @param tvfLogicalPlan source tvf relation
+ */
+ private static LogicalPlan buildTvfQueryPlan(BulkLoadDataDesc dataDesc,
+ Map<String, String>
tvfProperties,
+ LogicalPlan tvfLogicalPlan)
throws AnalysisException {
+ // build tvf column filter
+ if (dataDesc.getPrecedingFilterExpr().isPresent()) {
+ Set<Expression> preConjuncts =
+
ExpressionUtils.extractConjunctionToSet(dataDesc.getPrecedingFilterExpr().get());
+ if (!preConjuncts.isEmpty()) {
+ tvfLogicalPlan = new LogicalFilter<>(preConjuncts,
tvfLogicalPlan);
+ }
+ }
+
+ Map<String, String> sourceProperties = dataDesc.getProperties();
+ if (dataDesc.getFileFieldNames().isEmpty() &&
isCsvType(tvfProperties)) {
+ String csvSchemaStr =
sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
+ if (csvSchemaStr != null) {
+ tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
csvSchemaStr);
+ List<Column> csvSchema = new ArrayList<>();
+ ExternalFileTableValuedFunction.parseCsvSchema(csvSchema,
sourceProperties);
+ List<NamedExpression> csvColumns = new ArrayList<>();
+ for (Column csvColumn : csvSchema) {
+ csvColumns.add(new UnboundSlot(csvColumn.getName()));
+ }
+ if (!csvColumns.isEmpty()) {
+ for (String columnFromPath :
dataDesc.getColumnsFromPath()) {
+ csvColumns.add(new UnboundSlot(columnFromPath));
+ }
+ return new LogicalProject<>(csvColumns, tvfLogicalPlan);
+ }
+ if (!dataDesc.getPrecedingFilterExpr().isPresent()) {
+ throw new AnalysisException("Required property
'csv_schema' for csv file, "
+ + "when no column list specified and use PRECEDING
FILTER");
+ }
+ }
+ return getStarProjectPlan(tvfLogicalPlan);
+ }
+ List<NamedExpression> dataDescColumns = new ArrayList<>();
+ for (int i = 0; i < dataDesc.getFileFieldNames().size(); i++) {
+ String sourceColumn = dataDesc.getFileFieldNames().get(i);
+ dataDescColumns.add(new UnboundSlot(sourceColumn));
+ }
+ if (dataDescColumns.isEmpty()) {
+ return getStarProjectPlan(tvfLogicalPlan);
+ } else {
+ return new LogicalProject<>(dataDescColumns, tvfLogicalPlan);
+ }
+ }
+
+ private static LogicalProject<LogicalPlan> getStarProjectPlan(LogicalPlan
logicalPlan) {
+ return new LogicalProject<>(ImmutableList.of(new UnboundStar(new
ArrayList<>())), logicalPlan);
+ }
+
+ private static Optional<If> createDeleteOnIfCall(OlapTable olapTable,
String olapColName,
+ BulkLoadDataDesc
dataDesc) throws AnalysisException {
+ if (olapTable.hasDeleteSign()
+ && dataDesc.getDeleteCondition().isPresent()) {
+ if (!(dataDesc.getDeleteCondition().get() instanceof
ComparisonPredicate)) {
+ throw new AnalysisException("DELETE ON clause must be
comparison expression.");
+ }
+ ComparisonPredicate deleteOn = (ComparisonPredicate)
dataDesc.getDeleteCondition().get();
+ Expression deleteOnCol = deleteOn.left();
+ if (!(deleteOnCol instanceof UnboundSlot)) {
+ throw new AnalysisException("DELETE ON column must be an
undecorated OLAP column.");
+ }
+ if (!olapColName.equalsIgnoreCase(getUnquotedName((UnboundSlot)
deleteOnCol))) {
+ return Optional.empty();
+ }
+ If deleteIf = new If(deleteOn, new TinyIntLiteral((byte) 1), new
TinyIntLiteral((byte) 0));
+ return Optional.of(deleteIf);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private static void checkDeleteOnConditions(LoadTask.MergeType mergeType,
Expression deleteCondition)
+ throws AnalysisException {
+ if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
+ throw new
AnalysisException(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON);
+ }
+ if (mergeType == LoadTask.MergeType.MERGE && deleteCondition == null) {
+ throw new AnalysisException(BulkLoadDataDesc.EXPECT_DELETE_ON);
+ }
+ }
+
+ private static void checkAndAddSequenceCol(OlapTable olapTable,
BulkLoadDataDesc dataDesc,
+ List<String> sinkCols,
List<NamedExpression> selectLists)
+ throws AnalysisException {
+ Optional<String> optSequenceCol = dataDesc.getSequenceCol();
+ if (!optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+ return;
+ }
+ // check olapTable schema and sequenceCol
+ if (olapTable.hasSequenceCol() && !optSequenceCol.isPresent()) {
+ throw new AnalysisException("Table " + olapTable.getName()
+ + " has sequence column, need to specify the sequence
column");
+ }
+ if (optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+ throw new AnalysisException("There is no sequence column in the
table " + olapTable.getName());
+ }
+ String sequenceCol = dataDesc.getSequenceCol().get();
+ // check source sequence column is in parsedColumnExprList or Table
base schema
+ boolean hasSourceSequenceCol = false;
+ if (!sinkCols.isEmpty()) {
+ List<String> allCols = new
ArrayList<>(dataDesc.getFileFieldNames());
+ allCols.addAll(sinkCols);
+ for (String sinkCol : allCols) {
+ if (sinkCol.equals(sequenceCol)) {
+ hasSourceSequenceCol = true;
+ break;
+ }
+ }
+ }
+ List<Column> columns = olapTable.getBaseSchema();
+ for (Column column : columns) {
+ if (column.getName().equals(sequenceCol)) {
+ hasSourceSequenceCol = true;
+ break;
+ }
+ }
+ if (!hasSourceSequenceCol) {
+ throw new AnalysisException("There is no sequence column " +
sequenceCol + " in the " + olapTable.getName()
+ + " or the COLUMNS and SET clause");
+ } else {
+ sinkCols.add(Column.SEQUENCE_COL);
+ selectLists.add(new UnboundAlias(new UnboundSlot(sequenceCol),
Column.SEQUENCE_COL));
+ }
+ }
+
+ private UnboundTVFRelation getUnboundTVFRelation(Map<String, String>
properties) {
+ UnboundTVFRelation relation;
+ if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.S3) {
+ relation = new
UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+ S3TableValuedFunction.NAME, new Properties(properties));
+ } else if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.HDFS) {
+ relation = new
UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+ HdfsTableValuedFunction.NAME, new Properties(properties));
+ } else {
+ throw new UnsupportedOperationException("Unsupported load storage
type: "
+ + bulkStorageDesc.getStorageType());
+ }
+ return relation;
+ }
+
+ private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc
dataDesc) throws AnalysisException {
+ OlapTable targetTable;
+ TableIf table = RelationUtil.getTable(dataDesc.getNameParts(),
ctx.getEnv());
+ if (!(table instanceof OlapTable)) {
+ throw new AnalysisException("table must be olapTable in load
command");
+ }
+ targetTable = ((OlapTable) table);
+ return targetTable;
+ }
+
+ private static Map<String, String> getTvfProperties(BulkLoadDataDesc
dataDesc, BulkStorageDesc bulkStorageDesc) {
+ Map<String, String> tvfProperties = new
HashMap<>(bulkStorageDesc.getProperties());
+ String fileFormat =
dataDesc.getFormatDesc().getFileFormat().orElse("csv");
+ if ("csv".equalsIgnoreCase(fileFormat)) {
+ dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
+
tvfProperties.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR,
sep.getSeparator()));
+ dataDesc.getFormatDesc().getLineDelimiter().ifPresent(sep ->
+
tvfProperties.put(ExternalFileTableValuedFunction.LINE_DELIMITER,
sep.getSeparator()));
+ }
+ // TODO: resolve and put ExternalFileTableValuedFunction params
+ tvfProperties.put(ExternalFileTableValuedFunction.FORMAT, fileFormat);
+
+ List<String> filePaths = dataDesc.getFilePaths();
+ // TODO: support multi location by union
+ String listFilePath = filePaths.get(0);
+ if (bulkStorageDesc.getStorageType() ==
BulkStorageDesc.StorageType.S3) {
+ S3Properties.convertToStdProperties(tvfProperties);
+
tvfProperties.keySet().removeIf(S3Properties.Env.FS_KEYS::contains);
+ // TODO: check file path by s3 fs list status
+ tvfProperties.put(S3TableValuedFunction.S3_URI, listFilePath);
+ }
+
+ final Map<String, String> dataDescProps = dataDesc.getProperties();
+ if (dataDescProps != null) {
+ tvfProperties.putAll(dataDescProps);
+ }
+ List<String> columnsFromPath = dataDesc.getColumnsFromPath();
+ if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+
tvfProperties.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
+ String.join(",", columnsFromPath));
+ }
+ return tvfProperties;
+ }
+
+ private void executeInsertStmtPlan(ConnectContext ctx, StmtExecutor
executor, List<InsertIntoTableCommand> plans) {
+ try {
+ for (LogicalPlan logicalPlan : plans) {
+ ((Command) logicalPlan).run(ctx, executor);
+ }
+ } catch (QueryStateException e) {
+ ctx.setState(e.getQueryState());
+ throw new NereidsException("Command process failed", new
AnalysisException(e.getMessage(), e));
+ } catch (UserException e) {
+ // Return message to info client what happened.
+ ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+ throw new NereidsException("Command process failed", new
AnalysisException(e.getMessage(), e));
+ } catch (Exception e) {
+ // Maybe our bug
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getMessage());
+ throw new NereidsException("Command process failed.", new
AnalysisException(e.getMessage(), e));
+ }
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLoadCommand(this, context);
+ }
+}
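
Roughly, for one data description the command assembles a plan of this shape (a sketch of the intent, not exact operator output):

    UnboundOlapTableSink
      +-- LogicalProject        -- sink select list: SET mappings, delete sign, sequence column
            +-- LogicalFilter       -- WHERE conjuncts
                  +-- LogicalProject      -- source columns read from the TVF
                        +-- LogicalFilter       -- PRECEDING FILTER
                              +-- UnboundTVFRelation (s3/hdfs)

Once the legacy-planner fallback above is removed, each such plan is wrapped in an InsertIntoTableCommand and executed.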
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
new file mode 100644
index 0000000000..e0365ca098
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * used to describe the data info which is needed to import.
+ * The transform of columns should be added after the keyword named COLUMNS.
+ * The transform after the keyword named SET is the old way, which only supports the hadoop function.
+ * The old way of transform will be removed gradually.
+ * data_desc:
+ * DATA INFILE ('file_path', ...)
+ * [NEGATIVE]
+ * INTO TABLE tbl_name
+ * [PARTITION (p1, p2)]
+ * [COLUMNS TERMINATED BY separator]
+ * [FORMAT AS format]
+ * [(tmp_col1, tmp_col2, col3, ...)]
+ * [COLUMNS FROM PATH AS (col1, ...)]
+ * [SET (k1=f1(xx), k2=f2(xxx))]
+ * [where_clause]
+ * DATA FROM TABLE external_hive_tbl_name
+ * [NEGATIVE]
+ * INTO TABLE tbl_name
+ * [PARTITION (p1, p2)]
+ * [SET (k1=f1(xx), k2=f2(xxx))]
+ * [where_clause]
+ */
+public class BulkLoadDataDesc {
+
+    public static final String EXPECT_MERGE_DELETE_ON = "not support DELETE ON clause when merge type is not MERGE.";
+    public static final String EXPECT_DELETE_ON = "Expected DELETE ON clause when merge type is MERGE.";
+    private static final Logger LOG = LogManager.getLogger(BulkLoadDataDesc.class);
+
+ private final List<String> nameParts;
+ private final String tableName;
+ private final String dbName;
+ private final List<String> partitionNames;
+ private final List<String> filePaths;
+ private final boolean isNegative;
+ // column names in the path
+ private final List<String> columnsFromPath;
+ // save column mapping in SET(xxx = xxx) clause
+ private final Map<String, Expression> columnMappings;
+ private final Optional<Expression> precedingFilterExpr;
+ private final Optional<Expression> whereExpr;
+ private final LoadTask.MergeType mergeType;
+ private final String srcTableName;
+ // column names of source files
+ private final List<String> fileFieldNames;
+ private final Optional<String> sequenceCol;
+ private final FileFormatDesc formatDesc;
+ // Merged from fileFieldNames, columnsFromPath and columnMappingList
+ // ImportColumnDesc: column name to (expr or null)
+ private final Optional<Expression> deleteCondition;
+ private final Map<String, String> dataProperties;
+ private boolean isMysqlLoad = false;
+
+ /**
+ * bulk load desc
+ */
+ public BulkLoadDataDesc(List<String> fullTableName,
+ List<String> partitionNames,
+ List<String> filePaths,
+ List<String> columns,
+ List<String> columnsFromPath,
+ Map<String, Expression> columnMappings,
+ FileFormatDesc formatDesc,
+ boolean isNegative,
+ Optional<Expression> fileFilterExpr,
+ Optional<Expression> whereExpr,
+ LoadTask.MergeType mergeType,
+ Optional<Expression> deleteCondition,
+ Optional<String> sequenceColName,
+ Map<String, String> dataProperties) {
+ this.nameParts = Objects.requireNonNull(fullTableName, "nameParts
should not null");
+ this.dbName = Objects.requireNonNull(fullTableName.get(1), "dbName
should not null");
+ this.tableName = Objects.requireNonNull(fullTableName.get(2),
"tableName should not null");
+ this.partitionNames = Objects.requireNonNull(partitionNames,
"partitionNames should not null");
+ this.filePaths = Objects.requireNonNull(filePaths, "filePaths should
not null");
+ this.formatDesc = Objects.requireNonNull(formatDesc, "formatDesc
should not null");
+ this.fileFieldNames =
columnsNameToLowerCase(Objects.requireNonNull(columns, "columns should not
null"));
+ this.columnsFromPath = columnsNameToLowerCase(columnsFromPath);
+ this.isNegative = isNegative;
+ this.columnMappings = columnMappings;
+ this.precedingFilterExpr = fileFilterExpr;
+ this.whereExpr = whereExpr;
+ this.mergeType = mergeType;
+ // maybe from tvf or table
+ this.srcTableName = null;
+ this.deleteCondition = deleteCondition;
+ this.sequenceCol = sequenceColName;
+ this.dataProperties = dataProperties;
+ }
+
+ /**
+ * bulk load file format desc
+ */
+ public static class FileFormatDesc {
+ private final Separator lineDelimiter;
+ private final Separator columnSeparator;
+ private final String fileFormat;
+
+ public FileFormatDesc(Optional<String> fileFormat) {
+ this(Optional.empty(), Optional.empty(), fileFormat);
+ }
+
+ public FileFormatDesc(Optional<String> lineDelimiter, Optional<String>
columnSeparator) {
+ this(lineDelimiter, columnSeparator, Optional.empty());
+ }
+
+ /**
+ * build bulk load format desc and check valid
+ * @param lineDelimiter text format line delimiter
+ * @param columnSeparator text format column separator
+ * @param fileFormat file format
+ */
+ public FileFormatDesc(Optional<String> lineDelimiter, Optional<String>
columnSeparator,
+ Optional<String> fileFormat) {
+ this.lineDelimiter = new Separator(lineDelimiter.orElse(null));
+ this.columnSeparator = new Separator(columnSeparator.orElse(null));
+ try {
+ if
(!StringUtils.isEmpty(this.lineDelimiter.getOriSeparator())) {
+ this.lineDelimiter.analyze();
+ }
+ if
(!StringUtils.isEmpty(this.columnSeparator.getOriSeparator())) {
+ this.columnSeparator.analyze();
+ }
+ } catch (AnalysisException e) {
+ throw new RuntimeException("Fail to parse separator. ", e);
+ }
+ this.fileFormat = fileFormat.orElse(null);
+ }
+
+ public Optional<Separator> getLineDelimiter() {
+ if (lineDelimiter == null || lineDelimiter.getOriSeparator() ==
null) {
+ return Optional.empty();
+ }
+ return Optional.of(lineDelimiter);
+ }
+
+ public Optional<Separator> getColumnSeparator() {
+ if (columnSeparator == null || columnSeparator.getOriSeparator()
== null) {
+ return Optional.empty();
+ }
+ return Optional.of(columnSeparator);
+ }
+
+ public Optional<String> getFileFormat() {
+ return Optional.ofNullable(fileFormat);
+ }
+ }
+
+ public List<String> getNameParts() {
+ return nameParts;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public List<String> getPartitionNames() {
+ return partitionNames;
+ }
+
+ public FileFormatDesc getFormatDesc() {
+ return formatDesc;
+ }
+
+ public List<String> getFilePaths() {
+ return filePaths;
+ }
+
+ public List<String> getColumnsFromPath() {
+ return columnsFromPath;
+ }
+
+ public Map<String, Expression> getColumnMappings() {
+ return columnMappings;
+ }
+
+ public Optional<Expression> getPrecedingFilterExpr() {
+ return precedingFilterExpr;
+ }
+
+ public Optional<Expression> getWhereExpr() {
+ return whereExpr;
+ }
+
+ public List<String> getFileFieldNames() {
+ return fileFieldNames;
+ }
+
+ public Optional<String> getSequenceCol() {
+ if (sequenceCol.isPresent() && StringUtils.isBlank(sequenceCol.get()))
{
+ return Optional.empty();
+ }
+ return sequenceCol;
+ }
+
+ public Optional<Expression> getDeleteCondition() {
+ return deleteCondition;
+ }
+
+ public LoadTask.MergeType getMergeType() {
+ return mergeType;
+ }
+
+ public Map<String, String> getProperties() {
+ return dataProperties;
+ }
+
+ // Change all the columns name to lower case, because Doris column is
case-insensitive.
+ private List<String> columnsNameToLowerCase(List<String> columns) {
+ if (columns == null || columns.isEmpty() ||
"json".equals(this.formatDesc.fileFormat)) {
+ return columns;
+ }
+ List<String> lowerCaseColumns = new ArrayList<>();
+ for (int i = 0; i < columns.size(); i++) {
+ String column = columns.get(i);
+ lowerCaseColumns.add(i, column.toLowerCase());
+ }
+ return lowerCaseColumns;
+ }
+
+ @Override
+ public String toString() {
+ return toSql();
+ }
+
+ /**
+ * print data desc load info
+ * @return bulk load sql
+ */
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ if (isMysqlLoad) {
+ sb.append("DATA ").append(isClientLocal() ? "LOCAL " : "");
+ sb.append("INFILE '").append(filePaths.get(0)).append("'");
+ } else if (isLoadFromTable()) {
+ sb.append(mergeType.toString());
+ sb.append(" DATA FROM TABLE ").append(srcTableName);
+ } else {
+ sb.append(mergeType.toString());
+ sb.append(" DATA INFILE (");
+ Joiner.on(", ").appendTo(sb, filePaths.stream()
+ .map(s -> "'" + s +
"'").collect(Collectors.toList())).append(")");
+ }
+ if (isNegative) {
+ sb.append(" NEGATIVE");
+ }
+ sb.append(" INTO TABLE ");
+ sb.append(isMysqlLoad ? ClusterNamespace.getNameFromFullName(dbName) +
"." + tableName : tableName);
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ sb.append(" (");
+ Joiner.on(", ").appendTo(sb, partitionNames).append(")");
+ }
+ if (formatDesc.columnSeparator != null) {
+ sb.append(" COLUMNS TERMINATED BY
").append(formatDesc.columnSeparator.toSql());
+ }
+ if (formatDesc.lineDelimiter != null && isMysqlLoad) {
+ sb.append(" LINES TERMINATED BY
").append(formatDesc.lineDelimiter.toSql());
+ }
+ if (formatDesc.fileFormat != null && !formatDesc.fileFormat.isEmpty())
{
+ sb.append(" FORMAT AS '" + formatDesc.fileFormat + "'");
+ }
+ if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
+ sb.append(" (");
+ Joiner.on(", ").appendTo(sb, fileFieldNames).append(")");
+ }
+ if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+ sb.append(" COLUMNS FROM PATH AS (");
+ Joiner.on(", ").appendTo(sb, columnsFromPath).append(")");
+ }
+ if (columnMappings != null && !columnMappings.isEmpty()) {
+ sb.append(" SET (");
+ Joiner.on(", ").appendTo(sb, columnMappings.entrySet().stream()
+ .map(e -> e.getKey() + "=" +
e.getValue().toSql()).collect(Collectors.toList())).append(")");
+ }
+ whereExpr.ifPresent(e -> sb.append(" WHERE ").append(e.toSql()));
+ deleteCondition.ifPresent(e -> {
+ if (mergeType == LoadTask.MergeType.MERGE) {
+ sb.append(" DELETE ON ").append(e.toSql());
+ }
+ });
+ return sb.toString();
+ }
+
+ private boolean isLoadFromTable() {
+ return false;
+ }
+
+ private boolean isClientLocal() {
+ return false;
+ }
+}
+
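
For reference, toSql() for a typical S3 data desc renders output along these lines (values invented):

    APPEND DATA INFILE ('s3://bucket/f.csv') INTO TABLE t1 COLUMNS TERMINATED BY ',' (k1, k2) SET (v1=(k1 + 1)) WHERE (k2 > 0)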
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
new file mode 100644
index 0000000000..38543c069d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Broker descriptor
+ * Broker example:
+ * WITH S3/HDFS
+ * (
+ * "username" = "user0",
+ * "password" = "password0"
+ * )
+ */
+public class BulkStorageDesc implements Writable {
+ @SerializedName(value = "storageType")
+ protected StorageType storageType;
+ @SerializedName(value = "properties")
+ protected Map<String, String> properties;
+ @SerializedName(value = "name")
+ private String name;
+
+ /**
+ * Bulk Storage Type
+ */
+ public enum StorageType {
+ BROKER,
+ S3,
+ HDFS,
+ LOCAL;
+
+ }
+
+ /**
+ * BulkStorageDesc
+ * @param name bulk load name
+ * @param properties properties
+ */
+ public BulkStorageDesc(String name, Map<String, String> properties) {
+ this(name, StorageType.BROKER, properties);
+ }
+
+ /**
+ * BulkStorageDesc
+ * @param name bulk load name
+ * @param type bulk load type
+ * @param properties properties
+ */
+ public BulkStorageDesc(String name, StorageType type, Map<String, String>
properties) {
+ this.name = name;
+ this.properties = properties;
+ if (this.properties == null) {
+ this.properties = Maps.newHashMap();
+ }
+ this.storageType = type;
+
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
+ }
+
+ public StorageType getStorageType() {
+ return storageType;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static BulkStorageDesc read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, BulkStorageDesc.class);
+ }
+
+ /**
+ * bulk load to sql string
+ * @return bulk load sql
+ */
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ if (storageType == StorageType.BROKER) {
+ sb.append("WITH BROKER ").append(name);
+ } else {
+ sb.append("WITH ").append(storageType.name());
+ }
+ if (properties != null && !properties.isEmpty()) {
+ PrintableMap<String, String> printableMap = new
PrintableMap<>(properties, " = ", true, false, true);
+ sb.append(" (").append(printableMap).append(")");
+ }
+ return sb.toString();
+ }
+}
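
For example (broker name and property invented), toSql() for a broker desc yields something like:

    WITH BROKER broker0 ("username" = "user0")

and for an S3 desc:

    WITH S3 ("s3.endpoint" = "endpoint0")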
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index e91e846e2f..c055da5735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
/** CommandVisitor. */
@@ -52,6 +53,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(deleteCommand, context);
}
+ default R visitLoadCommand(LoadCommand loadCommand, C context) {
+ return visitCommand(loadCommand, context);
+ }
+
default R visitExportCommand(ExportCommand exportCommand, C context) {
return visitCommand(exportCommand, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 4eeadffd09..6acc276a25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -103,7 +103,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
protected static final String FUZZY_PARSE = "fuzzy_parse";
protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
protected static final String SKIP_LINES = "skip_lines";
- protected static final String CSV_SCHEMA = "csv_schema";
+ public static final String CSV_SCHEMA = "csv_schema";
protected static final String COMPRESS_TYPE = "compress_type";
public static final String PATH_PARTITION_KEYS = "path_partition_keys";
// decimal(p,s)
@@ -380,12 +380,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
@Override
public List<Column> getTableColumns() throws AnalysisException {
- if (FeConstants.runningUnitTest) {
- return Lists.newArrayList();
- }
if (!csvSchema.isEmpty()) {
return csvSchema;
}
+ if (FeConstants.runningUnitTest) {
+ Object mockedUtObj = FeConstants.unitTestConstant;
+ if (mockedUtObj instanceof List) {
+ return ((List<Column>) mockedUtObj);
+ }
+ return new ArrayList<>();
+ }
if (this.columns != null) {
return columns;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
new file mode 100644
index 0000000000..e5a3e66932
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
@@ -0,0 +1,658 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.stats.ExpressionEstimation;
+import org.apache.doris.nereids.trees.expressions.Add;
+import org.apache.doris.nereids.trees.expressions.BinaryArithmetic;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
+import org.apache.doris.utframe.TestWithFeService;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class BulkLoadDataDescTest extends TestWithFeService {
+
+ private List<String> sinkCols1 = new ArrayList<>();
+ private List<String> sinkCols2 = new ArrayList<>();
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ connectContext.getState().setNereids(true);
+ connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
+ connectContext.getSessionVariable().enableNereidsTimeout = false;
+ connectContext.getSessionVariable().enableNereidsDML = true;
+ FeConstants.runningUnitTest = true;
+
+ createDatabase("nereids_load");
+ useDatabase("nereids_load");
+ String createTableSql = "CREATE TABLE `customer` (\n"
+ + " `custkey` int(11) NOT NULL,\n"
+ + " `c_name` varchar(25) NOT NULL,\n"
+ + " `c_address` varchar(40) NOT NULL,\n"
+ + " `c_nationkey` int(11) NOT NULL,\n"
+ + " `c_phone` varchar(15) NOT NULL,\n"
+ + " `c_acctbal` DECIMAL(15, 2) NOT NULL,\n"
+ + " `c_mktsegment` varchar(10) NOT NULL,\n"
+ + " `c_comment` varchar(117) NOT NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "UNIQUE KEY(`custkey`)\n"
+ + "COMMENT 'OLAP'\n"
+ + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"function_column.sequence_col\" = \"c_nationkey\","
+ + "\"storage_format\" = \"V2\"\n"
+ + ");";
+ createTable(createTableSql);
+ sinkCols1.add("custkey");
+ sinkCols1.add("c_name");
+ sinkCols1.add("c_address");
+ sinkCols1.add("c_nationkey");
+ sinkCols1.add("c_phone");
+ sinkCols1.add("c_acctbal");
+ sinkCols1.add("c_mktsegment");
+ sinkCols1.add("c_comment");
+
+ String createTableSql2 = "CREATE TABLE `customer_dup` (\n"
+ + " `custkey` int(11) NOT NULL,\n"
+ + " `c_name` varchar(25) NOT NULL,\n"
+ + " `address` varchar(40) NOT NULL,\n"
+ + " `c_nationkey` int(11) NOT NULL,\n"
+ + " `c_phone` varchar(15) NOT NULL,\n"
+ + " `c_acctbal` DECIMAL(15, 2) NOT NULL,\n"
+ + " `c_mktsegment` varchar(10) NOT NULL,\n"
+ + " `c_comment` varchar(117) NOT NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(`custkey`,`c_name`)\n"
+ + "COMMENT 'OLAP'\n"
+ + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n"
+ + "PROPERTIES (\n"
+ + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ + "\"storage_format\" = \"V2\"\n"
+ + ");";
+ createTable(createTableSql2);
+ sinkCols2.add("custkey");
+ sinkCols2.add("c_name");
+ sinkCols2.add("address");
+ sinkCols2.add("c_nationkey");
+ sinkCols2.add("c_phone");
+ sinkCols2.add("c_acctbal");
+ sinkCols2.add("c_mktsegment");
+ sinkCols2.add("c_comment");
+
+ }
+
+ @Test
+ public void testParseLoadStmt() throws Exception {
+ String loadSql1 = "LOAD LABEL customer_j23( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=case when c_custkey=-8 then -3 when
c_custkey=-1 then 11 else c_custkey end ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+
+ List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1);
+ Assertions.assertFalse(statements.isEmpty());
+
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
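+ // the table's "function_column.sequence_col" property adds the hidden sequence column to the sink columns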
+ expectedSinkColumns.add(Column.SEQUENCE_COL);
+
+ CaseWhen caseWhen = new CaseWhen(new ArrayList<WhenClause>() {
+ {
+ add(new WhenClause(
+ new EqualTo(
+ new UnboundSlot("c_custkey"),
+ new TinyIntLiteral((byte) -8)),
+ new TinyIntLiteral((byte) -3)));
+ add(new WhenClause(
+ new EqualTo(
+ new UnboundSlot("c_custkey"),
+ new TinyIntLiteral((byte) -1)),
+ new TinyIntLiteral((byte) 11)));
+ }
+ }, new UnboundSlot("c_custkey"));
+
+ List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+ {
+ add(new UnboundAlias(caseWhen, "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ }
+ };
+ List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+ {
+ add(new GreaterThan(caseWhen, new IntegerLiteral(100)));
+ }
+ };
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+ }
+
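+ // Expected plan shape, per the assertions below:
+ // UnboundOlapTableSink -> LogicalProject -> LogicalFilter (WHERE)
+ // -> LogicalProject -> [LogicalFilter (PRECEDING FILTER), only when expectedPreFilter]
+ // -> LogicalCheckPolicy -> UnboundTVFRelation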
+ private void assertInsertIntoPlan(List<Pair<LogicalPlan, StatementContext>> statements,
+ List<String> expectedSinkColumns,
+ List<NamedExpression> expectedProjects,
+ List<Expression> expectedConjuncts,
+ boolean expectedPreFilter) throws AnalysisException {
+ Assertions.assertTrue(statements.get(0).first instanceof LoadCommand);
+ List<LogicalPlan> plans = ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext);
+ Assertions.assertTrue(plans.get(0) instanceof UnboundOlapTableSink);
+ List<String> colNames = ((UnboundOlapTableSink<?>) plans.get(0)).getColNames();
+ Assertions.assertEquals(colNames.size(), expectedSinkColumns.size());
+ for (String sinkCol : expectedSinkColumns) {
+ Assertions.assertTrue(colNames.contains(sinkCol));
+ }
+ Assertions.assertTrue(plans.get(0).child(0) instanceof LogicalProject);
+ LogicalProject<?> project = ((LogicalProject<?>) plans.get(0).child(0));
+ Set<String> projects = project.getProjects().stream()
+ .map(Object::toString)
+ .collect(Collectors.toSet());
+ for (NamedExpression namedExpression : expectedProjects) {
+ Assertions.assertTrue(projects.contains(namedExpression.toString()));
+ }
+ Assertions.assertTrue(project.child(0) instanceof LogicalFilter);
+ LogicalFilter<?> filter = ((LogicalFilter<?>) project.child(0));
+ Set<String> filterConjuncts = filter.getConjuncts().stream()
+ .map(Object::toString)
+ .collect(Collectors.toSet());
+ for (Expression expectedConjunct : expectedConjuncts) {
+ Assertions.assertTrue(filterConjuncts.contains(expectedConjunct.toString()));
+ }
+
+ Assertions.assertTrue(filter.child(0) instanceof LogicalProject);
+ LogicalProject<?> tvfProject = (LogicalProject<?>) filter.child(0);
+ if (expectedPreFilter) {
+ Assertions.assertTrue(tvfProject.child(0) instanceof LogicalFilter);
+ LogicalFilter<?> tvfFilter = (LogicalFilter<?>) tvfProject.child(0);
+ Assertions.assertTrue(tvfFilter.child(0) instanceof LogicalCheckPolicy);
+ Assertions.assertTrue(tvfFilter.child(0).child(0) instanceof UnboundTVFRelation);
+ } else {
+ Assertions.assertTrue(tvfProject.child(0) instanceof LogicalCheckPolicy);
+ Assertions.assertTrue(tvfProject.child(0).child(0) instanceof UnboundTVFRelation);
+ }
+ }
+
+ @Test
+ public void testParseLoadStmtPartitions() throws Exception {
+ String loadSql1 = "LOAD LABEL customer_j23( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name, dt) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment, dt) "
+ + " COLUMNS FROM PATH AS (dt)"
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ List<Pair<LogicalPlan, StatementContext>> statements = new
NereidsParser().parseMultiple(loadSql1);
+ Assertions.assertFalse(statements.isEmpty());
+
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+ expectedSinkColumns.add(Column.SEQUENCE_COL);
+ expectedSinkColumns.add("dt");
+ List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+ {
+ add(new UnboundAlias(new UnboundSlot("c_custkey"), "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+ add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ add(new UnboundSlot("dt"));
+ }
+ };
+ List<Expression> expectedConjuncts = new ArrayList<>();
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false);
+ }
+
+ @Test
+ public void testParseLoadStmtColumnFromPath() throws Exception {
+ String loadSql1 = "LOAD LABEL customer_j23( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name, dt) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment, dt) "
+ + " COLUMNS FROM PATH AS (pt) "
+ + " SET ( custkey=c_custkey+1 ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ List<Pair<LogicalPlan, StatementContext>> statements = new
NereidsParser().parseMultiple(loadSql1);
+ Assertions.assertFalse(statements.isEmpty());
+
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+ expectedSinkColumns.add(Column.SEQUENCE_COL);
+ expectedSinkColumns.add("pt");
+ List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+ {
+ add(new UnboundAlias(new Add(
+ new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+ add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ add(new UnboundSlot("pt"));
+ }
+ };
+ List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+ {
+ add(new GreaterThan(new Add(new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)),
+ new IntegerLiteral(100)));
+ }
+ };
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+ }
+
+ @Test
+ public void testParseLoadStmtNoColumn() throws Exception {
+ String loadSql1 = "LOAD LABEL customer_no_col( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " FORMAT AS CSV"
+ + " ORDER BY custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+
+ List<Pair<LogicalPlan, StatementContext>> statements = new
NereidsParser().parseMultiple(loadSql1);
+ Assertions.assertFalse(statements.isEmpty());
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+ expectedSinkColumns.add(Column.SEQUENCE_COL);
+ List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+ {
+ // when no columns are specified, the TVF columns default to the OLAP table columns
+ add(new UnboundAlias(new UnboundSlot("custkey"), "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+ add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ }
+ };
+ List<Expression> expectedConjuncts = new ArrayList<>();
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false);
+
+ // csv_schema format: name:type pairs separated by ';', e.g. k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)
+ String loadSql2 = "LOAD LABEL customer_no_col2( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " FORMAT AS CSV"
+ + " ORDER BY custkey "
+ + " PROPERTIES( "
+ + " \"csv_schema\" = \""
+ + " custkey:INT;"
+ + " c_name:STRING;"
+ + " c_address:STRING;"
+ + " c_nationkey:INT;"
+ + " c_phone:STRING;"
+ + " c_acctbal:DECIMAL(15, 2);"
+ + " c_mktsegment:STRING;"
+ + " c_comment:STRING;\""
+ + " ) "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+
+ List<Pair<LogicalPlan, StatementContext>> statements2 = new
NereidsParser().parseMultiple(loadSql2);
+ Assertions.assertFalse(statements2.isEmpty());
+ List<String> expectedSinkColumns2 = new ArrayList<>(sinkCols1);
+ expectedSinkColumns2.add(Column.SEQUENCE_COL);
+ List<NamedExpression> expectedProjects2 = new ArrayList<NamedExpression>() {
+ {
+ add(new UnboundAlias(new UnboundSlot("custkey"), "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+ add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ }
+ };
+ List<Expression> expectedConjuncts2 = new ArrayList<>();
+ assertInsertIntoPlan(statements2, expectedSinkColumns2, expectedProjects2, expectedConjuncts2, false);
+ }
+
+ @Test
+ public void testParseLoadStmtWithParquetMappingFilter() throws Exception {
+ String loadSql1 = "LOAD LABEL customer_dup_mapping( "
+ + " DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer_dup"
+ + " FORMAT AS PARQUET"
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=c_custkey+1, address=c_address+'_base')
"
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey = 100"
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ List<Pair<LogicalPlan, StatementContext>> statements = new
NereidsParser().parseMultiple(loadSql1);
+ Assertions.assertFalse(statements.isEmpty());
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols2);
+ List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+ {
+ add(new UnboundAlias(new Add(
+ new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey"));
+ add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+ add(new UnboundAlias(new Add(
+ new UnboundSlot("c_address"), new StringLiteral("_base")), "address"));
+ add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+ add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+ add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+ add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+ add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+ }
+ };
+ List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+ {
+ add(new EqualTo(new Add(
+ new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), new IntegerLiteral(100)));
+ }
+ };
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+ }
+
+ @Test
+ public void testParseLoadStmtWithDeleteOn() throws Exception {
+ String loadSqlWithDeleteOnErr1 = "LOAD LABEL customer_label1( "
+ + " APPEND DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " COLUMNS TERMINATED BY \"|\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=c_custkey+1 ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " DELETE ON c_custkey < 120 "
+ + " ORDER BY custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ try {
+ List<Pair<LogicalPlan, StatementContext>> statements =
+ new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr1);
+ Assertions.assertFalse(statements.isEmpty());
+ Assertions.assertTrue(statements.get(0).first instanceof
LoadCommand);
+ ((LoadCommand)
statements.get(0).first).parseToInsertIntoPlan(connectContext);
+ } catch (AnalysisException e) {
+
Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON));
+ }
+
+ String loadSqlWithDeleteOnErr2 = "LOAD LABEL customer_label1( "
+ + " MERGE DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " COLUMNS TERMINATED BY \"|\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=c_custkey+1 ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " ORDER BY custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ try {
+ List<Pair<LogicalPlan, StatementContext>> statements =
+ new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr2);
+ Assertions.assertFalse(statements.isEmpty());
+ Assertions.assertTrue(statements.get(0).first instanceof
LoadCommand);
+ ((LoadCommand)
statements.get(0).first).parseToInsertIntoPlan(connectContext);
+ } catch (AnalysisException e) {
+
Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_DELETE_ON));
+ }
+
+ String loadSqlWithDeleteOnOk = "LOAD LABEL customer_label2( "
+ + " MERGE DATA INFILE(\"s3://bucket/customer\") "
+ + " INTO TABLE customer"
+ + " COLUMNS TERMINATED BY \"|\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=c_custkey+1 ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " DELETE ON custkey < 120 "
+ + " ORDER BY custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+
+ FeConstants.unitTestConstant = new ArrayList<Column>() {
+ {
+ add(new Column("c_custkey", PrimitiveType.INT, true));
+ add(new Column("c_name", PrimitiveType.VARCHAR, true));
+ add(new Column("c_address", PrimitiveType.VARCHAR, true));
+ add(new Column("c_nationkey", PrimitiveType.INT, true));
+ add(new Column("c_phone", PrimitiveType.VARCHAR, true));
+ add(new Column("c_acctbal", PrimitiveType.DECIMALV2, true));
+ add(new Column("c_mktsegment", PrimitiveType.VARCHAR, true));
+ add(new Column("c_comment", PrimitiveType.VARCHAR, true));
+ }
+ };
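+ // Stub stats estimation for Cast and BinaryArithmetic to return UNKNOWN,
+ // presumably so stats derivation does not fail on the unbound expressions
+ // in this unit test.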
+ new MockUp<ExpressionEstimation>(ExpressionEstimation.class) {
+ @Mock
+ public ColumnStatistic visitCast(Cast cast, Statistics context) {
+ return ColumnStatistic.UNKNOWN;
+ }
+
+ @Mock
+ public ColumnStatistic visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, Statistics context) {
+ return ColumnStatistic.UNKNOWN;
+ }
+ };
+
+ List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSqlWithDeleteOnOk);
+ Assertions.assertFalse(statements.isEmpty());
+
+ List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+ expectedSinkColumns.add(Column.SEQUENCE_COL);
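+ // the MERGE ... DELETE ON clause adds the hidden delete-sign column to the sink columns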
+ expectedSinkColumns.add(Column.DELETE_SIGN);
+ List<NamedExpression> expectedProjects = new ArrayList<>();
+ List<Expression> expectedConjuncts = new ArrayList<>();
+ assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+ // new StmtExecutor(connectContext, loadSqlWithDeleteOnOk).execute();
+ }
+
+ @Test
+ public void testParseLoadStmtPatternPath() throws Exception {
+ String path1 = "part*";
+ String path2 = "*/part_000";
+ String path3 = "*part_000*";
+ String path4 = "*/*part_000*";
+ String loadTemplate = "LOAD LABEL customer_j23( "
+ + " DATA INFILE(\"s3://bucket/customer/PATTERN\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name, dt) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment, dt) "
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadTemplate.replace("PATTERN",
path1)).isEmpty());
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadTemplate.replace("PATTERN",
path2)).isEmpty());
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadTemplate.replace("PATTERN",
path3)).isEmpty());
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadTemplate.replace("PATTERN",
path4)).isEmpty());
+ }
+
+ @Test
+ public void testParseLoadStmtMultiLocations() throws Exception {
+ String loadMultiLocations = "LOAD LABEL customer_j23( "
+ + " DATA INFILE("
+ + " \"s3://bucket/customer/path1\", "
+ + " \"s3://bucket/customer/path2\", "
+ + " \"s3://bucket/customer/path3\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name, dt) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment, dt) "
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadMultiLocations).isEmpty());
+ }
+
+ @Test
+ public void testParseLoadStmtMultiBulkDesc() throws Exception {
+ String loadMultiLocations = "LOAD LABEL customer_j23( "
+ + " DATA INFILE("
+ + " \"s3://bucket/customer/path1\", "
+ + " \"s3://bucket/customer/path2\", "
+ + " \"s3://bucket/customer/path3\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " ORDER BY c_custkey "
+ + " ,"
+ + " DATA INFILE(\"s3://bucket/customer/par_a*\") "
+ + " INTO TABLE customer_dup"
+ + " FORMAT AS PARQUET"
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment) "
+ + " SET ( custkey=c_custkey+1, address=c_address+'_base')
"
+ + " WHERE custkey < 50"
+ + " ,"
+ + " DATA INFILE("
+ + " \"s3://bucket/customer/p\") "
+ + " INTO TABLE customer"
+ + " PARTITION (c_name, dt) "
+ + " COLUMNS TERMINATED BY \"|\""
+ + " LINES TERMINATED BY \"\n\""
+ + " (c_custkey, c_name, c_address, c_nationkey, c_phone,
c_acctbal, c_mktsegment, c_comment, dt)"
+ + " SET ( custkey=c_custkey+1 ) "
+ + " PRECEDING FILTER c_nationkey=\"CHINA\" "
+ + " WHERE custkey > 100"
+ + " ORDER BY c_custkey "
+ + " ) "
+ + " WITH S3( "
+ + " \"s3.access_key\" = \"AK\", "
+ + " \"s3.secret_key\" = \"SK\", "
+ + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", "
+ + " \"s3.region\" = \"ap-beijing\") "
+ + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT
\"test\";";
+ Assertions.assertFalse(new NereidsParser()
+ .parseMultiple(loadMultiLocations).isEmpty());
+ }
+}
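
The flow these tests exercise, as a minimal sketch (not part of the patch;
`loadSql` stands for any LOAD statement accepted by the new grammar and
`connectContext` for the prepared test session, as in BulkLoadDataDescTest):

    NereidsParser parser = new NereidsParser();
    List<Pair<LogicalPlan, StatementContext>> stmts = parser.parseMultiple(loadSql);
    LoadCommand load = (LoadCommand) stmts.get(0).first;
    // each bulk data description is converted to one insert-into plan
    // rooted at UnboundOlapTableSink over a table-valued-function scan
    List<LogicalPlan> insertPlans = load.parseToInsertIntoPlan(connectContext);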
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
index 64a0dc5cad..a6eb67b061 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
@@ -32,15 +32,16 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.constants.S3Properties.Env;
import org.apache.doris.load.loadv2.LoadTask.MergeType;
import org.apache.doris.tablefunction.S3TableValuedFunction;
+import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.io.StringReader;
import java.util.Collections;
@@ -48,7 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class S3TvfLoadStmtTest {
+public class S3TvfLoadStmtTest extends TestWithFeService {
private static final String ACCESS_KEY_VALUE = "ak";
@@ -70,7 +71,7 @@ public class S3TvfLoadStmtTest {
private Set<String> colNames;
- @Before
+ @BeforeAll
public void setUp() throws AnalysisException {
FeConstants.runningUnitTest = true;
@@ -101,7 +102,7 @@ public class S3TvfLoadStmtTest {
Maps.newHashMap(), "comment");
final SelectStmt selectStmt = (SelectStmt) s3TvfLoadStmt.getQueryStmt();
final Expr whereClause = Deencapsulation.getField(selectStmt, "whereClause");
- Assert.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less));
+ Assertions.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less));
}
@Test
@@ -115,15 +116,15 @@ public class S3TvfLoadStmtTest {
final TableRef tvfRef = Deencapsulation.invoke(S3TvfLoadStmt.class, "buildTvfRef",
dataDescription, brokerDesc);
- Assert.assertTrue(tvfRef instanceof TableValuedFunctionRef);
+ Assertions.assertTrue(tvfRef instanceof TableValuedFunctionRef);
final S3TableValuedFunction tableFunction
= (S3TableValuedFunction) ((TableValuedFunctionRef) tvfRef).getTableFunction();
final Map<String, String> locationProperties = tableFunction.getLocationProperties();
- Assert.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE);
- Assert.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE);
- Assert.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE);
- Assert.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE);
- Assert.assertEquals(tableFunction.getFilePath(), DATA_URI);
+ Assertions.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE);
+ Assertions.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE);
+ Assertions.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE);
+ Assertions.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE);
+ Assertions.assertEquals(tableFunction.getFilePath(), DATA_URI);
}
@Injectable
@@ -176,13 +177,13 @@ public class S3TvfLoadStmtTest {
Deencapsulation.setField(s3TvfLoadStmt, "functionGenTableColNames",
Sets.newHashSet("c1", "c2", "c3"));
Deencapsulation.invoke(s3TvfLoadStmt, "rewriteExpr", columnsDescList);
- Assert.assertEquals(columnsDescList.size(), 5);
+ Assertions.assertEquals(columnsDescList.size(), 5);
final String orig4 = "((upper(`c1`) + 1) + 1)";
- Assert.assertEquals(orig4, columnsDescList.get(4).getExpr().toString());
+ Assertions.assertEquals(orig4, columnsDescList.get(4).getExpr().toString());
final List<ImportColumnDesc> filterColumns = Deencapsulation.invoke(s3TvfLoadStmt,
"filterColumns", columnsDescList);
- Assert.assertEquals(filterColumns.size(), 4);
+ Assertions.assertEquals(filterColumns.size(), 4);
}
private static DataDescription buildDataDesc(Iterable<String> columns,
Expr fileFilter, Expr wherePredicate,
@@ -210,11 +211,12 @@ public class S3TvfLoadStmtTest {
private static List<ImportColumnDesc> getColumnsDescList(String columns) throws Exception {
String columnsSQL = "COLUMNS (" + columns + ")";
return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt(
- new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns();
+ new org.apache.doris.analysis.SqlParser(
+ new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL))))).getColumns();
}
private static List<Column> getBaseSchema() {
- List<Column> columns = com.google.common.collect.Lists.newArrayList();
+ List<Column> columns = Lists.newArrayList();
Column c1 = new Column("c1", PrimitiveType.BIGINT);
c1.setIsKey(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]