This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cfe94e17d04 [Enhancement](udf) Support volatility property for scalar UDF (#62698)
cfe94e17d04 is described below
commit cfe94e17d045d0c7e484d791c587ce33249011e0
Author: linrrarity <[email protected]>
AuthorDate: Tue May 19 11:23:18 2026 +0800
[Enhancement](udf) Support volatility property for scalar UDF (#62698)
Problem Summary:
Previously, UDFs could be treated as deterministic in optimizer-related
paths. That is unsafe for UDFs whose results are not stable across
evaluations: it can lead to invalid rewrite or planning decisions and,
in some cases, incorrect query results.
This change introduces `immutable`, `stable`, and `volatile` semantics
through the `"volatility" = "immutable|stable|volatile"` function property,
persists the property in function metadata, and uses it to drive
deterministic and volatile-expression behavior in Nereids. Functions
without the property default to `immutable`.
Immutable UDFs are treated as deterministic. Stable UDFs are treated as
non-deterministic but do not receive volatile identities. Volatile UDFs
receive a separate volatile identity per call, which prevents unsafe
rewrites.
```sql
CREATE TABLE cte_uuid_seed (id INT) ENGINE=OLAP DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1");
INSERT INTO cte_uuid_seed VALUES (1),(2),(3);
DROP FUNCTION IF EXISTS py_uuid_token(INT);
CREATE FUNCTION py_uuid_token(INT)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "py_uuid_token_impl",
"always_nullable" = "false",
"runtime_version" = "3.12.11"
)
AS $$
import uuid
def py_uuid_token_impl(x):
return f"{x}-{uuid.uuid4()}"
$$;
```
Before:
```sql
SET enable_cte_materialize = true;
SET inline_cte_referenced_threshold = 10;
-- treated as volatile func(UniqueFunction), which caused wrong planning
WITH cte AS (SELECT id, py_uuid_token(id) AS token FROM cte_uuid_seed)
SELECT id, COUNT(DISTINCT token) AS distinct_tokens
FROM (SELECT id, token FROM cte UNION ALL SELECT id, token FROM cte) u
GROUP BY id ORDER BY id;
+------+-----------------+
| id | distinct_tokens |
+------+-----------------+
| 1 | 2 |
| 2 | 2 |
| 3 | 2 |
+------+-----------------+
```
After (same query):
```sql
+------+-----------------+
| id | distinct_tokens |
+------+-----------------+
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
+------+-----------------+
```
doc: https://github.com/apache/doris-website/pull/3570
---
.../java/org/apache/doris/catalog/Function.java | 11 ++
.../apache/doris/catalog/FunctionVolatility.java | 43 +++++
.../doris/catalog/FunctionToSqlConverter.java | 67 ++++++-
.../post/PushDownFilterThroughProject.java | 2 +-
.../nereids/rules/analysis/BindExpression.java | 10 +-
.../rules/InPredicateExtractNonConstant.java | 2 +-
.../expression/rules/PushIntoCaseWhenBranch.java | 4 +-
.../rules/rewrite/AddProjectForUniqueFunction.java | 8 +-
.../rules/rewrite/CollectFilterAboveConsumer.java | 2 +-
.../rules/rewrite/JoinExtractOrFromCaseWhen.java | 2 +-
.../rewrite/PushDownFilterThroughAggregation.java | 2 +-
.../rewrite/PushDownFilterThroughGenerate.java | 2 +-
.../rules/rewrite/PushDownFilterThroughJoin.java | 4 +-
.../rewrite/PushDownFilterThroughProject.java | 2 +-
.../rules/rewrite/PushFilterInsideJoin.java | 2 +-
.../rules/rewrite/PushProjectIntoUnion.java | 2 +-
.../doris/nereids/rules/rewrite/ReorderJoin.java | 4 +-
.../rewrite/eageraggregation/EagerAggRewriter.java | 2 +-
.../nereids/trees/expressions/Expression.java | 6 +-
.../trees/expressions/VolatileExpression.java | 29 +++
.../trees/expressions/VolatileIdentity.java | 90 +++++++++
.../expressions/functions/ExpressionTrait.java | 2 +-
.../functions/scalar/UniqueFunction.java | 23 ++-
.../trees/expressions/functions/udf/JavaUdf.java | 65 ++++++-
.../expressions/functions/udf/JavaUdfBuilder.java | 2 +-
.../trees/expressions/functions/udf/PythonUdf.java | 65 ++++++-
.../functions/udf/PythonUdfBuilder.java | 4 +-
.../plans/commands/CreateFunctionCommand.java | 28 +++
.../trees/plans/commands/ShowFunctionsCommand.java | 11 +-
.../trees/plans/logical/LogicalAggregate.java | 4 +-
.../trees/plans/logical/LogicalLoadProject.java | 2 +-
.../trees/plans/logical/LogicalProject.java | 2 +-
.../physical/PhysicalBucketedHashAggregate.java | 4 +-
.../plans/physical/PhysicalHashAggregate.java | 4 +-
.../apache/doris/nereids/util/ExpressionUtils.java | 11 +-
.../org/apache/doris/nereids/util/PlanUtils.java | 2 +-
.../apache/doris/catalog/CreateFunctionTest.java | 23 +++
.../doris/catalog/FunctionToSqlConverterTest.java | 136 ++++++++++++--
.../rewrite/CollectFilterAboveConsumerTest.java | 2 +-
.../functions/scalar/UniqueFunctionTest.java | 2 +-
.../functions/udf/UdfVolatilityTest.java | 91 +++++++++
.../plans/commands/ShowFunctionsCommandTest.java | 50 +++++
.../suites/javaudf_p0/test_javaudf_float.groovy | 3 +-
.../suites/mtmv_p0/test_expand_star_mtmv.groovy | 3 +-
.../pythonudf_p0/test_pythonudf_aggregate.groovy | 6 +-
.../pythonudf_p0/test_pythonudf_float.groovy | 3 +-
.../pythonudf_p0/test_pythonudf_volatility.groovy | 204 +++++++++++++++++++++
47 files changed, 966 insertions(+), 82 deletions(-)
diff --git a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java
b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java
index 5c7c80f4535..93783a47725 100644
--- a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java
+++ b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java
@@ -114,6 +114,8 @@ public class Function implements Writable {
protected String runtimeVersion;
@SerializedName("fc")
protected String functionCode;
+ @SerializedName("vol")
+ protected FunctionVolatility volatility = FunctionVolatility.IMMUTABLE;
// Only used for serialization
protected Function() {
@@ -174,6 +176,7 @@ public class Function implements Writable {
this.expirationTime = other.expirationTime;
this.runtimeVersion = other.runtimeVersion;
this.functionCode = other.functionCode;
+ this.volatility = other.getVolatility();
}
public Function clone() {
@@ -301,6 +304,14 @@ public class Function implements Writable {
this.functionCode = functionCode;
}
+ public FunctionVolatility getVolatility() {
+ return volatility == null ? FunctionVolatility.IMMUTABLE : volatility;
+ }
+
+ public void setVolatility(FunctionVolatility volatility) {
+ this.volatility = volatility == null ? FunctionVolatility.IMMUTABLE :
volatility;
+ }
+
// TODO(cmy): Currently we judge whether it is UDF by wheter the
'location' is set.
// Maybe we should use a separate variable to identify,
// but additional variables need to modify the persistence information.
diff --git
a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java
b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java
new file mode 100644
index 00000000000..eefbbc4e79b
--- /dev/null
+++
b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import java.util.Locale;
+
+/** Function volatility controls which optimizer rewrites are safe for a
function call. */
+public enum FunctionVolatility {
+ IMMUTABLE,
+ STABLE,
+ VOLATILE;
+
+ public static FunctionVolatility fromString(String value) {
+ if (value == null) {
+ return IMMUTABLE;
+ }
+ try {
+ return
FunctionVolatility.valueOf(value.trim().toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid volatility: '" + value
+ + "'. Expected one of: immutable, stable, volatile", e);
+ }
+ }
+
+ public String toSql() {
+ return name().toLowerCase(Locale.ROOT);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java
index 8709eb5b6de..49db8ed7e24 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java
@@ -23,6 +23,8 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.catalog.Function.NullableMode;
+import com.google.common.base.Strings;
+
import java.util.List;
import java.util.stream.Collectors;
@@ -30,6 +32,7 @@ import java.util.stream.Collectors;
* Converts {@link Function} and its subclasses to their SQL representations.
*/
public class FunctionToSqlConverter {
+ private static final long DEFAULT_EXPIRATION_TIME = 360;
/**
* Converts a {@link Function} (or subclass) to its SQL representation.
@@ -54,13 +57,13 @@ public class FunctionToSqlConverter {
if (fn.isGlobal()) {
sb.append("GLOBAL ");
}
- sb.append("FUNCTION ");
+ sb.append(fn.isUDTFunction() ? "TABLES FUNCTION " : "FUNCTION ");
if (ifNotExists) {
sb.append("IF NOT EXISTS ");
}
sb.append(fn.signatureString())
- .append(" RETURNS " + fn.getReturnType())
+ .append(" RETURNS " + getScalarFunctionReturnTypeSql(fn))
.append(" PROPERTIES (");
sb.append("\n \"SYMBOL\"=").append("\"" + fn.getSymbolName() + "\"");
if (fn.getPrepareFnSymbol() != null) {
@@ -75,15 +78,54 @@ public class FunctionToSqlConverter {
.append("\"" + (fn.getLocation() == null ? "" :
fn.getLocation().toString()) + "\"");
boolean isReturnNull = fn.getNullableMode() ==
NullableMode.ALWAYS_NULLABLE;
sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull
+ "\"");
+ if (!fn.isUDTFunction()) {
+ sb.append(",\n \"VOLATILITY\"=").append("\"" +
fn.getVolatility().toSql() + "\"");
+ }
+ } else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) {
+ appendFileIfPresent(sb, fn, true);
+ boolean isReturnNull = fn.getNullableMode() ==
NullableMode.ALWAYS_NULLABLE;
+ sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull
+ "\"");
+ sb.append(",\n \"RUNTIME_VERSION\"=").append("\"" +
Strings.nullToEmpty(fn.getRuntimeVersion()) + "\"");
+ appendExpirationTimeIfNeeded(sb, fn);
+ if (!fn.isUDTFunction()) {
+ sb.append(",\n \"VOLATILITY\"=").append("\"" +
fn.getVolatility().toSql() + "\"");
+ }
} else {
sb.append(",\n \"OBJECT_FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" :
fn.getLocation().toString()) + "\"");
}
sb.append(",\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\"");
- sb.append("\n);");
+ if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF &&
!Strings.isNullOrEmpty(fn.getFunctionCode())) {
+ // Preserve inline Python UDF bodies so SHOW CREATE FUNCTION
output can be replayed directly.
+ sb.append("\n)\nAS
$$\n").append(fn.getFunctionCode()).append("\n$$;");
+ } else {
+ sb.append("\n);");
+ }
return sb.toString();
}
+ private static String getScalarFunctionReturnTypeSql(ScalarFunction fn) {
+ if (fn.isUDTFunction()) {
+ return new ArrayType(fn.getReturnType()).toSql();
+ }
+ return fn.getReturnType().toSql();
+ }
+
+ private static void appendExpirationTimeIfNeeded(StringBuilder sb,
Function fn) {
+ if (fn.getExpirationTime() != DEFAULT_EXPIRATION_TIME) {
+ sb.append(",\n \"EXPIRATION_TIME\"=").append("\"" +
fn.getExpirationTime() + "\"");
+ }
+ }
+
+ private static void appendFileIfPresent(StringBuilder sb, Function fn,
boolean hasLeadingComma) {
+ if (fn.getLocation() != null) {
+ if (hasLeadingComma) {
+ sb.append(",");
+ }
+ sb.append("\n \"FILE\"=").append("\"" +
fn.getLocation().toString() + "\"");
+ }
+ }
+
/**
* Converts an {@link AggregateFunction} to its SQL representation.
*/
@@ -125,12 +167,29 @@ public class FunctionToSqlConverter {
.append("\"" + (fn.getLocation() == null ? "" :
fn.getLocation().toString()) + "\",");
boolean isReturnNull = fn.getNullableMode() ==
NullableMode.ALWAYS_NULLABLE;
sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull +
"\",");
+ } else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) {
+ appendFileIfPresent(sb, fn, false);
+ if (fn.getLocation() != null) {
+ sb.append(",");
+ }
+ boolean isReturnNull = fn.getNullableMode() ==
NullableMode.ALWAYS_NULLABLE;
+ sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull +
"\",");
+ sb.append("\n \"RUNTIME_VERSION\"=")
+ .append("\"" + Strings.nullToEmpty(fn.getRuntimeVersion())
+ "\",");
+ if (fn.getExpirationTime() != DEFAULT_EXPIRATION_TIME) {
+ sb.append("\n \"EXPIRATION_TIME\"=").append("\"" +
fn.getExpirationTime() + "\",");
+ }
} else {
sb.append("\n \"OBJECT_FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" :
fn.getLocation().toString()) + "\",");
}
sb.append("\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\"");
- sb.append("\n);");
+ if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF &&
!Strings.isNullOrEmpty(fn.getFunctionCode())) {
+ // Preserve inline Python UDAF bodies so SHOW CREATE FUNCTION
output can be replayed directly.
+ sb.append("\n)\nAS
$$\n").append(fn.getFunctionCode()).append("\n$$;");
+ } else {
+ sb.append("\n);");
+ }
return sb.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
index 55cf9d64314..9b4e262d6a1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
@@ -45,7 +45,7 @@ public class PushDownFilterThroughProject extends
PlanPostProcessor {
PhysicalProject<? extends Plan> project = (PhysicalProject<? extends
Plan>) child;
Map<Slot, Expression> childAlias = project.getAliasToProducer();
if
(filter.getInputSlots().stream().map(childAlias::get).filter(Objects::nonNull)
- .anyMatch(Expression::containsUniqueFunction)) {
+ .anyMatch(Expression::containsVolatileExpression)) {
return filter;
}
PhysicalFilter<? extends Plan> newFilter =
filter.withConjunctsAndChild(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java
index d5cd32d6b85..a38fdb3edbd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java
@@ -1419,7 +1419,7 @@ public class BindExpression implements
AnalysisRuleFactory {
// 2. for 'group by a + random(), a + random() + 1', the two
'random()' will be different.
int containsUniqueGroupByCount = 0;
for (Expression groupByExpr : groupByExpressions) {
- if (groupByExpr.containsUniqueFunction()) {
+ if (groupByExpr.containsVolatileExpression()) {
containsUniqueGroupByCount++;
}
}
@@ -1433,7 +1433,7 @@ public class BindExpression implements
AnalysisRuleFactory {
groupByExpressions.size());
for (Expression groupByExpr : groupByExpressions) {
Expression newGroupByExpr = groupByExpr;
- if (groupByExpr.containsUniqueFunction()) {
+ if (groupByExpr.containsVolatileExpression()) {
Expression ignoreUniqueIdExpr =
ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupByExpr, true);
Expression previousGroupByExpr =
ignoreUniqueIdGroupByExprs.get(ignoreUniqueIdExpr);
if (previousGroupByExpr == null) {
@@ -1476,7 +1476,7 @@ public class BindExpression implements
AnalysisRuleFactory {
// c) let E3 = rewrite E2 with enable unique ids. then E3 is the bind
unique id expression for E.
private <T extends Expression> T bindExprUniqueIdWithGroupBy(T expression,
Map<Expression, Expression> bindUniqueIdReplaceMap) {
- if (!expression.containsUniqueFunction() ||
bindUniqueIdReplaceMap.isEmpty()) {
+ if (!expression.containsVolatileExpression() ||
bindUniqueIdReplaceMap.isEmpty()) {
return expression;
}
@@ -1522,7 +1522,7 @@ public class BindExpression implements
AnalysisRuleFactory {
private Map<Expression, Expression>
getGroupByUniqueFuncReplaceMap(List<Expression> groupByByExpressions) {
Map<Expression, Expression> replaceMap = Maps.newHashMap();
for (Expression expression : groupByByExpressions) {
- if (expression.containsUniqueFunction()) {
+ if (expression.containsVolatileExpression()) {
Expression ignoreUniqueIdExpr =
ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(expression, true);
// for sql:
// select distinct a + random(), a + random()
@@ -1554,7 +1554,7 @@ public class BindExpression implements
AnalysisRuleFactory {
=
ImmutableList.builderWithExpectedSize(boundGroupingSet.size());
for (Expression groupBy : boundGroupingSet) {
Expression newGroupBy = groupBy;
- if (groupBy.containsUniqueFunction()) {
+ if (groupBy.containsVolatileExpression()) {
Expression ignoreUniqueIdGroupBy =
ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupBy, true);
Expression previousGroupBy =
ignoreUniqueIdGroupByExpressions.get(ignoreUniqueIdGroupBy);
if (previousGroupBy == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java
index 6b35d31303b..48ca394733d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java
@@ -50,7 +50,7 @@ public class InPredicateExtractNonConstant implements
ExpressionPatternRuleFacto
matchesType(InPredicate.class)
.when(inPredicate ->
inPredicate.getOptions().size() <=
InPredicateDedup.REWRITE_OPTIONS_MAX_SIZE
- &&
!inPredicate.getCompareExpr().containsUniqueFunction())
+ &&
!inPredicate.getCompareExpr().containsVolatileExpression())
.then(this::rewrite)
.toRule(ExpressionRuleType.IN_PREDICATE_EXTRACT_NON_CONSTANT)
);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java
index 9421aec15ce..4f400eb54a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java
@@ -166,7 +166,7 @@ public class PushIntoCaseWhenBranch implements
ExpressionPatternRuleFactory {
// so there will exist twice 'first' in the rewritten IF expression,
which may increase the computation cost.
// if the plan is not filter and not join, then push down action may
not have positive effect,
// considering this, we give up the rewrite if the plan is not
condition plan or first contains unique function.
- if (first.containsUniqueFunction() || !isConditionPlan) {
+ if (first.containsVolatileExpression() || !isConditionPlan) {
return Optional.empty();
}
If ifExpr = new If(new IsNull(first), second, first);
@@ -182,7 +182,7 @@ public class PushIntoCaseWhenBranch implements
ExpressionPatternRuleFactory {
// so there will exist twice 'first' in the rewritten IF expression,
which may increase the computation cost.
// if the plan is not filter and not join, then push down action may
not have positive effect,
// considering this, we give up the rewrite if the plan is not
condition plan or first contains unique function.
- if (first.containsUniqueFunction() || !isConditionPlan) {
+ if (first.containsVolatileExpression() || !isConditionPlan) {
return Optional.empty();
}
If ifExpr = new If(new EqualTo(first, second), new
NullLiteral(nullIf.getDataType()), first);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java
index 2c2537ac476..97fc2a671c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java
@@ -274,7 +274,7 @@ public class AddProjectForUniqueFunction implements
RewriteRuleFactory {
*/
@VisibleForTesting
public List<NamedExpression> tryGenUniqueFunctionAlias(Collection<?
extends Expression> targets) {
- Map<UniqueFunction, Integer> unqiueFunctionCounter =
Maps.newLinkedHashMap();
+ Map<Expression, Integer> unqiueFunctionCounter =
Maps.newLinkedHashMap();
for (Expression target : targets) {
target.foreach(e -> {
Expression expr = (Expression) e;
@@ -286,10 +286,12 @@ public class AddProjectForUniqueFunction implements
RewriteRuleFactory {
ImmutableList.Builder<NamedExpression> builder
=
ImmutableList.builderWithExpectedSize(unqiueFunctionCounter.size());
- for (Entry<UniqueFunction, Integer> entry :
unqiueFunctionCounter.entrySet()) {
+ for (Entry<Expression, Integer> entry :
unqiueFunctionCounter.entrySet()) {
if (entry.getValue() > 1) {
ExprId exprId = StatementScopeIdGenerator.newExprId();
- String name = "$_" + entry.getKey().getName() + "_" +
exprId.asInt() + "_$";
+ String functionName = entry.getKey() instanceof Function
+ ? ((Function) entry.getKey()).getName() : "volatile";
+ String name = "$_" + functionName + "_" + exprId.asInt() +
"_$";
builder.add(new Alias(exprId, entry.getKey(), name));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java
index 62f29f40441..25fa5d2ccc8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java
@@ -38,7 +38,7 @@ public class CollectFilterAboveConsumer extends
OneRewriteRuleFactory {
LogicalCTEConsumer cteConsumer = filter.child();
Set<Expression> exprs = filter.getConjuncts();
for (Expression expr : exprs) {
- if (expr.containsUniqueFunction()) {
+ if (expr.containsVolatileExpression()) {
continue;
}
Expression rewrittenExpr = expr.rewriteUp(e -> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java
index 87fdf7a48fe..81317f20903 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java
@@ -108,7 +108,7 @@ public class JoinExtractOrFromCaseWhen implements
RewriteRuleFactory {
// 1. expr contains slots from both sides;
private boolean isConditionNeedRewrite(Expression expr, Set<Slot>
leftSlots, Set<Slot> rightSlots) {
- if (expr.containsUniqueFunction()) {
+ if (expr.containsVolatileExpression()) {
return false;
}
Set<Slot> exprSlots = expr.getInputSlots();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java
index 2f993fade57..0945162f6d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java
@@ -69,7 +69,7 @@ public class PushDownFilterThroughAggregation extends
OneRewriteRuleFactory {
// 2. if the conjunct contains unique function, it should not
be pushed down;
// e.g. 'select a, sum(a) from t group by a having a +
random() > 10'
// not equals 'select a, sum(a) from t where a + random() >
10 group by a'
- if (!conjunct.containsUniqueFunction()
+ if (!conjunct.containsVolatileExpression()
&& !conjunctSlots.isEmpty() &&
canPushDownSlots.containsAll(conjunctSlots)) {
pushDownPredicates.add(conjunct);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java
index 851aba0e21a..85de47ab127 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java
@@ -50,7 +50,7 @@ public class PushDownFilterThroughGenerate extends
OneRewriteRuleFactory {
filter.getConjuncts().forEach(conjunct -> {
Set<Slot> conjunctSlots = conjunct.getInputSlots();
if (!conjunctSlots.isEmpty() &&
childOutputs.containsAll(conjunctSlots)
- && !conjunct.containsUniqueFunction()) {
+ && !conjunct.containsVolatileExpression()) {
pushDownPredicates.add(conjunct);
} else {
remainPredicates.add(conjunct);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
index b16abf35250..ddcff70759f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
@@ -120,7 +120,7 @@ public class PushDownFilterThroughJoin extends
OneRewriteRuleFactory {
Set<Expression> rightPredicates = Sets.newLinkedHashSet();
Set<Expression> remainingPredicates = Sets.newLinkedHashSet();
for (Expression p : filterPredicates) {
- if (p.containsUniqueFunction()) {
+ if (p.containsVolatileExpression()) {
remainingPredicates.add(p);
continue;
}
@@ -162,7 +162,7 @@ public class PushDownFilterThroughJoin extends
OneRewriteRuleFactory {
if (!(predicate instanceof EqualTo)) {
return false;
}
- if (predicate.containsUniqueFunction()) {
+ if (predicate.containsVolatileExpression()) {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
index d9581d919a7..1a46b51a246 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
@@ -125,7 +125,7 @@ public class PushDownFilterThroughProject implements
RewriteRuleFactory {
// `project(b + random(1, 10) as a) -> filter(b + random(1, 10) >
1)`, it contains two distinct RANDOM.
if (childOutputs.containsAll(conjunctSlots)
&&
conjunctSlots.stream().map(childAlias::get).filter(Objects::nonNull)
- .noneMatch(Expression::containsUniqueFunction)) {
+
.noneMatch(Expression::containsVolatileExpression)) {
pushDownPredicates.add(conjunct);
} else {
remainPredicates.add(conjunct);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
index d3ba907c5bd..7c529ee6669 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
@@ -59,7 +59,7 @@ public class PushFilterInsideJoin extends
OneRewriteRuleFactory {
List<Expression> otherConditions =
Lists.newArrayListWithExpectedSize(
filter.getConjuncts().size() +
join.getOtherJoinConjuncts().size());
for (Expression expr : filter.getConjuncts()) {
- if (expr.containsUniqueFunction()) {
+ if (expr.containsVolatileExpression()) {
remainConditions.add(expr);
} else {
otherConditions.add(expr);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java
index afef7be2549..53deff85eb6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java
@@ -104,7 +104,7 @@ public class PushProjectIntoUnion extends
OneRewriteRuleFactory {
Set<Slot> uniqueFunctionSlots = Sets.newHashSet();
for (int i = 0; i < constExprs.size(); i++) {
NamedExpression ne = constExprs.get(i);
- if (ne.containsUniqueFunction()) {
+ if (ne.containsVolatileExpression()) {
uniqueFunctionSlots.add(union.getOutput().get(i));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
index 6b3b540477c..3f5c520b27f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
@@ -100,7 +100,7 @@ public class ReorderJoin extends OneRewriteRuleFactory {
for (Expression conjunct : filter.getConjuncts()) {
// after reorder and push down the random() down to lower
join,
// the rewritten sql may have less rows() than the origin
sql
- if (conjunct.containsUniqueFunction()) {
+ if (conjunct.containsVolatileExpression()) {
uniqueExprConjuncts.add(conjunct);
} else {
nonUniqueExprConjuncts.add(conjunct);
@@ -153,7 +153,7 @@ public class ReorderJoin extends OneRewriteRuleFactory {
// (t1 join t2) join t3 where t1.a = t3.x + random()
// if reorder, then may have ((t1 join t3) on t1.a = t3.x +
random()) join t2,
// then the reorder result will less rows than origin.
- if (conjunct.containsUniqueFunction()) {
+ if (conjunct.containsVolatileExpression()) {
return plan;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java
index 217854d6033..867ba560bc3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java
@@ -506,7 +506,7 @@ public class EagerAggRewriter extends
DefaultPlanRewriter<PushDownAggContext> {
if (filter.child() instanceof LogicalRelation) {
return genAggregate(filter, context);
}
- if
(filter.getConjuncts().stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(filter.getConjuncts().stream().anyMatch(Expression::containsVolatileExpression))
{
return genAggregate(filter, context);
}
List<SlotReference> filterInputSlots = filter.getInputSlots().stream()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index 70619c34007..ce3edfbeb32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -30,7 +30,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import
org.apache.doris.nereids.trees.expressions.functions.generator.TableGeneratingFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
-import
org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.shape.LeafExpression;
@@ -388,8 +387,9 @@ public abstract class Expression extends
AbstractTreeNode<Expression> implements
&& ((SlotReference) this).getOriginalColumn().get().isKey();
}
- public boolean containsUniqueFunction() {
- return containsType(UniqueFunction.class);
+    /** Returns true if any node of this expression tree is a volatile expression with a per-call identity. */
+    public boolean containsVolatileExpression() {
+        return anyMatch(node -> node instanceof VolatileExpression && ((VolatileExpression) node).isVolatile());
     }
/** containsNullLiteralChildren */
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java
new file mode 100644
index 00000000000..d43d326b6a3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions;
+
+/** Expression that may carry per-call volatile identity for optimizer rewrite
safety. */
+public interface VolatileExpression {
+ VolatileIdentity getVolatileIdentity();
+
+ Expression withIgnoreUniqueId(boolean ignoreUniqueId);
+
+ default boolean isVolatile() {
+ return getVolatileIdentity().isVolatile();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java
new file mode 100644
index 00000000000..26bd32caa8b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Optional;
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+
+/** Value object for volatile per-call identity and temporary group-by binding
comparison. */
+public class VolatileIdentity {
+ public static final VolatileIdentity NON_VOLATILE = new
VolatileIdentity(Optional.empty(), false);
+
+ private final Optional<ExprId> uniqueId;
+ private final boolean ignoreUniqueId;
+
+ public VolatileIdentity(ExprId uniqueId, boolean ignoreUniqueId) {
+ this(Optional.of(uniqueId), ignoreUniqueId);
+ }
+
+ private VolatileIdentity(Optional<ExprId> uniqueId, boolean
ignoreUniqueId) {
+ Preconditions.checkArgument(!ignoreUniqueId || uniqueId.isPresent(),
+ "ignoreUniqueId is meaningful only for volatile expressions");
+ this.uniqueId = uniqueId;
+ this.ignoreUniqueId = ignoreUniqueId;
+ }
+
+ public static VolatileIdentity newVolatileIdentity() {
+ return new VolatileIdentity(StatementScopeIdGenerator.newExprId(),
false);
+ }
+
+ public static VolatileIdentity of(ExprId uniqueId, boolean ignoreUniqueId)
{
+ return new VolatileIdentity(uniqueId, ignoreUniqueId);
+ }
+
+ public boolean isVolatile() {
+ return uniqueId.isPresent();
+ }
+
+ public ExprId getUniqueId() {
+ return uniqueId.get();
+ }
+
+ public Optional<ExprId> getUniqueIdOptional() {
+ return uniqueId;
+ }
+
+ public boolean ignoreUniqueId() {
+ return ignoreUniqueId;
+ }
+
+ public VolatileIdentity withIgnoreUniqueId(boolean ignoreUniqueId) {
+ Preconditions.checkState(isVolatile(), "Only volatile expressions can
ignore unique id");
+ return new VolatileIdentity(uniqueId, ignoreUniqueId);
+ }
+
+ /** Compare volatile expressions by identity unless either side
temporarily ignores it. */
+ public boolean equalsByIdentity(VolatileIdentity other, boolean
fallbackEquals) {
+ if ((!isVolatile() && !other.isVolatile())
+ || (ignoreUniqueId && other.ignoreUniqueId())) {
+ return fallbackEquals;
+ }
+ if ((!isVolatile() || !other.isVolatile())
+ || (ignoreUniqueId || other.ignoreUniqueId())) {
+ return false;
+ }
+ return uniqueId.equals(other.getUniqueIdOptional());
+ }
+
+ public int hashCodeByIdentity(int fallbackHash) {
+ if (!isVolatile() || ignoreUniqueId) {
+ return fallbackHash;
+ }
+ return getUniqueId().asInt();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java
index 8c788faa9d9..a8d9c8dbb69 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java
@@ -107,7 +107,7 @@ public interface ExpressionTrait extends
TreeNode<Expression> {
/**
* foldable() mainly use in fold expression. Udf and UniqueFunction are
not foldable.
* But if want to check an expression contains non-idempotent, such as
`rand()`, `uuid()`, etc.,
- * you should use Expression::containsUniqueFunction instead.
+ * you should use Expression::containsVolatileExpression instead.
*/
default boolean foldable() {
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java
index c29d83e043c..83875fd2dce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java
@@ -19,6 +19,8 @@ package
org.apache.doris.nereids.trees.expressions.functions.scalar;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.VolatileExpression;
+import org.apache.doris.nereids.trees.expressions.VolatileIdentity;
import java.util.List;
@@ -95,8 +97,9 @@ import java.util.List;
* the same.
*
*/
-public abstract class UniqueFunction extends ScalarFunction {
+public abstract class UniqueFunction extends ScalarFunction implements
VolatileExpression {
+ protected final VolatileIdentity volatileIdentity;
protected final ExprId uniqueId;
// when compare and bind unique id with group by expressions, should
ignore the unique id
@@ -105,22 +108,30 @@ public abstract class UniqueFunction extends
ScalarFunction {
/** constructor for withChildren and reuse signature */
public UniqueFunction(UniqueFunctionParams functionParams) {
super(functionParams);
+ this.volatileIdentity = VolatileIdentity.of(functionParams.uniqueId,
functionParams.ignoreUniqueId);
this.uniqueId = functionParams.uniqueId;
this.ignoreUniqueId = functionParams.ignoreUniqueId;
}
public UniqueFunction(String name, ExprId uniqueId, boolean
ignoreUniqueId, Expression... arguments) {
super(name, arguments);
+ this.volatileIdentity = VolatileIdentity.of(uniqueId, ignoreUniqueId);
this.uniqueId = uniqueId;
this.ignoreUniqueId = ignoreUniqueId;
}
public UniqueFunction(String name, ExprId uniqueId, boolean
ignoreUniqueId, List<Expression> arguments) {
super(name, arguments);
+ this.volatileIdentity = VolatileIdentity.of(uniqueId, ignoreUniqueId);
this.uniqueId = uniqueId;
this.ignoreUniqueId = ignoreUniqueId;
}
+    /** Returns the volatile identity built from this function's unique id and ignore flag. */
+    @Override
+    public VolatileIdentity getVolatileIdentity() {
+        return this.volatileIdentity;
+    }
+
public abstract UniqueFunction withIgnoreUniqueId(boolean ignoreUniqueId);
@Override
@@ -149,19 +160,13 @@ public abstract class UniqueFunction extends
ScalarFunction {
UniqueFunction other = (UniqueFunction) o;
// in BindExpression phase, when compare two expression equals except
the unique id,
// will set ignoreUniqueId = true temporarily, after bind expression,
will recover ignoreUniqueId = false
- if (ignoreUniqueId || other.ignoreUniqueId) {
- return super.equals(other);
- }
- return uniqueId.equals(other.uniqueId);
+ return volatileIdentity.equalsByIdentity(other.volatileIdentity,
super.equals(other));
}
// The contains method needs to use hashCode, so similar to equals, it
only compares exprId
@Override
public int computeHashCode() {
// direct return exprId to speed up
- if (ignoreUniqueId) {
- return super.computeHashCode();
- }
- return uniqueId.asInt();
+ return volatileIdentity.hashCodeByIdentity(super.computeHashCode());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
index 07cd4556324..3e53200fa9c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
@@ -22,11 +22,14 @@ import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.FunctionName;
import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.catalog.FunctionVolatility;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.URI;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.VolatileExpression;
+import org.apache.doris.nereids.trees.expressions.VolatileIdentity;
import
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.functions.Udf;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
@@ -43,12 +46,14 @@ import java.util.stream.Collectors;
/**
* Java UDF for Nereids
*/
-public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignature, Udf {
+public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignature, Udf, VolatileExpression {
private final String dbName;
private final long functionId;
private final Function.BinaryType binaryType;
private final FunctionSignature signature;
private final NullableMode nullableMode;
+ private final FunctionVolatility volatility;
+ private final VolatileIdentity volatileIdentity;
private final String objectFile;
private final String symbol;
private final String prepareFn;
@@ -62,7 +67,8 @@ public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignatu
*/
public JavaUdf(String name, long functionId, String dbName,
Function.BinaryType binaryType,
FunctionSignature signature,
- NullableMode nullableMode, String objectFile, String symbol,
String prepareFn, String closeFn,
+ NullableMode nullableMode, FunctionVolatility volatility,
VolatileIdentity volatileIdentity,
+ String objectFile, String symbol, String prepareFn, String closeFn,
String checkSum, boolean isStaticLoad, long expirationTime,
Expression... args) {
super(name, args);
this.dbName = dbName;
@@ -70,6 +76,8 @@ public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignatu
this.binaryType = binaryType;
this.signature = signature;
this.nullableMode = nullableMode;
+ this.volatility = volatility;
+ this.volatileIdentity = volatileIdentity;
this.objectFile = objectFile;
this.symbol = symbol;
this.prepareFn = prepareFn;
@@ -106,10 +114,55 @@ public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignatu
public JavaUdf withChildren(List<Expression> children) {
Preconditions.checkArgument(children.size() == this.children.size());
return new JavaUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility, volatileIdentity,
objectFile, symbol, prepareFn, closeFn, checkSum,
isStaticLoad, expirationTime,
children.toArray(new Expression[0]));
}
+ @Override
+ public VolatileIdentity getVolatileIdentity() {
+ return volatileIdentity;
+ }
+
+ @Override
+ public JavaUdf withIgnoreUniqueId(boolean ignoreUniqueId) {
+ Preconditions.checkState(isVolatile(), "Only volatile Java UDF can
ignore unique id");
+ return new JavaUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility,
volatileIdentity.withIgnoreUniqueId(ignoreUniqueId),
+ objectFile, symbol, prepareFn, closeFn, checkSum,
isStaticLoad, expirationTime,
+ children.toArray(new Expression[0]));
+ }
+
+ /** Return a copy with a new per-call identity when this UDF is VOLATILE.
*/
+ public JavaUdf withFreshVolatileIdentity() {
+ if (volatility != FunctionVolatility.VOLATILE) {
+ return this;
+ }
+ return new JavaUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility, VolatileIdentity.newVolatileIdentity(),
+ objectFile, symbol, prepareFn, closeFn, checkSum,
isStaticLoad, expirationTime,
+ children.toArray(new Expression[0]));
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return volatility == FunctionVolatility.IMMUTABLE;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof JavaUdf)) {
+ return false;
+ }
+ JavaUdf other = (JavaUdf) o;
+ return volatileIdentity.equalsByIdentity(other.volatileIdentity,
super.equals(o));
+ }
+
+ @Override
+ public int computeHashCode() {
+ return volatileIdentity.hashCodeByIdentity(super.computeHashCode());
+ }
+
/**
* translate catalog java udf to nereids java udf
*/
@@ -130,7 +183,7 @@ public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignatu
.toArray(SlotReference[]::new);
JavaUdf udf = new JavaUdf(fnName, scalar.getId(), dbName,
scalar.getBinaryType(), sig,
- scalar.getNullableMode(),
+ scalar.getNullableMode(), scalar.getVolatility(),
createVolatileIdentity(scalar.getVolatility()),
scalar.getLocation() == null ? null :
scalar.getLocation().getLocation(),
scalar.getSymbolName(),
scalar.getPrepareFnSymbol(),
@@ -166,9 +219,15 @@ public class JavaUdf extends ScalarFunction implements
ExplicitlyCastableSignatu
expr.setId(functionId);
expr.setStaticLoad(isStaticLoad);
expr.setExpirationTime(expirationTime);
+ expr.setVolatility(volatility);
return expr;
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
+
+    /** Gives a VOLATILE UDF a fresh per-call identity; any other volatility shares NON_VOLATILE. */
+    private static VolatileIdentity createVolatileIdentity(FunctionVolatility volatility) {
+        if (volatility != FunctionVolatility.VOLATILE) {
+            return VolatileIdentity.NON_VOLATILE;
+        }
+        return VolatileIdentity.newVolatileIdentity();
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java
index 6ddfdab8a15..6ab90cb42cd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java
@@ -88,7 +88,7 @@ public class JavaUdfBuilder extends UdfBuilder {
for (int i = 0; i < exprs.size(); ++i) {
processedExprs.add(TypeCoercionUtils.castIfNotSameType(exprs.get(i),
argTypes.get(i)));
}
- return Pair.ofSame(udf.withChildren(processedExprs));
+ return
Pair.ofSame(udf.withFreshVolatileIdentity().withChildren(processedExprs));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java
index 98a9e161308..65c8f2f14d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java
@@ -22,11 +22,14 @@ import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.FunctionName;
import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.catalog.FunctionVolatility;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.URI;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.VolatileExpression;
+import org.apache.doris.nereids.trees.expressions.VolatileIdentity;
import
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.functions.Udf;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
@@ -43,12 +46,14 @@ import java.util.stream.Collectors;
/**
* Python UDF for Nereids
*/
-public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSignature, Udf {
+public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSignature, Udf, VolatileExpression {
private final String dbName;
private final long functionId;
private final Function.BinaryType binaryType;
private final FunctionSignature signature;
private final NullableMode nullableMode;
+ private final FunctionVolatility volatility;
+ private final VolatileIdentity volatileIdentity;
private final String objectFile;
private final String symbol;
private final String prepareFn;
@@ -64,7 +69,8 @@ public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSigna
*/
public PythonUdf(String name, long functionId, String dbName,
Function.BinaryType binaryType,
FunctionSignature signature,
- NullableMode nullableMode, String objectFile, String
symbol, String prepareFn, String closeFn,
+ NullableMode nullableMode, FunctionVolatility volatility,
VolatileIdentity volatileIdentity,
+ String objectFile, String symbol, String prepareFn,
String closeFn,
String checkSum, boolean isStaticLoad, long
expirationTime,
String runtimeVersion, String functionCode, Expression...
args) {
super(name, args);
@@ -73,6 +79,8 @@ public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSigna
this.binaryType = binaryType;
this.signature = signature;
this.nullableMode = nullableMode;
+ this.volatility = volatility;
+ this.volatileIdentity = volatileIdentity;
this.objectFile = objectFile;
this.symbol = symbol;
this.prepareFn = prepareFn;
@@ -111,10 +119,55 @@ public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSigna
public PythonUdf withChildren(List<Expression> children) {
Preconditions.checkArgument(children.size() == this.children.size());
return new PythonUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility, volatileIdentity,
objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad,
expirationTime,
runtimeVersion, functionCode, children.toArray(new Expression[0]));
}
+ @Override
+ public VolatileIdentity getVolatileIdentity() {
+ return volatileIdentity;
+ }
+
+ @Override
+ public PythonUdf withIgnoreUniqueId(boolean ignoreUniqueId) {
+ Preconditions.checkState(isVolatile(), "Only volatile Python UDF can
ignore unique id");
+ return new PythonUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility, volatileIdentity.withIgnoreUniqueId(ignoreUniqueId),
+ objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad,
expirationTime,
+ runtimeVersion, functionCode, children.toArray(new Expression[0]));
+ }
+
+ /** Return a copy with a new per-call identity when this UDF is VOLATILE.
*/
+ public PythonUdf withFreshVolatileIdentity() {
+ if (volatility != FunctionVolatility.VOLATILE) {
+ return this;
+ }
+ return new PythonUdf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
+ volatility, VolatileIdentity.newVolatileIdentity(),
+ objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad,
expirationTime,
+ runtimeVersion, functionCode, children.toArray(new Expression[0]));
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return volatility == FunctionVolatility.IMMUTABLE;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PythonUdf)) {
+ return false;
+ }
+ PythonUdf other = (PythonUdf) o;
+ return volatileIdentity.equalsByIdentity(other.volatileIdentity,
super.equals(o));
+ }
+
+ @Override
+ public int computeHashCode() {
+ return volatileIdentity.hashCodeByIdentity(super.computeHashCode());
+ }
+
/**
* translate catalog java udf to nereids java udf
*/
@@ -135,7 +188,7 @@ public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSigna
.toArray(SlotReference[]::new);
PythonUdf udf = new PythonUdf(fnName, scalar.getId(), dbName,
scalar.getBinaryType(), sig,
- scalar.getNullableMode(),
+ scalar.getNullableMode(), scalar.getVolatility(),
createVolatileIdentity(scalar.getVolatility()),
scalar.getLocation() == null ? null :
scalar.getLocation().getLocation(),
scalar.getSymbolName(),
scalar.getPrepareFnSymbol(),
@@ -175,9 +228,15 @@ public class PythonUdf extends ScalarFunction implements
ExplicitlyCastableSigna
expr.setExpirationTime(expirationTime);
expr.setRuntimeVersion(runtimeVersion);
expr.setFunctionCode(functionCode);
+ expr.setVolatility(volatility);
return expr;
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
+
+    /** Gives a VOLATILE UDF a fresh per-call identity; any other volatility shares NON_VOLATILE. */
+    private static VolatileIdentity createVolatileIdentity(FunctionVolatility volatility) {
+        if (volatility != FunctionVolatility.VOLATILE) {
+            return VolatileIdentity.NON_VOLATILE;
+        }
+        return VolatileIdentity.newVolatileIdentity();
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java
index 7185594099b..85ff1035c3d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java
@@ -60,7 +60,7 @@ public class PythonUdfBuilder extends UdfBuilder {
@Override
public Class<? extends BoundFunction> functionClass() {
- return JavaUdf.class;
+ return PythonUdf.class;
}
@Override
@@ -88,7 +88,7 @@ public class PythonUdfBuilder extends UdfBuilder {
for (int i = 0; i < exprs.size(); ++i) {
processedExprs.add(TypeCoercionUtils.castIfNotSameType(exprs.get(i),
argTypes.get(i)));
}
- return Pair.ofSame(udf.withChildren(processedExprs));
+ return
Pair.ofSame(udf.withFreshVolatileIdentity().withChildren(processedExprs));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java
index d2d14b02012..bc5edcbb59b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.FunctionName;
import org.apache.doris.catalog.FunctionUtil;
+import org.apache.doris.catalog.FunctionVolatility;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarFunction;
@@ -152,6 +153,7 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
public static final String IS_STATIC_LOAD = "static_load";
public static final String EXPIRATION_TIME = "expiration_time";
public static final String RUNTIME_VERSION = "runtime_version";
+ public static final String VOLATILITY = "volatility";
private static final Pattern PYTHON_VERSION_PATTERN =
Pattern.compile("^3\\.\\d{1,2}(?:\\.\\d{1,2})?$");
private static final Logger LOG =
LogManager.getLogger(CreateFunctionCommand.class);
@@ -182,6 +184,7 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
// if not, will core dump when input is not null column, but need return
null
// like https://github.com/apache/doris/pull/14002/files
private NullableMode returnNullMode = NullableMode.ALWAYS_NULLABLE;
+ private FunctionVolatility volatility = FunctionVolatility.IMMUTABLE;
private String runtimeVersion;
private String functionCode;
@@ -320,6 +323,9 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
throw new AnalysisException("do not support 'NATIVE' udf type
after doris version 1.2.0,"
+ "please use JAVA_UDF or RPC instead");
}
+ if (properties.containsKey(VOLATILITY) && (isAggregate ||
isTableFunction)) {
+ throw new AnalysisException("volatility property only supports
scalar JAVA_UDF and PYTHON_UDF");
+ }
userFile = properties.getOrDefault(FILE_KEY,
properties.get(OBJECT_FILE_KEY));
originalUserFile = userFile; // Keep original jar name for BE
@@ -340,6 +346,9 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
}
if (binaryType == Function.BinaryType.JAVA_UDF) {
FunctionUtil.checkEnableJavaUdf();
+ if (!isAggregate && !isTableFunction) {
+ volatility = analyzeVolatility();
+ }
// always_nullable the default value is true, equal null means true
Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL);
@@ -354,6 +363,9 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
extractExpirationTime();
} else if (binaryType == Function.BinaryType.PYTHON_UDF) {
FunctionUtil.checkEnablePythonUdf();
+ if (!isAggregate && !isTableFunction) {
+ volatility = analyzeVolatility();
+ }
// always_nullable the default value is true, equal null means true
Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL);
@@ -370,6 +382,19 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
+ "'3.X.X' or '3.XX.XX' (e.g. '3.10.2').",
runtimeVersionString));
}
runtimeVersion = runtimeVersionString;
+ } else if (properties.containsKey(VOLATILITY)) {
+ throw new AnalysisException("volatility property only supports
JAVA_UDF and PYTHON_UDF");
+ }
+ }
+
+    /**
+     * Resolves the {@code volatility} property of a scalar JAVA_UDF or PYTHON_UDF.
+     *
+     * <p>When the property is absent, the UDF is conservatively treated as VOLATILE so that the
+     * optimizer does not assume its results are repeatable.
+     *
+     * @return the parsed volatility, or VOLATILE when the property is not set
+     * @throws AnalysisException if {@code FunctionVolatility.fromString} rejects the property value
+     */
+    private FunctionVolatility analyzeVolatility() throws AnalysisException {
+        if (!properties.containsKey(VOLATILITY)) {
+            return FunctionVolatility.VOLATILE;
+        }
+        try {
+            return FunctionVolatility.fromString(properties.get(VOLATILITY));
+        } catch (IllegalArgumentException e) {
+            // Surface the parser's message to the user, but keep the cause for diagnostics.
+            throw new AnalysisException(e.getMessage(), e);
         }
     }
@@ -481,6 +506,7 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
function.setUDTFunction(true);
function.setRuntimeVersion(runtimeVersion);
function.setFunctionCode(functionCode);
+ function.setVolatility(volatility);
// Todo: maybe in create tables function, need register two function,
one is
// normal and one is outer as those have different result when result
is NULL.
}
@@ -555,6 +581,7 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
function.setExpirationTime(expirationTime);
function.setRuntimeVersion(runtimeVersion);
function.setFunctionCode(functionCode);
+ function.setVolatility(volatility);
}
private void analyzeUdf() throws AnalysisException {
@@ -592,6 +619,7 @@ public class CreateFunctionCommand extends Command
implements ForwardWithSync {
function.setExpirationTime(expirationTime);
function.setRuntimeVersion(runtimeVersion);
function.setFunctionCode(functionCode);
+ function.setVolatility(volatility);
}
private void analyzeJavaUdaf(String clazz) throws AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java
index f64f201a7ae..081d482308a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java
@@ -293,9 +293,13 @@ public class ShowFunctionsCommand extends ShowCommand {
if (!Strings.isNullOrEmpty(function.getRuntimeVersion())) {
properties.put("RUNTIME_VERSION", function.getRuntimeVersion());
}
-
if (function instanceof ScalarFunction) {
ScalarFunction scalarFunction = (ScalarFunction) function;
+ if (!scalarFunction.isUDTFunction()
+ && (function.getBinaryType() ==
Function.BinaryType.JAVA_UDF
+ || function.getBinaryType() ==
Function.BinaryType.PYTHON_UDF)) {
+ properties.put("VOLATILITY", function.getVolatility().toSql());
+ }
properties.put("SYMBOL",
Strings.nullToEmpty(scalarFunction.getSymbolName()));
if (scalarFunction.getPrepareFnSymbol() != null) {
properties.put("PREPARE_FN",
scalarFunction.getPrepareFnSymbol());
@@ -348,4 +352,9 @@ public class ShowFunctionsCommand extends ShowCommand {
.collect(Collectors.joining(", "));
}
+    /** Test-only entry point exposing the PROPERTIES string built for SHOW FUNCTIONS. */
+    @VisibleForTesting
+    String buildPropertiesForTest(Function function) {
+        String rendered = buildProperties(function);
+        return rendered;
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
index 7dba523df42..6eb00636a50 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
@@ -473,7 +473,7 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
}
DataTrait childFd = child(0).getLogicalProperties().getTrait();
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
@@ -517,7 +517,7 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
return;
}
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java
index 207d80495f0..5c2118e60e5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java
@@ -287,7 +287,7 @@ public class LogicalLoadProject<CHILD_TYPE extends Plan>
extends LogicalUnary<CH
continue;
}
// a+random(1,10) should continue, otherwise the a(determinant),
a+random(1,10) (dependency) will be added.
- if (expr.containsUniqueFunction()) {
+ if (expr.containsVolatileExpression()) {
continue;
}
builder.addDeps(expr.getInputSlots(),
ImmutableSet.of(expr.toSlot()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
index fe076a7f8fc..28823b0be8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
@@ -319,7 +319,7 @@ public class LogicalProject<CHILD_TYPE extends Plan>
extends LogicalUnary<CHILD_
continue;
}
// a+random(1,10) should continue, otherwise the a(determinant),
a+random(1,10) (dependency) will be added.
- if (expr.containsUniqueFunction()) {
+ if (expr.containsVolatileExpression()) {
continue;
}
builder.addDeps(expr.getInputSlots(),
ImmutableSet.of(expr.toSlot()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java
index 11d04dc0348..09565bdc820 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java
@@ -208,7 +208,7 @@ public class PhysicalBucketedHashAggregate<CHILD_TYPE
extends Plan> extends Phys
public void computeUnique(DataTrait.Builder builder) {
DataTrait childFd = child(0).getLogicalProperties().getTrait();
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
@@ -240,7 +240,7 @@ public class PhysicalBucketedHashAggregate<CHILD_TYPE
extends Plan> extends Phys
DataTrait childFd = child(0).getLogicalProperties().getTrait();
builder.addUniformSlot(childFd);
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 2230f6281f5..395ff521198 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -416,7 +416,7 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan>
extends PhysicalUnar
public void computeUnique(DataTrait.Builder builder) {
DataTrait childFd = child(0).getLogicalProperties().getTrait();
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
@@ -455,7 +455,7 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan>
extends PhysicalUnar
DataTrait childFd = child(0).getLogicalProperties().getTrait();
builder.addUniformSlot(childFd);
- if
(groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) {
+ if
(groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
index 5376f31b10f..bd1eb02ab98 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
@@ -52,6 +52,7 @@ import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.VolatileExpression;
import org.apache.doris.nereids.trees.expressions.WhenClause;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
@@ -72,7 +73,6 @@ import
org.apache.doris.nereids.trees.expressions.functions.scalar.If;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Length;
import org.apache.doris.nereids.trees.expressions.functions.scalar.NullIf;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nvl;
-import
org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.expressions.literal.ComparableLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
@@ -647,7 +647,8 @@ public class ExpressionUtils {
*/
public static Expression setIgnoreUniqueIdForUniqueFunc(Expression
expression, boolean ignoreUniqueId) {
return expression.rewriteDownShortCircuit(e ->
- e instanceof UniqueFunction ? ((UniqueFunction)
e).withIgnoreUniqueId(ignoreUniqueId) : e);
+ e instanceof VolatileExpression && ((VolatileExpression)
e).isVolatile()
+ ? ((VolatileExpression)
e).withIgnoreUniqueId(ignoreUniqueId) : e);
}
public static <E extends Expression> List<E> rewriteDownShortCircuit(
@@ -1346,10 +1347,12 @@ public class ExpressionUtils {
* check if the expressions contain a unique function which exists
multiple times
*/
public static boolean containUniqueFunctionExistMultiple(Collection<?
extends Expression> expressions) {
- Set<UniqueFunction> counterSet = Sets.newHashSet();
+ Set<Expression> counterSet = Sets.newHashSet();
for (Expression expression : expressions) {
if (expression.anyMatch(
- expr -> expr instanceof UniqueFunction &&
!counterSet.add((UniqueFunction) expr))) {
+ expr -> expr instanceof VolatileExpression
+ && ((VolatileExpression) expr).isVolatile()
+ && !counterSet.add((Expression) expr))) {
return true;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
index a635c2b1fd0..0f8bb5e5ed1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
@@ -195,7 +195,7 @@ public class PlanUtils {
List<? extends Expression> targetExpressions) {
Set<Slot> uniqueFunctionSlots = Sets.newHashSet();
for (Entry<Slot, Expression> kv :
ExpressionUtils.generateReplaceMap(childProjects).entrySet()) {
- if (kv.getValue().containsUniqueFunction()) {
+ if (kv.getValue().containsVolatileExpression()) {
uniqueFunctionSlots.add(kv.getKey());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java
index fc309de75b0..426a45074b8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java
@@ -114,6 +114,20 @@ public class CreateFunctionTest {
Assert.assertTrue(containsIgnoreCase(dorisAssert.query(queryStr).explainQuery(),
"concat(left(CAST(CAST(k1 as BIGINT) AS VARCHAR(65533)), 3),
'****',"
+ " right(CAST(CAST(k1 AS BIGINT) AS VARCHAR(65533)),
4))"));
+
+ String pythonUdfSql = "create function db1.py_stable(int) returns int "
+ + "properties('type'='PYTHON_UDF', 'symbol'='evaluate', "
+ + "'runtime_version'='3.10.2', 'volatility'='stable');";
+ createFunction(pythonUdfSql, ctx);
+ Assert.assertEquals(2, db.getFunctions().size());
+ Function pythonFn = findFunction(db, "py_stable");
+ Assert.assertEquals(FunctionVolatility.STABLE,
pythonFn.getVolatility());
+ Assert.assertTrue(FunctionToSqlConverter.toSql(pythonFn,
false).contains("\"VOLATILITY\"=\"stable\""));
+
+ String defaultVolatileSql = "create function db1.py_default(int)
returns int "
+ + "properties('type'='PYTHON_UDF', 'symbol'='evaluate',
'runtime_version'='3.10.2');";
+ createFunction(defaultVolatileSql, ctx);
+ Assert.assertEquals(FunctionVolatility.VOLATILE, findFunction(db,
"py_default").getVolatility());
}
@Test
@@ -204,4 +218,13 @@ public class CreateFunctionTest {
private boolean containsIgnoreCase(String str, String sub) {
return str.toLowerCase().contains(sub.toLowerCase());
}
+
+ private Function findFunction(Database db, String functionName) {
+ for (Function function : db.getFunctions()) {
+ if (functionName.equals(function.functionName())) {
+ return function;
+ }
+ }
+ throw new AssertionError("function not found: " + functionName);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java
index 26b22fa1bae..d60a4502d2d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java
@@ -19,6 +19,8 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.Function.BinaryType;
import org.apache.doris.catalog.Function.NullableMode;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.URI;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -30,11 +32,12 @@ public class FunctionToSqlConverterTest {
// ======================== ScalarFunction — JAVA_UDF
========================
@Test
- void testScalarFunction_javaUdf_basicSql() {
+ void testScalarFunction_javaUdf_basicSql() throws AnalysisException {
FunctionName name = new FunctionName("testDb", "my_add");
Type[] argTypes = {Type.INT, Type.INT};
ScalarFunction fn = ScalarFunction.createUdf(BinaryType.JAVA_UDF,
name, argTypes,
- Type.INT, false, null, "com.example.MyAdd", null, null);
+ Type.INT, false, URI.create("file:///tmp/java-udf.jar"),
"com.example.MyAdd", null, null);
+ fn.setVolatility(FunctionVolatility.VOLATILE);
String sql = FunctionToSqlConverter.toSql(fn, false);
@@ -42,8 +45,9 @@ public class FunctionToSqlConverterTest {
Assertions.assertTrue(sql.contains("my_add(int, int)"));
Assertions.assertTrue(sql.contains("RETURNS int"));
Assertions.assertTrue(sql.contains("\"SYMBOL\"=\"com.example.MyAdd\""));
- Assertions.assertTrue(sql.contains("\"FILE\"=\"\""));
+
Assertions.assertTrue(sql.contains("\"FILE\"=\"file:///tmp/java-udf.jar\""));
Assertions.assertTrue(sql.contains("\"TYPE\"=\"JAVA_UDF\""));
+ Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"volatile\""));
Assertions.assertTrue(sql.contains("\"ALWAYS_NULLABLE\"="));
Assertions.assertFalse(sql.contains("OBJECT_FILE"));
Assertions.assertFalse(sql.contains("IF NOT EXISTS"));
@@ -102,6 +106,88 @@ public class FunctionToSqlConverterTest {
Assertions.assertFalse(sql.contains("CLOSE_FN"));
}
+ @Test
+ void testScalarFunction_pythonUdf_inlineReplaySql() {
+ FunctionName name = new FunctionName("testDb", "py_inline");
+ Type[] argTypes = {Type.INT};
+ ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF,
name, argTypes,
+ Type.INT, false, null, "evaluate", null, null);
+ fn.setRuntimeVersion("3.10.2");
+ fn.setExpirationTime(30);
+ fn.setFunctionCode("def evaluate(x):\n return x + 1");
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String sql = FunctionToSqlConverter.toSql(fn, false);
+
+ Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\""));
+ Assertions.assertTrue(sql.contains("\"EXPIRATION_TIME\"=\"30\""));
+ Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"immutable\""));
+ Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\""));
+ Assertions.assertTrue(sql.contains("AS $$\ndef evaluate(x):\n
return x + 1\n$$;"));
+ Assertions.assertFalse(sql.contains("\"FILE\"="));
+ Assertions.assertFalse(sql.endsWith(");"));
+ }
+
+ @Test
+ void testScalarFunction_pythonUdf_moduleReplaySql() throws
AnalysisException {
+ FunctionName name = new FunctionName("testDb", "py_module");
+ Type[] argTypes = {Type.INT};
+ ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF,
name, argTypes,
+ Type.INT, false, URI.create("file:///tmp/pyudf.zip"),
"pkg.mod.evaluate", null, null);
+ fn.setRuntimeVersion("3.10.2");
+ fn.setVolatility(FunctionVolatility.STABLE);
+
+ String sql = FunctionToSqlConverter.toSql(fn, false);
+
+
Assertions.assertTrue(sql.contains("\"FILE\"=\"file:///tmp/pyudf.zip\""));
+ Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\""));
+ Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"stable\""));
+ Assertions.assertTrue(sql.endsWith(");"));
+ Assertions.assertFalse(sql.contains("AS $$"));
+ Assertions.assertFalse(sql.contains("EXPIRATION_TIME"));
+ }
+
+ @Test
+ void testScalarFunction_javaUdtfReplaySql() {
+ FunctionName name = new FunctionName("testDb", "java_table_fn");
+ Type[] argTypes = {Type.INT};
+ ScalarFunction fn = ScalarFunction.createUdf(BinaryType.JAVA_UDF,
name, argTypes,
+ Type.INT, false, null, "com.example.TableFn", null, null);
+ fn.setUDTFunction(true);
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String sql = FunctionToSqlConverter.toSql(fn, true);
+
+ Assertions.assertTrue(sql.startsWith("CREATE TABLES FUNCTION IF NOT
EXISTS "));
+ Assertions.assertTrue(sql.contains("java_table_fn(int)"));
+ Assertions.assertTrue(sql.contains("RETURNS array<int>"));
+ Assertions.assertTrue(sql.contains("\"TYPE\"=\"JAVA_UDF\""));
+ Assertions.assertFalse(sql.contains("VOLATILITY"));
+ }
+
+ @Test
+ void testScalarFunction_pythonUdtfReplaySql() {
+ FunctionName name = new FunctionName("testDb", "py_table_fn");
+ Type[] argTypes = {Type.INT};
+ ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF,
name, argTypes,
+ Type.INT, false, null, "evaluate", null, null);
+ fn.setUDTFunction(true);
+ fn.setRuntimeVersion("3.10.2");
+ fn.setFunctionCode("def evaluate(x):\n yield x");
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String sql = FunctionToSqlConverter.toSql(fn, false);
+
+ Assertions.assertTrue(sql.startsWith("CREATE TABLES FUNCTION "));
+ Assertions.assertTrue(sql.contains("py_table_fn(int)"));
+ Assertions.assertTrue(sql.contains("RETURNS array<int>"));
+ Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\""));
+ Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\""));
+ Assertions.assertTrue(sql.contains("AS $$\ndef evaluate(x):\n yield
x\n$$;"));
+ Assertions.assertFalse(sql.contains("\"FILE\"="));
+ Assertions.assertFalse(sql.contains("VOLATILITY"));
+ }
+
// ======================== ScalarFunction — IF NOT EXISTS
========================
@Test
@@ -120,15 +206,15 @@ public class FunctionToSqlConverterTest {
// ======================== ScalarFunction — NATIVE
========================
@Test
- void testScalarFunction_native_usesObjectFile() {
+ void testScalarFunction_native_usesObjectFile() throws AnalysisException {
FunctionName name = new FunctionName("testDb", "native_fn");
Type[] argTypes = {Type.INT};
ScalarFunction fn = ScalarFunction.createUdf(BinaryType.NATIVE, name,
argTypes,
- Type.INT, false, null, "native_sym", null, null);
+ Type.INT, false, URI.create("file:///tmp/native.so"),
"native_sym", null, null);
String sql = FunctionToSqlConverter.toSql(fn, false);
- Assertions.assertTrue(sql.contains("\"OBJECT_FILE\"="));
+
Assertions.assertTrue(sql.contains("\"OBJECT_FILE\"=\"file:///tmp/native.so\""));
Assertions.assertTrue(sql.contains("\"TYPE\"=\"NATIVE\""));
// NATIVE uses OBJECT_FILE, not plain FILE — and should not have
ALWAYS_NULLABLE
Assertions.assertFalse(sql.contains("ALWAYS_NULLABLE"));
@@ -165,7 +251,7 @@ public class FunctionToSqlConverterTest {
// ======================== AggregateFunction — JAVA_UDF
========================
@Test
- void testAggregateFunction_javaUdf_basicSql() {
+ void testAggregateFunction_javaUdf_basicSql() throws AnalysisException {
FunctionName name = new FunctionName("testDb", "my_sum");
Type[] argTypes = {Type.BIGINT};
AggregateFunction fn =
AggregateFunction.AggregateFunctionBuilder.createUdfBuilder()
@@ -175,6 +261,7 @@ public class FunctionToSqlConverterTest {
.intermediateType(Type.BIGINT)
.hasVarArgs(false)
.symbolName("com.example.MySum")
+ .location(URI.create("file:///tmp/java-udaf.jar"))
.build();
String sql = FunctionToSqlConverter.toSql(fn, false);
@@ -183,7 +270,7 @@ public class FunctionToSqlConverterTest {
Assertions.assertTrue(sql.contains("my_sum(bigint)"));
Assertions.assertTrue(sql.contains("RETURNS bigint"));
Assertions.assertTrue(sql.contains("\"SYMBOL\"=\"com.example.MySum\""));
- Assertions.assertTrue(sql.contains("\"FILE\"=\"\""));
+
Assertions.assertTrue(sql.contains("\"FILE\"=\"file:///tmp/java-udaf.jar\""));
Assertions.assertTrue(sql.contains("\"TYPE\"=\"JAVA_UDF\""));
Assertions.assertTrue(sql.contains("\"ALWAYS_NULLABLE\"="));
Assertions.assertFalse(sql.contains("INIT_FN"));
@@ -210,15 +297,42 @@ public class FunctionToSqlConverterTest {
Assertions.assertTrue(sql.contains("CREATE AGGREGATE FUNCTION IF NOT
EXISTS "));
}
+ @Test
+ void testAggregateFunction_pythonUdf_inlineReplaySql() {
+ FunctionName name = new FunctionName("testDb", "py_agg");
+ Type[] argTypes = {Type.INT};
+ AggregateFunction fn =
AggregateFunction.AggregateFunctionBuilder.createUdfBuilder()
+ .binaryType(BinaryType.PYTHON_UDF)
+ .name(name)
+ .argsType(argTypes)
+ .retType(Type.INT)
+ .intermediateType(Type.INT)
+ .hasVarArgs(false)
+ .symbolName("SumState")
+ .build();
+ fn.setRuntimeVersion("3.10.2");
+ fn.setExpirationTime(45);
+ fn.setFunctionCode("class SumState:\n pass");
+
+ String sql = FunctionToSqlConverter.toSql(fn, false);
+
+ Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\""));
+ Assertions.assertTrue(sql.contains("\"EXPIRATION_TIME\"=\"45\""));
+ Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\""));
+ Assertions.assertTrue(sql.contains("AS $$\nclass SumState:\n
pass\n$$;"));
+ Assertions.assertFalse(sql.contains("\"FILE\"="));
+ Assertions.assertFalse(sql.endsWith(");"));
+ }
+
// ======================== AggregateFunction — NATIVE
========================
@Test
- void testAggregateFunction_native_includesSymbolFunctions() {
+ void testAggregateFunction_native_includesSymbolFunctions() throws
AnalysisException {
FunctionName name = new FunctionName("testDb", "native_agg");
Type[] argTypes = {Type.BIGINT};
AggregateFunction fn = new AggregateFunction(
name, argTypes, Type.BIGINT, false,
- Type.BIGINT, null,
+ Type.BIGINT, URI.create("file:///tmp/native-agg.so"),
"my_init", "my_update", "my_merge",
"my_serialize", "my_finalize",
"my_get_value", "my_remove");
@@ -231,7 +345,7 @@ public class FunctionToSqlConverterTest {
Assertions.assertTrue(sql.contains("\"MERGE_FN\"=\"my_merge\""));
Assertions.assertTrue(sql.contains("\"SERIALIZE_FN\"=\"my_serialize\""));
Assertions.assertTrue(sql.contains("\"FINALIZE_FN\"=\"my_finalize\""));
- Assertions.assertTrue(sql.contains("\"OBJECT_FILE\"="));
+
Assertions.assertTrue(sql.contains("\"OBJECT_FILE\"=\"file:///tmp/native-agg.so\""));
Assertions.assertTrue(sql.contains("\"TYPE\"=\"NATIVE\""));
// NATIVE uses OBJECT_FILE, not plain FILE
Assertions.assertFalse(sql.contains("ALWAYS_NULLABLE"));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java
index a12704bd238..87e9ea07396 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java
@@ -81,7 +81,7 @@ class CollectFilterAboveConsumerTest {
"exactly one conjunct (the deterministic one) should be
collected, "
+ "unique-function conjunct must NOT be collected");
Expression onlyCollected = filters.iterator().next();
- Assertions.assertFalse(onlyCollected.containsUniqueFunction(),
+ Assertions.assertFalse(onlyCollected.containsVolatileExpression(),
"collected conjunct must not contain a unique function");
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java
index fd40a770164..c0dd9dc0d14 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java
@@ -1044,7 +1044,7 @@ class UniqueFunctionTest extends SqlTestBase {
if (output instanceof Alias) {
Alias alias = (Alias) output;
exprIdToExpressionMap.put(alias.getExprId(),
alias.child());
- if (output.containsUniqueFunction()) {
+ if (output.containsVolatileExpression()) {
boolean notExists =
uniqueOutputExpressions.add(alias.child());
Assertions.assertTrue(notExists);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java
new file mode 100644
index 00000000000..89fd24821ba
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.udf;
+
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.Function.NullableMode;
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.catalog.FunctionVolatility;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.VolatileIdentity;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.nereids.util.ExpressionUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class UdfVolatilityTest {
+
+ @Test
+ void testImmutablePythonUdfIsNotVolatileExpression() {
+ PythonUdf udf = pythonUdf(FunctionVolatility.IMMUTABLE,
VolatileIdentity.NON_VOLATILE);
+
+ Assertions.assertTrue(udf.isDeterministic());
+ Assertions.assertFalse(udf.containsVolatileExpression());
+ Assertions.assertEquals(PythonUdf.class, new
PythonUdfBuilder(udf).functionClass());
+ }
+
+ @Test
+ void testVolatilePythonUdfUsesUniqueIdentity() {
+ PythonUdf first = pythonUdf(FunctionVolatility.VOLATILE,
VolatileIdentity.newVolatileIdentity());
+ PythonUdf second = pythonUdf(FunctionVolatility.VOLATILE,
VolatileIdentity.newVolatileIdentity());
+
+ Assertions.assertFalse(first.isDeterministic());
+ Assertions.assertTrue(first.containsVolatileExpression());
+ Assertions.assertNotEquals(first, second);
+
+ Expression ignoredFirst =
ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(first, true);
+ Expression ignoredSecond =
ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(second, true);
+ Assertions.assertEquals(ignoredFirst, ignoredSecond);
+ }
+
+ @Test
+ void testVolatileAndImmutableUdfAreNotEqual() {
+ PythonUdf immutable = pythonUdf(FunctionVolatility.IMMUTABLE,
VolatileIdentity.NON_VOLATILE);
+ PythonUdf volatileUdf = pythonUdf(FunctionVolatility.VOLATILE,
VolatileIdentity.newVolatileIdentity());
+
+ Assertions.assertNotEquals(immutable, volatileUdf);
+ Assertions.assertNotEquals(volatileUdf, immutable);
+ }
+
+ @Test
+ void testJavaUdfVolatility() {
+ JavaUdf udf = javaUdf(FunctionVolatility.STABLE,
VolatileIdentity.NON_VOLATILE);
+
+ Assertions.assertFalse(udf.isDeterministic());
+ Assertions.assertFalse(udf.containsVolatileExpression());
+ }
+
+ private PythonUdf pythonUdf(FunctionVolatility volatility,
VolatileIdentity volatileIdentity) {
+ return new PythonUdf("py_fn", 1, "db1",
Function.BinaryType.PYTHON_UDF, signature(),
+ NullableMode.ALWAYS_NULLABLE, volatility, volatileIdentity,
+ null, "evaluate", null, null, "", false, 360, "3.10.2", "",
+ new IntegerLiteral(1));
+ }
+
+ private JavaUdf javaUdf(FunctionVolatility volatility, VolatileIdentity
volatileIdentity) {
+ return new JavaUdf("java_fn", 1, "db1", Function.BinaryType.JAVA_UDF,
signature(),
+ NullableMode.ALWAYS_NULLABLE, volatility, volatileIdentity,
+ null, "evaluate", null, null, "", false, 360, new
IntegerLiteral(1));
+ }
+
+ private FunctionSignature signature() {
+ return
FunctionSignature.ret(IntegerType.INSTANCE).args(IntegerType.INSTANCE);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java
index ba42ef9c6e2..716b9fa24da 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java
@@ -24,6 +24,10 @@ import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.AccessPrivilegeWithCols;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.FunctionName;
+import org.apache.doris.catalog.FunctionVolatility;
+import org.apache.doris.catalog.ScalarFunction;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.Auth;
@@ -111,6 +115,52 @@ public class ShowFunctionsCommandTest extends
TestWithFeService {
Assertions.assertTrue(sf.like("test_for_create_function",
"test_for_create_function%"));
}
+ @Test
+ void testBuildProperties_scalarUdfEmitsVolatility() {
+ ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null);
+ ScalarFunction fn =
ScalarFunction.createUdf(Function.BinaryType.JAVA_UDF,
+ new FunctionName("test", "java_scalar_fn"), new Type[]
{Type.INT},
+ Type.INT, false, null, "com.example.ScalarFn", null, null);
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String properties = sf.buildPropertiesForTest(fn);
+
+
Assertions.assertTrue(properties.contains("SYMBOL=com.example.ScalarFn"));
+ Assertions.assertTrue(properties.contains("VOLATILITY=immutable"));
+ }
+
+ @Test
+ void testBuildProperties_javaUdtfDoesNotEmitVolatility() {
+ ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null);
+ ScalarFunction fn =
ScalarFunction.createUdf(Function.BinaryType.JAVA_UDF,
+ new FunctionName("test", "java_table_fn"), new Type[]
{Type.INT},
+ Type.INT, false, null, "com.example.TableFn", null, null);
+ fn.setUDTFunction(true);
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String properties = sf.buildPropertiesForTest(fn);
+
+
Assertions.assertTrue(properties.contains("SYMBOL=com.example.TableFn"));
+ Assertions.assertFalse(properties.contains("VOLATILITY"));
+ }
+
+ @Test
+ void testBuildProperties_pythonUdtfDoesNotEmitVolatility() {
+ ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null);
+ ScalarFunction fn =
ScalarFunction.createUdf(Function.BinaryType.PYTHON_UDF,
+ new FunctionName("test", "py_table_fn"), new Type[] {Type.INT},
+ Type.INT, false, null, "evaluate", null, null);
+ fn.setUDTFunction(true);
+ fn.setRuntimeVersion("3.10.2");
+ fn.setVolatility(FunctionVolatility.IMMUTABLE);
+
+ String properties = sf.buildPropertiesForTest(fn);
+
+ Assertions.assertTrue(properties.contains("RUNTIME_VERSION=3.10.2"));
+ Assertions.assertTrue(properties.contains("SYMBOL=evaluate"));
+ Assertions.assertFalse(properties.contains("VOLATILITY"));
+ }
+
@Test
void testAuth() throws Exception {
auth = Env.getCurrentEnv().getAuth();
diff --git a/regression-test/suites/javaudf_p0/test_javaudf_float.groovy
b/regression-test/suites/javaudf_p0/test_javaudf_float.groovy
index 5372bda71c4..c67488acadc 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_float.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_float.groovy
@@ -56,7 +56,8 @@ suite("test_javaudf_float") {
sql """ CREATE FUNCTION java_udf_float_test(FLOAT,FLOAT) RETURNS FLOAT
PROPERTIES (
"file"="file://${jarPath}",
"symbol"="org.apache.doris.udf.FloatTest",
- "type"="JAVA_UDF"
+ "type"="JAVA_UDF",
+ "volatility"="immutable"
); """
qt_select """ SELECT java_udf_float_test(cast(2.83645 as
float),cast(111.1111111 as float)) as result; """
diff --git a/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy
index f550dc78c3d..034765a0a45 100644
--- a/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy
@@ -62,7 +62,8 @@ suite("test_expand_star_mtmv","mtmv") {
sql """ CREATE FUNCTION ${functionName}(date, date) RETURNS boolean
PROPERTIES (
"file"="file://${jarPath}",
"symbol"="org.apache.doris.udf.DateTest1",
- "type"="JAVA_UDF"
+ "type"="JAVA_UDF",
+ "volatility"="immutable"
); """
sql """
diff --git
a/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy
index 6897e42ee24..59940e4ed41 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy
@@ -28,7 +28,8 @@ suite("test_pythonudf_aggregate") {
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
- "runtime_version" = "${runtime_version}"
+ "runtime_version" = "${runtime_version}",
+ "volatility" = "immutable"
)
AS \$\$
def evaluate(score):
@@ -120,7 +121,8 @@ def evaluate(score):
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
- "runtime_version" = "${runtime_version}"
+ "runtime_version" = "${runtime_version}",
+ "volatility" = "immutable"
)
AS \$\$
def evaluate(age):
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy
index 7a26136ed2d..0d621a5abf3 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy
@@ -53,7 +53,8 @@ suite("test_pythonudf_float") {
"symbol"="float_test.evaluate",
"type"="PYTHON_UDF",
"runtime_version" = "${runtime_version}",
- "always_nullable" = "true"
+ "always_nullable" = "true",
+ "volatility" = "immutable"
); """
qt_select """ SELECT python_udf_float_test(cast(2.83645 as
float),cast(111.1111111 as float)) as result; """
diff --git
a/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy
b/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy
new file mode 100644
index 00000000000..691e21efedb
--- /dev/null
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+
+suite("test_pythonudf_volatility") {
+ def runtimeVersion = getPythonUdfRuntimeVersion()
+ def functions = [
+ "py_vol_immutable",
+ "py_vol_stable",
+ "py_vol_volatile",
+ "py_vol_default"
+ ]
+ def materializedViews = [
+ "py_vol_immutable_mv",
+ "py_vol_stable_mv",
+ "py_vol_volatile_mv",
+ "py_vol_default_mv"
+ ]
+
+ materializedViews.each { mv ->
+ sql """ DROP MATERIALIZED VIEW IF EXISTS ${mv}; """
+ }
+ sql """ DROP TABLE IF EXISTS py_vol_tbl; """
+ functions.each { fn ->
+ sql """ DROP FUNCTION IF EXISTS ${fn}(INT); """
+ }
+
+ sql """
+ CREATE TABLE py_vol_tbl (
+ k INT
+ )
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ PROPERTIES("replication_num" = "1");
+ """
+ sql """ INSERT INTO py_vol_tbl VALUES (1), (2); """
+
+ sql """
+ CREATE FUNCTION py_vol_immutable(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtimeVersion}",
+ "volatility" = "immutable",
+ "expiration_time" = "30"
+ )
+AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 1
+\$\$;
+ """
+
+ sql """
+ CREATE FUNCTION py_vol_stable(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtimeVersion}",
+ "volatility" = "stable"
+ )
+AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 2
+\$\$;
+ """
+
+ sql """
+ CREATE FUNCTION py_vol_volatile(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtimeVersion}",
+ "volatility" = "volatile"
+ )
+AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 3
+\$\$;
+ """
+
+ sql """
+ CREATE FUNCTION py_vol_default(INT)
+ RETURNS INT
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "symbol" = "evaluate",
+ "runtime_version" = "${runtimeVersion}"
+ )
+AS \$\$
+def evaluate(x):
+ if x is None:
+ return None
+ return x + 4
+\$\$;
+ """
+
+ def result = sql """
+ SELECT
+ py_vol_immutable(1),
+ py_vol_stable(1),
+ py_vol_volatile(1),
+ py_vol_default(1);
+ """
+ Assert.assertEquals([[2, 3, 4, 5]], result)
+
+ def showCreateResult = sql """ SHOW CREATE FUNCTION py_vol_immutable(INT); """
+ assertTrue(showCreateResult.size() == 1)
+ def replaySql = showCreateResult[0][1].toString()
+ assertTrue(replaySql.contains("\"RUNTIME_VERSION\"=\"${runtimeVersion}\""))
+ assertTrue(replaySql.contains("\"VOLATILITY\"=\"immutable\""))
+ assertTrue(replaySql.contains("\"EXPIRATION_TIME\"=\"30\""))
+ assertFalse(replaySql.contains("\"FILE\"="))
+ assertTrue(replaySql.contains("AS \$\$"))
+ assertTrue(replaySql.contains("return x + 1"))
+
+ sql """ DROP FUNCTION py_vol_immutable(INT); """
+ sql replaySql
+ result = sql """ SELECT py_vol_immutable(1); """
+ Assert.assertEquals([[2]], result)
+
+ explain {
+ sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_immutable(k) IN (1, k + 1)"
+ contains "OR["
+ notContains " IN "
+ }
+
+ explain {
+ sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_stable(k) IN (1, k + 2)"
+ contains "OR["
+ notContains " IN "
+ }
+
+ explain {
+ sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_volatile(k) IN (1, k + 3)"
+ contains " IN "
+ notContains "OR["
+ }
+
+ explain {
+ sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_default(k) IN (1, k + 4)"
+ contains " IN "
+ notContains "OR["
+ }
+
+ result = sql """
+ SELECT py_vol_volatile(k), COUNT(*)
+ FROM py_vol_tbl
+ GROUP BY py_vol_volatile(k)
+ ORDER BY 1;
+ """
+ Assert.assertEquals("[[4, 1], [5, 1]]", result.toString())
+
+ createMV("""
+ CREATE MATERIALIZED VIEW py_vol_immutable_mv
+ AS SELECT py_vol_immutable(k) AS v FROM py_vol_tbl;
+ """)
+
+ test {
+ sql """
+ CREATE MATERIALIZED VIEW py_vol_stable_mv
+ AS SELECT py_vol_stable(k) AS v_stable FROM py_vol_tbl;
+ """
+ exception "can not contain nonDeterministic expression or unnest"
+ }
+
+ test {
+ sql """
+ CREATE MATERIALIZED VIEW py_vol_volatile_mv
+ AS SELECT py_vol_volatile(k) AS v_volatile FROM py_vol_tbl;
+ """
+ exception "can not contain nonDeterministic expression or unnest"
+ }
+
+ test {
+ sql """
+ CREATE MATERIALIZED VIEW py_vol_default_mv
+ AS SELECT py_vol_default(k) AS v_default FROM py_vol_tbl;
+ """
+ exception "can not contain nonDeterministic expression or unnest"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]