This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b5227af6a1 [Feature](partitions) Support auto partition FE part
(#24079)
b5227af6a1 is described below
commit b5227af6a102f3837eab6740f8a010c44e1ab303
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Sep 11 17:48:19 2023 +0800
[Feature](partitions) Support auto partition FE part (#24079)
---
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
fe/fe-core/src/main/cup/sql_parser.cup | 23 ++-
.../apache/doris/analysis/ListPartitionDesc.java | 21 ++-
.../org/apache/doris/analysis/PartitionDesc.java | 81 ++++++++-
.../apache/doris/analysis/PartitionExprUtil.java | 193 +++++++++++++++++++++
.../apache/doris/analysis/RangePartitionDesc.java | 19 +-
.../apache/doris/catalog/ListPartitionInfo.java | 41 ++++-
.../java/org/apache/doris/catalog/OlapTable.java | 2 +-
.../org/apache/doris/catalog/PartitionInfo.java | 36 +++-
.../apache/doris/catalog/RangePartitionInfo.java | 41 ++++-
.../org/apache/doris/planner/OlapTableSink.java | 31 +++-
.../apache/doris/service/FrontendServiceImpl.java | 135 ++++++++++++++
.../apache/doris/catalog/TruncateTableTest.java | 13 +-
.../common/util/DynamicPartitionUtilTest.java | 2 +-
.../doris/service/FrontendServiceImplTest.java | 159 +++++++++++++++++
gensrc/proto/internal_service.proto | 1 +
gensrc/thrift/Descriptors.thrift | 4 +
gensrc/thrift/FrontendService.thrift | 17 ++
.../suites/ddl_p0/test_truncate_table.groovy | 2 +-
19 files changed, 788 insertions(+), 37 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 1ae98bedcf..e33ef112e2 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -68,9 +68,11 @@ public final class FeMetaVersion {
public static final int VERSION_123 = 123;
// For auto-increment column
public static final int VERSION_124 = 124;
+ // For write/read auto create partition expr
+ public static final int VERSION_125 = 125;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_124;
+ public static final int VERSION_CURRENT = VERSION_125;
// all logs meta version should >= the minimum version, so that we could
remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index ede6dd7ca6..94fcd2260f 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -3228,7 +3228,28 @@ opt_partition ::=
| KW_PARTITION KW_BY KW_LIST LPAREN ident_list:columns RPAREN
LPAREN opt_all_partition_desc_list:list RPAREN
{:
- RESULT = new ListPartitionDesc(columns, list);
+ RESULT = new ListPartitionDesc(columns, list);
+ :}
+ /* expr range partition */
+ | KW_AUTO KW_PARTITION KW_BY KW_RANGE function_call_expr:fnExpr
+ LPAREN opt_all_partition_desc_list:list RPAREN
+ {:
+ ArrayList<Expr> exprs = new ArrayList<Expr>();
+ exprs.add(fnExpr);
+ RESULT = RangePartitionDesc.createRangePartitionDesc(exprs, list);
+ :}
+ /* expr list partition */
+ | KW_AUTO KW_PARTITION KW_BY KW_LIST LPAREN expr_list:exprs RPAREN
+ LPAREN opt_all_partition_desc_list:list RPAREN
+ {:
+ RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
+ :}
+ | KW_AUTO KW_PARTITION KW_BY KW_LIST function_call_expr:fnExpr
+ LPAREN opt_all_partition_desc_list:list RPAREN
+ {:
+ ArrayList<Expr> exprs = new ArrayList<Expr>();
+ exprs.add(fnExpr);
+ RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
:}
;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java
index d0b6bebf05..0ca97ca960 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ListPartitionDesc.java
@@ -36,6 +36,24 @@ public class ListPartitionDesc extends PartitionDesc {
List<AllPartitionDesc> allPartitionDescs) throws
AnalysisException {
super(partitionColNames, allPartitionDescs);
type = PartitionType.LIST;
+ this.isAutoCreatePartitions = false;
+ }
+
+ public ListPartitionDesc(ArrayList<Expr> exprs, List<String>
partitionColNames,
+ List<AllPartitionDesc> allPartitionDescs) throws AnalysisException
{
+ if (exprs != null) {
+ this.partitionExprs = exprs;
+ }
+ this.partitionColNames = partitionColNames;
+ this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
+ this.type = PartitionType.LIST;
+ this.isAutoCreatePartitions = true;
+ }
+
+ public static ListPartitionDesc createListPartitionDesc(ArrayList<Expr>
exprs,
+ List<AllPartitionDesc> allPartitionDescs) throws AnalysisException
{
+ List<String> colNames = getColNamesFromExpr(exprs, true);
+ return new ListPartitionDesc(exprs, colNames, allPartitionDescs);
}
@Override
@@ -100,7 +118,8 @@ public class ListPartitionDesc extends PartitionDesc {
}
}
- ListPartitionInfo listPartitionInfo = new
ListPartitionInfo(partitionColumns);
+ ListPartitionInfo listPartitionInfo = new
ListPartitionInfo(this.isAutoCreatePartitions, this.partitionExprs,
+ partitionColumns);
for (SinglePartitionDesc desc : singlePartitionDescs) {
long partitionId = partitionNameToId.get(desc.getPartitionName());
listPartitionInfo.handleNewSinglePartitionDesc(desc, partitionId,
isTemp);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
index 11cb795fd6..8c5ff8b0ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionDesc.java
@@ -27,11 +27,14 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.qe.ConnectContext;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.NotImplementedException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,14 +42,23 @@ import java.util.Set;
public class PartitionDesc {
protected List<String> partitionColNames;
protected List<SinglePartitionDesc> singlePartitionDescs;
-
+ protected ArrayList<Expr> partitionExprs; //eg: auto partition by range
date_trunc(column, 'day')
+ protected boolean isAutoCreatePartitions;
protected PartitionType type;
+ public static final ImmutableSet<String> RANGE_PARTITION_FUNCTIONS = new
ImmutableSortedSet.Builder<String>(
+
String.CASE_INSENSITIVE_ORDER).add("date_trunc").add("date_ceil").add("date_floor")
+ .build();
public PartitionDesc() {}
public PartitionDesc(List<String> partitionColNames,
List<AllPartitionDesc> allPartitionDescs) throws
AnalysisException {
this.partitionColNames = partitionColNames;
+ this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
+ }
+
+ public List<SinglePartitionDesc>
handleAllPartitionDesc(List<AllPartitionDesc> allPartitionDescs)
+ throws AnalysisException {
boolean isMultiPartition = false;
List<SinglePartitionDesc> tmpList = Lists.newArrayList();
if (allPartitionDescs != null) {
@@ -65,7 +77,7 @@ public class PartitionDesc {
throw new AnalysisException("multi partition column size except 1
but provided "
+ partitionColNames.size() + ".");
}
- this.singlePartitionDescs = tmpList;
+ return tmpList;
}
public List<SinglePartitionDesc> getSinglePartitionDescs() {
@@ -85,6 +97,62 @@ public class PartitionDesc {
return partitionColNames;
}
+ // 1. partition by list (column): currently supports a single slotRef
+ // 2. partition by range (column / function(column)): supports a slotRef and
some
+ // special functions, e.g. date_trunc, date_floor/date_ceil
+ public static List<String> getColNamesFromExpr(ArrayList<Expr> exprs,
boolean isListPartition)
+ throws AnalysisException {
+ List<String> colNames = new ArrayList<>();
+ for (Expr expr : exprs) {
+ if ((expr instanceof FunctionCallExpr) && (isListPartition ==
false)) {
+ FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
+ List<Expr> paramsExpr = functionCallExpr.getParams().exprs();
+ String name = functionCallExpr.getFnName().getFunction();
+ if (RANGE_PARTITION_FUNCTIONS.contains(name)) {
+ for (Expr param : paramsExpr) {
+ if (param instanceof SlotRef) {
+ if (colNames.isEmpty()) {
+ colNames.add(((SlotRef)
param).getColumnName());
+ } else {
+ throw new AnalysisException(
+ "auto create partition only support
one slotRef in function expr. "
+ + expr.toSql());
+ }
+ }
+ }
+ } else {
+ throw new AnalysisException(
+ "auto create partition only support function call
expr is date_trunc/date_floor/date_ceil. "
+ + expr.toSql());
+ }
+ } else if (expr instanceof SlotRef) {
+ if (colNames.isEmpty()) {
+ colNames.add(((SlotRef) expr).getColumnName());
+ } else {
+ throw new AnalysisException(
+ "auto create partition only support one slotRef in
expr. "
+ + expr.toSql());
+ }
+ } else {
+ if (!isListPartition) {
+ throw new AnalysisException(
+ "auto create partition only support slotRef and
date_trunc/date_floor/date_ceil"
+ + "function in range partitions. " +
expr.toSql());
+ } else {
+ throw new AnalysisException(
+ "auto create partition only support slotRef in
list partitions. "
+ + expr.toSql());
+ }
+ }
+ }
+ if (colNames.isEmpty()) {
+ throw new AnalysisException(
+ "auto create partition have not find any partition
columns. "
+ + exprs.get(0).toSql());
+ }
+ return colNames;
+ }
+
public void analyze(List<ColumnDef> columnDefs, Map<String, String>
otherProperties) throws AnalysisException {
if (partitionColNames == null || partitionColNames.isEmpty()) {
throw new AnalysisException("No partition columns.");
@@ -128,6 +196,15 @@ public class PartitionDesc {
if (this instanceof ListPartitionDesc &&
columnDef.isAllowNull()) {
throw new AnalysisException("The list partition column
must be NOT NULL");
}
+ if (this instanceof RangePartitionDesc && partitionExprs
!= null) {
+ if (partitionExprs.get(0) instanceof FunctionCallExpr)
{
+ if (!columnDef.getType().isDatetime() &&
!columnDef.getType().isDatetimeV2()) {
+ throw new AnalysisException(
+ "auto create partition function expr
need datetime/datetimev2 type. "
+ +
partitionExprs.get(0).toSql());
+ }
+ }
+ }
found = true;
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionExprUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionExprUtil.java
new file mode 100644
index 0000000000..dadf74b27c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PartitionExprUtil.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TStringLiteral;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class PartitionExprUtil {
+ public static final String DATETIME_FORMATTER = "%04d-%02d-%02d
%02d:%02d:%02d";
+ public static final String DATE_FORMATTER = "%04d-%02d-%02d";
+ public static final String DATETIME_NAME_FORMATTER =
"%04d%02d%02d%02d%02d%02d";
+ private static final Logger LOG =
LogManager.getLogger(PartitionExprUtil.class);
+ private static final PartitionExprUtil partitionExprUtil = new
PartitionExprUtil();
+
+ public static FunctionIntervalInfo getFunctionIntervalInfo(ArrayList<Expr>
partitionExprs,
+ PartitionType partitionType) throws AnalysisException {
+ if (partitionType != PartitionType.RANGE) {
+ return null;
+ }
+ if (partitionExprs.size() != 1) {
+ throw new AnalysisException("now only support one expr in range
partition");
+ }
+
+ Expr e = partitionExprs.get(0);
+ if (!(e instanceof FunctionCallExpr)) {
+ throw new AnalysisException("now range partition only support
FunctionCallExpr");
+ }
+ FunctionCallExpr functionCallExpr = (FunctionCallExpr) e;
+ String fnName = functionCallExpr.getFnName().getFunction();
+ String timeUnit;
+ int interval;
+ if ("date_trunc".equalsIgnoreCase(fnName)) {
+ List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
+ if (paramsExprs.size() != 2) {
+ throw new AnalysisException("date_trunc params exprs size
should be 2.");
+ }
+ Expr param = paramsExprs.get(1);
+ if (!(param instanceof StringLiteral)) {
+ throw new AnalysisException("date_trunc param of time unit is
not string literal.");
+ }
+ timeUnit = ((StringLiteral) param).getStringValue().toLowerCase();
+ interval = 1;
+ } else {
+ throw new AnalysisException("now range partition only support
date_trunc.");
+ }
+ return partitionExprUtil.new FunctionIntervalInfo(timeUnit, interval);
+ }
+
+ public static DateLiteral getRangeEnd(DateLiteral beginTime,
FunctionIntervalInfo intervalInfo)
+ throws AnalysisException {
+ String timeUnit = intervalInfo.timeUnit;
+ int interval = intervalInfo.interval;
+ switch (timeUnit) {
+ case "year":
+ return beginTime.plusYears(interval);
+ case "month":
+ return beginTime.plusMonths(interval);
+ case "day":
+ return beginTime.plusDays(interval);
+ case "hour":
+ return beginTime.plusHours(interval);
+ case "minute":
+ return beginTime.plusMinutes(interval);
+ case "second":
+ return beginTime.plusSeconds(interval);
+ default:
+ break;
+ }
+ return null;
+ }
+
+ public static Map<String, AddPartitionClause>
getAddPartitionClauseFromPartitionValues(OlapTable olapTable,
+ ArrayList<TStringLiteral> partitionValues, PartitionInfo
partitionInfo)
+ throws AnalysisException {
+ Map<String, AddPartitionClause> result = Maps.newHashMap();
+ ArrayList<Expr> partitionExprs = partitionInfo.getPartitionExprs();
+ PartitionType partitionType = partitionInfo.getType();
+ List<Column> partiitonColumn = partitionInfo.getPartitionColumns();
+ Type partitionColumnType = partiitonColumn.get(0).getType();
+ FunctionIntervalInfo intervalInfo =
getFunctionIntervalInfo(partitionExprs, partitionType);
+ Set<String> filterPartitionValues = new HashSet<String>();
+
+ for (TStringLiteral partitionValue : partitionValues) {
+ PartitionKeyDesc partitionKeyDesc = null;
+ String partitionName = "p";
+ String value = partitionValue.value;
+ if (filterPartitionValues.contains(value)) {
+ continue;
+ }
+ filterPartitionValues.add(value);
+ if (partitionType == PartitionType.RANGE) {
+ String beginTime = value;
+ DateLiteral beginDateTime = new DateLiteral(beginTime,
Type.DATETIMEV2);
+ partitionName += String.format(DATETIME_NAME_FORMATTER,
+ beginDateTime.getYear(), beginDateTime.getMonth(),
beginDateTime.getDay(),
+ beginDateTime.getHour(), beginDateTime.getMinute(),
beginDateTime.getSecond());
+ DateLiteral endDateTime = getRangeEnd(beginDateTime,
intervalInfo);
+ partitionKeyDesc =
createPartitionKeyDescWithRange(beginDateTime, endDateTime,
partitionColumnType);
+ } else if (partitionType == PartitionType.LIST) {
+ List<List<PartitionValue>> listValues = new ArrayList<>();
+ // TODO: need to support any type
+ String pointValue = value;
+ PartitionValue lowerValue = new PartitionValue(pointValue);
+ listValues.add(Collections.singletonList(lowerValue));
+ partitionKeyDesc = PartitionKeyDesc.createIn(
+ listValues);
+ partitionName += lowerValue.getStringValue();
+ } else {
+ throw new AnalysisException("now only support range and list
partition");
+ }
+
+ Map<String, String> partitionProperties = Maps.newHashMap();
+ DistributionDesc distributionDesc =
olapTable.getDefaultDistributionInfo().toDistributionDesc();
+
+ SinglePartitionDesc singleRangePartitionDesc = new
SinglePartitionDesc(true, partitionName,
+ partitionKeyDesc, partitionProperties);
+
+ AddPartitionClause addPartitionClause = new
AddPartitionClause(singleRangePartitionDesc,
+ distributionDesc, partitionProperties, false);
+ result.put(partitionName, addPartitionClause);
+ }
+ return result;
+ }
+
+ public static PartitionKeyDesc createPartitionKeyDescWithRange(DateLiteral
beginDateTime,
+ DateLiteral endDateTime, Type partitionColumnType) throws
AnalysisException {
+ String beginTime;
+ String endTime;
+ // TODO: the range may also need to be checked in FE, like getAddPartitionClause.
+ if (partitionColumnType.isDate() || partitionColumnType.isDateV2()) {
+ beginTime = String.format(DATE_FORMATTER, beginDateTime.getYear(),
beginDateTime.getMonth(),
+ beginDateTime.getDay());
+ endTime = String.format(DATE_FORMATTER, endDateTime.getYear(),
endDateTime.getMonth(),
+ endDateTime.getDay());
+ } else if (partitionColumnType.isDatetime() ||
partitionColumnType.isDatetimeV2()) {
+ beginTime = String.format(DATETIME_FORMATTER,
+ beginDateTime.getYear(), beginDateTime.getMonth(),
beginDateTime.getDay(),
+ beginDateTime.getHour(), beginDateTime.getMinute(),
beginDateTime.getSecond());
+ endTime = String.format(DATETIME_FORMATTER,
+ endDateTime.getYear(), endDateTime.getMonth(),
endDateTime.getDay(),
+ endDateTime.getHour(), endDateTime.getMinute(),
endDateTime.getSecond());
+ } else {
+ throw new AnalysisException(
+ "not support range partition with column type : " +
partitionColumnType.toString());
+ }
+ PartitionValue lowerValue = new PartitionValue(beginTime);
+ PartitionValue upperValue = new PartitionValue(endTime);
+ return PartitionKeyDesc.createFixed(
+ Collections.singletonList(lowerValue),
+ Collections.singletonList(upperValue));
+ }
+
+ public class FunctionIntervalInfo {
+ public String timeUnit;
+ public int interval;
+
+ public FunctionIntervalInfo(String timeUnit, int interval) {
+ this.timeUnit = timeUnit;
+ this.interval = interval;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
index cc18df6299..099e5b0b21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RangePartitionDesc.java
@@ -35,6 +35,22 @@ public class RangePartitionDesc extends PartitionDesc {
List<AllPartitionDesc> allPartitionDescs) throws
AnalysisException {
super(partitionColNames, allPartitionDescs);
type = org.apache.doris.catalog.PartitionType.RANGE;
+ this.isAutoCreatePartitions = false;
+ }
+
+ public RangePartitionDesc(ArrayList<Expr> exprs, List<String>
partitionColNames,
+ List<AllPartitionDesc> allPartitionDescs) throws AnalysisException
{
+ this.partitionExprs = exprs;
+ this.partitionColNames = partitionColNames;
+ this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
+ this.type = org.apache.doris.catalog.PartitionType.RANGE;
+ this.isAutoCreatePartitions = true;
+ }
+
+ public static RangePartitionDesc createRangePartitionDesc(ArrayList<Expr>
exprs,
+ List<AllPartitionDesc> allPartitionDescs) throws AnalysisException
{
+ List<String> colNames = getColNamesFromExpr(exprs, false);
+ return new RangePartitionDesc(exprs, colNames, allPartitionDescs);
}
@Override
@@ -116,7 +132,8 @@ public class RangePartitionDesc extends PartitionDesc {
* [ {10, 100, 1000}, {50, 500, MIN } )
* [ {50, 500, MIN }, {80, MIN, MIN } )
*/
- RangePartitionInfo rangePartitionInfo = new
RangePartitionInfo(partitionColumns);
+ RangePartitionInfo rangePartitionInfo = new
RangePartitionInfo(this.isAutoCreatePartitions, this.partitionExprs,
+ partitionColumns);
for (SinglePartitionDesc desc : singlePartitionDescs) {
long partitionId = partitionNameToId.get(desc.getPartitionName());
rangePartitionInfo.handleNewSinglePartitionDesc(desc, partitionId,
isTemp);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
index c7a6b5e5a1..d657dc7d9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java
@@ -18,11 +18,13 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AllPartitionDesc;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ListPartitionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ListUtil;
@@ -54,6 +56,14 @@ public class ListPartitionInfo extends PartitionInfo {
this.isMultiColumnPartition = partitionColumns.size() > 1;
}
+ public ListPartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr>
exprs, List<Column> partitionColumns) {
+ super(PartitionType.LIST, partitionColumns);
+ this.isAutoCreatePartitions = isAutoCreatePartitions;
+ if (exprs != null) {
+ this.partitionExprs.addAll(exprs);
+ }
+ }
+
public static PartitionInfo read(DataInput in) throws IOException {
PartitionInfo partitionInfo = new ListPartitionInfo();
partitionInfo.readFields(in);
@@ -186,16 +196,31 @@ public class ListPartitionInfo extends PartitionInfo {
@Override
public String toSql(OlapTable table, List<Long> partitionId) {
StringBuilder sb = new StringBuilder();
- sb.append("PARTITION BY LIST(");
int idx = 0;
- for (Column column : partitionColumns) {
- if (idx != 0) {
- sb.append(", ");
+ if (enableAutomaticPartition()) {
+ sb.append("AUTO PARTITION BY LIST ");
+ for (Expr e : partitionExprs) {
+ boolean isSlotRef = (e instanceof SlotRef);
+ if (isSlotRef) {
+ sb.append("(");
+ }
+ sb.append(e.toSql());
+ if (isSlotRef) {
+ sb.append(")");
+ }
}
- sb.append("`").append(column.getName()).append("`");
- idx++;
+ sb.append("\n(");
+ } else {
+ sb.append("PARTITION BY LIST(");
+ for (Column column : partitionColumns) {
+ if (idx != 0) {
+ sb.append(", ");
+ }
+ sb.append("`").append(column.getName()).append("`");
+ idx++;
+ }
+ sb.append(")\n(");
}
- sb.append(")\n(");
// sort list
List<Map.Entry<Long, PartitionItem>> entries = new
ArrayList<>(this.idToItem.entrySet());
@@ -269,6 +294,6 @@ public class ListPartitionInfo extends PartitionInfo {
allPartitionDescs.add(new SinglePartitionDesc(false,
partitionName, partitionKeyDesc, properties));
}
- return new ListPartitionDesc(partitionColumnNames, allPartitionDescs);
+ return new ListPartitionDesc(this.partitionExprs,
partitionColumnNames, allPartitionDescs);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 290a6dd660..35db545f14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -137,7 +137,7 @@ public class OlapTable extends Table {
private PartitionInfo partitionInfo;
@SerializedName("idToPartition")
private Map<Long, Partition> idToPartition = new HashMap<>();
- private Map<String, Partition> nameToPartition =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ private Map<String, Partition> nameToPartition = Maps.newTreeMap();
@SerializedName(value = "distributionInfo")
private DistributionInfo defaultDistributionInfo;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index d319882af4..b7ca3c622c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.MaxLiteral;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionValue;
@@ -40,6 +41,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -81,6 +83,10 @@ public class PartitionInfo implements Writable {
// so we defer adding meta serialization until memory engine feature is
more complete.
protected Map<Long, TTabletType> idToTabletType;
+ // held when automatic partitioning is enabled: partitions can be created
from the result of these exprs
+ protected ArrayList<Expr> partitionExprs;
+ protected boolean isAutoCreatePartitions;
+
public PartitionInfo() {
this.type = PartitionType.UNPARTITIONED;
this.idToDataProperty = new HashMap<>();
@@ -88,6 +94,7 @@ public class PartitionInfo implements Writable {
this.idToInMemory = new HashMap<>();
this.idToTabletType = new HashMap<>();
this.idToStoragePolicy = new HashMap<>();
+ this.partitionExprs = new ArrayList<>();
}
public PartitionInfo(PartitionType type) {
@@ -97,6 +104,7 @@ public class PartitionInfo implements Writable {
this.idToInMemory = new HashMap<>();
this.idToTabletType = new HashMap<>();
this.idToStoragePolicy = new HashMap<>();
+ this.partitionExprs = new ArrayList<>();
}
public PartitionInfo(PartitionType type, List<Column> partitionColumns) {
@@ -215,6 +223,14 @@ public class PartitionInfo implements Writable {
return null;
}
+ public boolean enableAutomaticPartition() {
+ return isAutoCreatePartitions;
+ }
+
+ public ArrayList<Expr> getPartitionExprs() {
+ return this.partitionExprs;
+ }
+
public void checkPartitionItemListsMatch(List<PartitionItem> list1,
List<PartitionItem> list2) throws DdlException {
}
@@ -374,6 +390,13 @@ public class PartitionInfo implements Writable {
idToReplicaAllocation.get(entry.getKey()).write(out);
out.writeBoolean(idToInMemory.get(entry.getKey()));
}
+ int size = partitionExprs.size();
+ out.writeInt(size);
+ for (int i = 0; i < size; ++i) {
+ Expr e = this.partitionExprs.get(i);
+ Expr.writeTo(e, out);
+ }
+ out.writeBoolean(isAutoCreatePartitions);
}
public void readFields(DataInput in) throws IOException {
@@ -400,6 +423,14 @@ public class PartitionInfo implements Writable {
idToInMemory.put(partitionId, in.readBoolean());
}
+ if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_125) {
+ int size = in.readInt();
+ for (int i = 0; i < size; ++i) {
+ Expr e = Expr.readIn(in);
+ this.partitionExprs.add(e);
+ }
+ this.isAutoCreatePartitions = in.readBoolean();
+ }
}
@Override
@@ -438,12 +469,13 @@ public class PartitionInfo implements Writable {
&& Objects.equals(idToTempItem, that.idToTempItem) &&
Objects.equals(idToDataProperty,
that.idToDataProperty) && Objects.equals(idToStoragePolicy,
that.idToStoragePolicy)
&& Objects.equals(idToReplicaAllocation,
that.idToReplicaAllocation) && Objects.equals(
- idToInMemory, that.idToInMemory) &&
Objects.equals(idToTabletType, that.idToTabletType);
+ idToInMemory, that.idToInMemory) &&
Objects.equals(idToTabletType, that.idToTabletType)
+ && Objects.equals(partitionExprs, that.partitionExprs);
}
@Override
public int hashCode() {
return Objects.hash(type, partitionColumns, idToItem, idToTempItem,
idToDataProperty, idToStoragePolicy,
- idToReplicaAllocation, isMultiColumnPartition, idToInMemory,
idToTabletType);
+ idToReplicaAllocation, isMultiColumnPartition, idToInMemory,
idToTabletType, partitionExprs);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
index cd3614bec8..952fa88d25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java
@@ -18,10 +18,12 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AllPartitionDesc;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.RangeUtils;
@@ -54,6 +56,14 @@ public class RangePartitionInfo extends PartitionInfo {
this.isMultiColumnPartition = partitionColumns.size() > 1;
}
+ public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr>
exprs, List<Column> partitionColumns) {
+ super(PartitionType.RANGE, partitionColumns);
+ this.isAutoCreatePartitions = isAutoCreatePartitions;
+ if (exprs != null) {
+ this.partitionExprs.addAll(exprs);
+ }
+ }
+
@Override
public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc,
boolean isTemp) throws DdlException {
Range<PartitionKey> newRange = null;
@@ -252,16 +262,31 @@ public class RangePartitionInfo extends PartitionInfo {
@Override
public String toSql(OlapTable table, List<Long> partitionId) {
StringBuilder sb = new StringBuilder();
- sb.append("PARTITION BY RANGE(");
int idx = 0;
- for (Column column : partitionColumns) {
- if (idx != 0) {
- sb.append(", ");
+ if (enableAutomaticPartition()) {
+ sb.append("AUTO PARTITION BY RANGE ");
+ for (Expr e : partitionExprs) {
+ boolean isSlotRef = (e instanceof SlotRef);
+ if (isSlotRef) {
+ sb.append("(");
+ }
+ sb.append(e.toSql());
+ if (isSlotRef) {
+ sb.append(")");
+ }
}
- sb.append("`").append(column.getName()).append("`");
- idx++;
+ sb.append("\n(");
+ } else {
+ sb.append("PARTITION BY RANGE(");
+ for (Column column : partitionColumns) {
+ if (idx != 0) {
+ sb.append(", ");
+ }
+ sb.append("`").append(column.getName()).append("`");
+ idx++;
+ }
+ sb.append(")\n(");
}
- sb.append(")\n(");
// sort range
List<Map.Entry<Long, PartitionItem>> entries = new
ArrayList<>(this.idToItem.entrySet());
@@ -325,6 +350,6 @@ public class RangePartitionInfo extends PartitionInfo {
allPartitionDescs.add(new SinglePartitionDesc(false,
partitionName, partitionKeyDesc, properties));
}
- return new RangePartitionDesc(partitionColumnNames, allPartitionDescs);
+ return new RangePartitionDesc(this.partitionExprs,
partitionColumnNames, allPartitionDescs);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 57fc65dba5..8ab2013c63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -131,7 +131,7 @@ public class OlapTableSink extends DataSink {
if (partitionIds == null) {
partitionIds = dstTable.getPartitionIds();
- if (partitionIds.isEmpty()) {
+ if (partitionIds.isEmpty() &&
dstTable.getPartitionInfo().enableAutomaticPartition() == false) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE,
dstTable.getName());
}
}
@@ -178,7 +178,7 @@ public class OlapTableSink extends DataSink {
tSink.setNumReplicas(numReplicas);
tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
- tSink.setPartition(createPartition(tSink.getDbId(), dstTable));
+ tSink.setPartition(createPartition(tSink.getDbId(), dstTable,
analyzer));
List<TOlapTableLocationParam> locationParams =
createLocation(dstTable);
tSink.setLocation(locationParams.get(0));
if (singleReplicaLoad) {
@@ -293,7 +293,8 @@ public class OlapTableSink extends DataSink {
return distColumns;
}
- private TOlapTablePartitionParam createPartition(long dbId, OlapTable
table) throws UserException {
+ private TOlapTablePartitionParam createPartition(long dbId, OlapTable
table, Analyzer analyzer)
+ throws UserException {
TOlapTablePartitionParam partitionParam = new
TOlapTablePartitionParam();
partitionParam.setDbId(dbId);
partitionParam.setTableId(table.getId());
@@ -337,6 +338,22 @@ public class OlapTableSink extends DataSink {
}
}
}
+ // For auto partition by function expr there may be no partitions at first,
+ // but these fields are required in the thrift struct.
+ if (partitionIds.isEmpty()) {
+
partitionParam.setDistributedColumns(getDistColumns(table.getDefaultDistributionInfo()));
+ partitionParam.setPartitions(new
ArrayList<TOlapTablePartition>());
+ }
+ ArrayList<Expr> exprs = partitionInfo.getPartitionExprs();
+ if (exprs != null && analyzer != null) {
+ tupleDescriptor.setTable(table);
+ analyzer.registerTupleDescriptor(tupleDescriptor);
+ for (Expr e : exprs) {
+ e.analyze(analyzer);
+ }
+
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
+ }
+
partitionParam.setEnableAutomaticPartition(partitionInfo.enableAutomaticPartition());
break;
}
case UNPARTITIONED: {
@@ -362,16 +379,18 @@ public class OlapTableSink extends DataSink {
}
partitionParam.addToPartitions(tPartition);
partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo()));
+ partitionParam.setEnableAutomaticPartition(false);
break;
}
default: {
throw new UserException("unsupported partition for OlapTable,
partition=" + partType);
}
}
+ partitionParam.setPartitionType(partType.toThrift());
return partitionParam;
}
- private void setPartitionKeys(TOlapTablePartition tPartition,
PartitionItem partitionItem, int partColNum) {
+ public static void setPartitionKeys(TOlapTablePartition tPartition,
PartitionItem partitionItem, int partColNum) {
if (partitionItem instanceof RangePartitionItem) {
Range<PartitionKey> range = partitionItem.getItems();
// set start keys
@@ -439,6 +458,10 @@ public class OlapTableSink extends DataSink {
}
}
+ // For partition by function expr there may be no partitions at first,
+ // but the tablets field is required in the thrift struct.
+ if (partitionIds.isEmpty()) {
+ locationParam.setTablets(new ArrayList<TTabletLocation>());
+ }
// check if disk capacity reach limit
// this is for load process, so use high water mark to check
Status st =
Env.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allBePathsMap, true);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c19e299960..4bf92a24ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -19,10 +19,12 @@ package org.apache.doris.service;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.AddColumnsClause;
+import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.NativeInsertStmt;
+import org.apache.doris.analysis.PartitionExprUtil;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.SqlParser;
@@ -37,7 +39,10 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
@@ -50,6 +55,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
@@ -72,6 +78,7 @@ import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
@@ -110,6 +117,8 @@ import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TCommitTxnResult;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesResult;
+import org.apache.doris.thrift.TCreatePartitionRequest;
+import org.apache.doris.thrift.TCreatePartitionResult;
import org.apache.doris.thrift.TDescribeTableParams;
import org.apache.doris.thrift.TDescribeTableResult;
import org.apache.doris.thrift.TDescribeTablesParams;
@@ -155,6 +164,9 @@ import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TNodeInfo;
+import org.apache.doris.thrift.TOlapTableIndexTablets;
+import org.apache.doris.thrift.TOlapTablePartition;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPrivilegeCtrl;
import org.apache.doris.thrift.TPrivilegeHier;
@@ -180,10 +192,12 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
+import org.apache.doris.thrift.TStringLiteral;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableStatus;
+import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
@@ -200,6 +214,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -3035,4 +3050,124 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// Return Ok anyway
return new TStatus(TStatusCode.OK);
}
+
+ @Override
+ public TCreatePartitionResult createPartition(TCreatePartitionRequest
request) throws TException {
+ LOG.info("Receive create partition request: {}", request);
+ long dbId = request.getDbId();
+ long tableId = request.getTableId();
+ TCreatePartitionResult result = new TCreatePartitionResult();
+ TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
+
+ Database db =
Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d
is not exists", dbId)));
+ result.setStatus(errorStatus);
+ return result;
+ }
+
+ Table table = db.getTable(tableId).get();
+ if (table == null) {
+ errorStatus.setErrorMsgs(
+ (Lists.newArrayList(String.format("dbId=%d tableId=%d is
not exists", dbId, tableId))));
+ result.setStatus(errorStatus);
+ return result;
+ }
+
+ if (!(table instanceof OlapTable)) {
+ errorStatus.setErrorMsgs(
+ Lists.newArrayList(String.format("dbId=%d tableId=%d is
not olap table", dbId, tableId)));
+ result.setStatus(errorStatus);
+ return result;
+ }
+
+ if (request.partitionValues == null) {
+ errorStatus.setErrorMsgs(Lists.newArrayList("partitionValues
should not null."));
+ result.setStatus(errorStatus);
+ return result;
+ }
+
+ OlapTable olapTable = (OlapTable) table;
+ PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ ArrayList<TStringLiteral> partitionValues = new
ArrayList<TStringLiteral>();
+ for (int i = 0; i < request.partitionValues.size(); i++) {
+ if (request.partitionValues.get(i).size() != 1) {
+ errorStatus.setErrorMsgs(
+ Lists.newArrayList("Only support single partition,
partitionValues size should equal 1."));
+ result.setStatus(errorStatus);
+ return result;
+ }
+ partitionValues.add(request.partitionValues.get(i).get(0));
+ }
+ Map<String, AddPartitionClause> addPartitionClauseMap;
+ try {
+ addPartitionClauseMap =
PartitionExprUtil.getAddPartitionClauseFromPartitionValues(olapTable,
+ partitionValues, partitionInfo);
+ } catch (AnalysisException ex) {
+ errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ return result;
+ }
+
+ for (AddPartitionClause addPartitionClause :
addPartitionClauseMap.values()) {
+ try {
+ // here maybe check and limit created partitions num
+ Env.getCurrentEnv().addPartition(db, olapTable.getName(),
addPartitionClause);
+ } catch (DdlException e) {
+ LOG.warn(e);
+ errorStatus.setErrorMsgs(
+ Lists.newArrayList(String.format("create partition
failed. error:%s", e.getMessage())));
+ result.setStatus(errorStatus);
+ return result;
+ }
+ }
+
+ // build partition & tablets
+ List<TOlapTablePartition> partitions = Lists.newArrayList();
+ List<TTabletLocation> tablets = Lists.newArrayList();
+ for (String partitionName : addPartitionClauseMap.keySet()) {
+ Partition partition = table.getPartition(partitionName);
+ TOlapTablePartition tPartition = new TOlapTablePartition();
+ tPartition.setId(partition.getId());
+ int partColNum = partitionInfo.getPartitionColumns().size();
+ // set partition keys
+ OlapTableSink.setPartitionKeys(tPartition,
partitionInfo.getItem(partition.getId()), partColNum);
+ for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
+ tPartition.addToIndexes(new
TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
+
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
+ tPartition.setNumBuckets(index.getTablets().size());
+ }
+
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
+ partitions.add(tPartition);
+ // tablet
+ int quorum =
olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
/ 2
+ + 1;
+ for (MaterializedIndex index :
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
+ for (Tablet tablet : index.getTablets()) {
+ // we should ensure the replica backend is alive
+ // otherwise, there will be a 'unknown node id, id=xxx'
error for stream load
+ // BE id -> path hash
+ Multimap<Long, Long> bePathsMap =
tablet.getNormalReplicaBackendPathMap();
+ if (bePathsMap.keySet().size() < quorum) {
+ LOG.warn("auto go quorum exception");
+ }
+ tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(bePathsMap.keySet())));
+ }
+ }
+ }
+ result.setPartitions(partitions);
+ result.setTablets(tablets);
+
+ // build nodes
+ List<TNodeInfo> nodeInfos = Lists.newArrayList();
+ SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+ for (Long id : systemInfoService.getAllBackendIds(false)) {
+ Backend backend = systemInfoService.getBackend(id);
+ nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(),
backend.getBrpcPort()));
+ }
+ result.setNodes(nodeInfos);
+ result.setStatus(new TStatus(TStatusCode.OK));
+ LOG.debug("send create partition result: {}", result);
+ return result;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/TruncateTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TruncateTableTest.java
index 0a05d7c618..c13730f32d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TruncateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TruncateTableTest.java
@@ -84,26 +84,27 @@ public class TruncateTableTest {
@Test
public void testTruncateWithCaseInsensitivePartitionName() throws
Exception {
+ //now in order to support auto create partition, need set partition
name is case sensitive
Database db =
Env.getCurrentInternalCatalog().getDbNullable("default_cluster:test");
OlapTable tbl = db.getOlapTableOrDdlException("case_sensitive_table");
- long p20211006Id = tbl.getPartition("P20211006").getId();
+ long p20211006Id = tbl.getPartition("p20211006").getId();
long p20211007Id = tbl.getPartition("P20211007").getId();
- long p20211008Id = tbl.getPartition("p20211008").getId();
+ long p20211008Id = tbl.getPartition("P20211008").getId();
// truncate p20211008(real name is P20211008)
- String truncateStr = "TRUNCATE TABLE test.case_sensitive_table
PARTITION p20211008; \n";
+ String truncateStr = "TRUNCATE TABLE test.case_sensitive_table
PARTITION P20211008; \n";
TruncateTableStmt truncateTableStmt
= (TruncateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(truncateStr, connectContext);
Env.getCurrentEnv().truncateTable(truncateTableStmt);
- Assert.assertNotEquals(p20211008Id,
tbl.getPartition("p20211008").getId());
+ Assert.assertNotEquals(p20211008Id,
tbl.getPartition("P20211008").getId());
// 2. truncate P20211007
truncateStr = "TRUNCATE TABLE test.case_sensitive_table PARTITION
P20211007; \n";
truncateTableStmt = (TruncateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(truncateStr, connectContext);
Env.getCurrentEnv().truncateTable(truncateTableStmt);
Assert.assertEquals(3, tbl.getPartitionInfo().idToDataProperty.size());
- Assert.assertNotEquals(p20211007Id,
tbl.getPartition("p20211007").getId());
+ Assert.assertNotEquals(p20211007Id,
tbl.getPartition("P20211007").getId());
Assert.assertEquals(p20211006Id,
tbl.getPartition("p20211006").getId());
Assert.assertNotNull(tbl.getPartition("p20211006"));
- Assert.assertNotNull(tbl.getPartition("P20211006"));
+ Assert.assertNotNull(tbl.getPartition("p20211006"));
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DynamicPartitionUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DynamicPartitionUtilTest.java
index ac9aded5b1..18ebaf6851 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/DynamicPartitionUtilTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/DynamicPartitionUtilTest.java
@@ -222,7 +222,7 @@ public class DynamicPartitionUtilTest {
List<Column> partitionColumnList = Lists.newArrayList();
Column partitionColumn = new Column();
partitionColumn.setType(Type.DATE);
- Deencapsulation.setField(rangePartitionInfo, partitionColumnList);
+ Deencapsulation.setField(rangePartitionInfo, "partitionColumns",
partitionColumnList);
try {
Deencapsulation.invoke(dynamicPartitionUtil, "checkTimeUnit",
"HOUR", rangePartitionInfo);
Assert.fail();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
new file mode 100644
index 0000000000..9a1264776a
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service;
+
+
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TCreatePartitionRequest;
+import org.apache.doris.thrift.TCreatePartitionResult;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStringLiteral;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import mockit.Mocked;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for {@code FrontendServiceImpl#createPartition} against tables declared with
+ * {@code AUTO PARTITION BY range/list}.
+ */
+public class FrontendServiceImplTest {
+    private static String runningDir = "fe/mocked/FrontendServiceImplTest/" + UUID.randomUUID().toString() + "/";
+    private static ConnectContext connectContext;
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    @Mocked
+    ExecuteEnv exeEnv;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
+        FeConstants.default_scheduler_interval_millisecond = 100;
+        Config.dynamic_partition_enable = true;
+        Config.dynamic_partition_check_interval_seconds = 1;
+        UtFrameUtils.createDorisCluster(runningDir);
+        // create connect context
+        connectContext = UtFrameUtils.createDefaultCtx();
+        // create database
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Env.getCurrentEnv().createDb(createDbStmt);
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Env.getCurrentEnv().createTable(createTableStmt);
+    }
+
+    /** Sends a createPartition request for {@code table} that carries the single partition value {@code value}. */
+    private TCreatePartitionResult requestPartition(Database db, OlapTable table, String value)
+            throws Exception {
+        TStringLiteral literal = new TStringLiteral();
+        literal.setValue(value);
+        List<TStringLiteral> values = new ArrayList<>();
+        values.add(literal);
+        List<List<TStringLiteral>> partitionValues = new ArrayList<>();
+        partitionValues.add(values);
+
+        TCreatePartitionRequest request = new TCreatePartitionRequest();
+        request.setDbId(db.getId());
+        request.setTableId(table.getId());
+        request.setPartitionValues(partitionValues);
+        return new FrontendServiceImpl(exeEnv).createPartition(request);
+    }
+
+    @Test
+    public void testCreatePartitionRange() throws Exception {
+        String createOlapTblStmt = "CREATE TABLE test.partition_range(\n"
+                + "    event_day DATETIME,\n"
+                + "    site_id INT DEFAULT '10',\n"
+                + "    city_code VARCHAR(100)\n"
+                + ")\n"
+                + "DUPLICATE KEY(event_day, site_id, city_code)\n"
+                + "AUTO PARTITION BY range date_trunc( event_day,'day') (\n"
+                + "\n"
+                + ")\n"
+                + "DISTRIBUTED BY HASH(event_day, site_id) BUCKETS 2\n"
+                + "PROPERTIES(\"replication_num\" = \"1\");";
+
+        createTable(createOlapTblStmt);
+        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("default_cluster:test");
+        OlapTable table = (OlapTable) db.getTableOrAnalysisException("partition_range");
+
+        TCreatePartitionResult result = requestPartition(db, table, "2023-08-07 00:00:00");
+
+        // JUnit expects (expected, actual); keep that order so failure messages read correctly.
+        Assert.assertEquals(TStatusCode.OK, result.getStatus().getStatusCode());
+        Partition p20230807 = table.getPartition("p20230807000000");
+        Assert.assertNotNull(p20230807);
+    }
+
+    @Test
+    public void testCreatePartitionList() throws Exception {
+        String createOlapTblStmt = "CREATE TABLE test.partition_list(\n"
+                + "    event_day DATETIME,\n"
+                + "    site_id INT DEFAULT '10',\n"
+                + "    city_code VARCHAR(100) not null\n"
+                + ")\n"
+                + "DUPLICATE KEY(event_day, site_id, city_code)\n"
+                + "AUTO PARTITION BY list (city_code) (\n"
+                + "\n"
+                + ")\n"
+                + "DISTRIBUTED BY HASH(event_day, site_id) BUCKETS 2\n"
+                + "PROPERTIES(\"replication_num\" = \"1\");";
+
+        createTable(createOlapTblStmt);
+        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("default_cluster:test");
+        OlapTable table = (OlapTable) db.getTableOrAnalysisException("partition_list");
+
+        TCreatePartitionResult result = requestPartition(db, table, "BEIJING");
+
+        // JUnit expects (expected, actual); keep that order so failure messages read correctly.
+        Assert.assertEquals(TStatusCode.OK, result.getStatus().getStatusCode());
+        Partition pbeijing = table.getPartition("pBEIJING");
+        Assert.assertNotNull(pbeijing);
+    }
+}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 0a816b4096..05891874e5 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -94,6 +94,7 @@ message PTabletWriterOpenRequest {
optional bool is_vectorized = 12 [default = false];
optional int64 backend_id = 13 [default = -1];
optional bool enable_profile = 14 [default = false];
+ optional bool is_incremental = 15 [default = false];
};
message PTabletWriterOpenResult {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 3020570876..1b9f85e2b7 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -20,6 +20,7 @@ namespace java org.apache.doris.thrift
include "Types.thrift"
include "Exprs.thrift"
+include "Partitions.thrift"
struct TColumn {
1: required string column_name
@@ -191,6 +192,9 @@ struct TOlapTablePartitionParam {
6: required list<TOlapTablePartition> partitions
7: optional list<string> partition_columns
+ 8: optional list<Exprs.TExpr> partition_function_exprs
+ 9: optional bool enable_automatic_partition
+ 10: optional Partitions.TPartitionType partition_type
}
struct TOlapTableIndex {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 4d2ef60e43..c1d5f6b0c6 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1133,6 +1133,21 @@ struct TAutoIncrementRangeResult {
3: optional i64 length
}
+struct TCreatePartitionRequest {
+ 1: optional i64 txn_id
+ 2: optional i64 db_id
+ 3: optional i64 table_id
+ // for each partition column's partition values. [missing_rows,
partition_keys]->Left bound(for range) or Point(for list)
+ 4: optional list<list<Exprs.TStringLiteral>> partitionValues
+}
+
+struct TCreatePartitionResult {
+ 1: optional Status.TStatus status
+ 2: optional list<Descriptors.TOlapTablePartition> partitions
+ 3: optional list<Descriptors.TTabletLocation> tablets
+ 4: optional list<Descriptors.TNodeInfo> nodes
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1202,4 +1217,6 @@ service FrontendService {
Status.TStatus updateStatsCache(1: TUpdateFollowerStatsCacheRequest
request)
TAutoIncrementRangeResult getAutoIncrementRange(1:
TAutoIncrementRangeRequest request)
+
+ TCreatePartitionResult createPartition(1: TCreatePartitionRequest request)
}
diff --git a/regression-test/suites/ddl_p0/test_truncate_table.groovy
b/regression-test/suites/ddl_p0/test_truncate_table.groovy
index e983915f22..54487b23f0 100644
--- a/regression-test/suites/ddl_p0/test_truncate_table.groovy
+++ b/regression-test/suites/ddl_p0/test_truncate_table.groovy
@@ -51,7 +51,7 @@ suite("test_truncate_table") {
assertEquals(result.size(), 3)
assertEquals(result.get(0).get(1), "p1")
- sql """truncate table ${testTable} partitions (p1, P1);"""
+ sql """truncate table ${testTable} partitions (p1, p1);"""
result = sql "show partitions from ${testTable}"
logger.info("${result}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]