This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 49faccbf50a [enhancement](nereids) speedup sql cache with use variable
as partition predicate (#37943)
49faccbf50a is described below
commit 49faccbf50a4fa3bb266454e9134e4bfe91fef3b
Author: 924060929 <[email protected]>
AuthorDate: Tue Jul 16 22:11:18 2024 +0800
[enhancement](nereids) speedup sql cache with use variable as partition
predicate (#37943)
Follow-up to #37090.
Support reusing the SQL cache when a user variable is used as a partition predicate and
the variable's value changes:
```sql
set @dt='2024-07-16';
-- create cache 1
select * from tbl where dt = @dt;
set @dt='2024-07-17';
-- create cache 2, will not invalidate cache 1
select * from tbl where dt = @dt;
set @dt='2024-07-16';
-- reuse cache 1
select * from tbl where dt = @dt;
```
---
.../doris/common/NereidsSqlCacheManager.java | 43 ++++++++++++++++++++--
.../org/apache/doris/nereids/SqlCacheContext.java | 18 +++++++++
.../java/org/apache/doris/qe/ConnectProcessor.java | 14 +++++++
.../cache/parse_sql_from_sql_cache.groovy | 20 ++++++++++
4 files changed, 91 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index cbc3c173af6..6c4d5901709 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -25,12 +25,14 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
@@ -124,7 +126,9 @@ public class NereidsSqlCacheManager {
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity =
connectContext.getCurrentUserIdentity();
- String key = currentUserIdentity.toString() + ":" + sql.trim();
+ String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
+ ? currentUserIdentity.toString() + ":" + sql.trim()
+ : currentUserIdentity.toString() + ":" +
DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null &&
sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
@@ -142,7 +146,9 @@ public class NereidsSqlCacheManager {
}
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity =
connectContext.getCurrentUserIdentity();
- String key = currentUserIdentity.toString() + ":" + sql.trim();
+ String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
+ ? currentUserIdentity.toString() + ":" + sql.trim()
+ : currentUserIdentity.toString() + ":" +
DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null &&
sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
@@ -162,8 +168,7 @@ public class NereidsSqlCacheManager {
/** tryParseSql */
public Optional<LogicalSqlCache> tryParseSql(ConnectContext
connectContext, String sql) {
UserIdentity currentUserIdentity =
connectContext.getCurrentUserIdentity();
- Env env = connectContext.getEnv();
- String key = currentUserIdentity.toString() + ":" + sql.trim();
+ String key = currentUserIdentity + ":" + sql.trim();
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
@@ -171,6 +176,36 @@ public class NereidsSqlCacheManager {
// LOG.info("Total size: " +
GraphLayout.parseInstance(sqlCacheContext).totalSize());
+ List<Variable> currentVariables =
resolveUserVariables(sqlCacheContext);
+ if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
+ String md5 = DebugUtil.printId(
+
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));
+
+ String md5CacheKey = currentUserIdentity + ":" + md5;
+ SqlCacheContext sqlCacheContextWithVariable =
sqlCaches.getIfPresent(md5CacheKey);
+
+ // already exist cache in the fe, but the variable is different to
this query,
+ // we should create another cache context in fe, use another cache
key
+ connectContext.getStatementContext()
+ .getSqlCacheContext().ifPresent(ctx ->
ctx.setCacheKeyType(CacheKeyType.MD5));
+
+ if (sqlCacheContextWithVariable != null) {
+ return tryParseSqlWithoutCheckVariable(
+ connectContext, md5CacheKey,
sqlCacheContextWithVariable, currentUserIdentity
+ );
+ } else {
+ return Optional.empty();
+ }
+ } else {
+ return tryParseSqlWithoutCheckVariable(connectContext, key,
sqlCacheContext, currentUserIdentity);
+ }
+ }
+
+ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
+ ConnectContext connectContext, String key,
+ SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity)
{
+ Env env = connectContext.getEnv();
+
// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index a0c95a9113e..4cf2418d91e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -86,6 +86,8 @@ public class SqlCacheContext {
private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;
+ private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL;
+
public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity
cannot be null");
this.queryId = Objects.requireNonNull(queryId, "queryId cannot be
null");
@@ -392,6 +394,14 @@ public class SqlCacheContext {
this.resultSetInFe = resultSetInFe;
}
+ public CacheKeyType getCacheKeyType() {
+ return cacheKeyType;
+ }
+
+ public void setCacheKeyType(CacheKeyType cacheKeyType) {
+ this.cacheKeyType = cacheKeyType;
+ }
+
/** FullTableName */
@lombok.Data
@lombok.AllArgsConstructor
@@ -434,4 +444,12 @@ public class SqlCacheContext {
this.scanPartitions.add(partitionId);
}
}
+
+ /** CacheKeyType */
+ public enum CacheKeyType {
+ // use `userIdentity`:`sql`.trim() as Cache key in FE
+ SQL,
+ // use MD5 as Cache key in FE
+ MD5
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index aac7951ecdf..7b0ef09a744 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -49,6 +49,8 @@ import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
+import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
@@ -230,9 +232,15 @@ public abstract class ConnectProcessor {
boolean nereidsUseServerPrep =
(sessionVariable.enableServeSidePreparedStatement
&& !sessionVariable.isEnableInsertGroupCommit())
|| mysqlCommand == MysqlCommand.COM_QUERY;
+ CacheKeyType cacheKeyType = null;
if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) {
if (wantToParseSqlFromSqlCache) {
cachedStmts = parseFromSqlCache(originStmt);
+ Optional<SqlCacheContext> sqlCacheContext =
ConnectContext.get()
+ .getStatementContext().getSqlCacheContext();
+ if (sqlCacheContext.isPresent()) {
+ cacheKeyType = sqlCacheContext.get().getCacheKeyType();
+ }
if (cachedStmts != null) {
stmts = cachedStmts;
}
@@ -310,6 +318,12 @@ public abstract class ConnectProcessor {
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
ctx.setExecutor(executor);
+ if (cacheKeyType != null) {
+ SqlCacheContext sqlCacheContext =
+
executor.getContext().getStatementContext().getSqlCacheContext().get();
+ sqlCacheContext.setCacheKeyType(cacheKeyType);
+ }
+
try {
executor.execute();
if (connectType.equals(ConnectType.MYSQL)) {
diff --git
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index d95c3edc344..2c17da24661 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -503,6 +503,26 @@ suite("parse_sql_from_sql_cache") {
assertHasCache "select @custom_variable from
test_use_plan_cache17 where id = 1 and value = 1"
def result1 = sql "select @custom_variable from
test_use_plan_cache17 where id = 1 and value = 1"
assertTrue(result1.size() == 1 &&
result1[0][0].toString().toInteger() == 10)
+
+
+ sql "set @custom_variable2=1"
+ assertNoCache "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ def res = sql "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ assertTrue(res[0][0] == 1)
+ assertHasCache "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+
+ sql "set @custom_variable2=2"
+ assertNoCache "select* from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ // should not invalidate cache with @custom_variable2=1
+ res = sql "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ assertTrue(res[0][0] == 2)
+ assertHasCache "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+
+ sql "set @custom_variable2=1"
+ // should reuse cache
+ assertHasCache "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ res = sql "select * from test_use_plan_cache17 where id =
@custom_variable2 and value = 1"
+ assertTrue(res[0][0] == 1)
}
}),
extraThread("test_udf", {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]