This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e29d1252004 [fix](planner) query should be cancelled if limit reached
(#44338) (#45222)
e29d1252004 is described below
commit e29d1252004a142551cae2959511eb66b7f21a73
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 9 22:22:24 2024 -0800
[fix](planner) query should be cancelled if limit reached (#44338) (#45222)
cherry-pick #44338
---
be/src/vec/exec/scan/scanner_scheduler.cpp | 13 ++++++++
be/src/vec/exec/scan/vscanner.h | 2 ++
.../org/apache/doris/nereids/NereidsPlanner.java | 5 ---
.../org/apache/doris/planner/OriginalPlanner.java | 15 ---------
.../java/org/apache/doris/planner/Planner.java | 6 ----
.../main/java/org/apache/doris/qe/Coordinator.java | 36 +++++++++++-----------
6 files changed, 33 insertions(+), 44 deletions(-)
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f7b6887d746..eef7acf1cb9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -271,6 +271,7 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0;
bool first_read = true;
+ int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
@@ -319,6 +320,18 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.push_back(std::move(free_block));
}
+
+ if (limit > 0 && limit < ctx->batch_size()) {
+ // If this scanner has a limit that is smaller than the batch size,
+ // return immediately instead of waiting to reach raw_bytes_threshold.
+ // This saves time when each scanner returns only a small number of rows,
+ // but the rows from all scanners together are enough.
+ // Without this break, a query like "select * from tbl where id=1 limit 10"
+ // may scan a lot of data when the filter ratio of "id=1" is high.
+ // If the limit is not smaller than the batch size, this rule is skipped,
+ // to avoid producing too many small blocks when a user specifies a large limit.
+ break;
+ }
} // end for while
if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 03604621f05..acb715e6e47 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -167,6 +167,8 @@ public:
_query_statistics = query_statistics;
}
+ int64_t limit() const { return _limit; }
+
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 7a8e29306c3..31ecae7f33d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -621,11 +621,6 @@ public class NereidsPlanner extends Planner {
return plan;
}
- @Override
- public boolean isBlockQuery() {
- return true;
- }
-
@Override
public DescriptorTable getDescTable() {
return descTable;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 24433af00e2..3f0071680cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -80,10 +80,6 @@ public class OriginalPlanner extends Planner {
this.analyzer = analyzer;
}
- public boolean isBlockQuery() {
- return isBlockQuery;
- }
-
public PlannerContext getPlannerContext() {
return plannerContext;
}
@@ -274,17 +270,6 @@ public class OriginalPlanner extends Planner {
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
- if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() !=
null) {
- isBlockQuery = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("this is block query");
- }
- } else {
- isBlockQuery = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("this isn't block query");
- }
- }
// Check SelectStatement if optimization condition satisfied
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 =
1 and pk2 = 2
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 0a7246f5e1c..0a22ec841dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -43,8 +43,6 @@ public abstract class Planner {
protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
- protected boolean isBlockQuery = false;
-
protected TQueryOptions queryOptions;
public abstract List<ScanNode> getScanNodes();
@@ -115,10 +113,6 @@ public abstract class Planner {
return fragments;
}
- public boolean isBlockQuery() {
- return isBlockQuery;
- }
-
public TQueryOptions getQueryOptions() {
return queryOptions;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f5a598420d..500ba8f22f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();
- private final boolean isBlockQuery;
-
private int numReceivedRows = 0;
private List<String> deltaUrls;
@@ -336,7 +334,6 @@ public class Coordinator implements CoordInterface {
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner
planner) {
this.context = context;
- this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
@@ -379,7 +376,6 @@ public class Coordinator implements CoordInterface {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable
descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean
loadZeroTolerance, boolean enableProfile) {
- this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
@@ -1448,24 +1444,28 @@ public class Coordinator implements CoordInterface {
}
}
- if (resultBatch.isEos()) {
- this.returnedAllResults = true;
-
- // if this query is a block query do not cancel.
- Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
- boolean hasLimit = numLimitRows > 0;
- if (!isBlockQuery && instanceIds.size() > 1 && hasLimit &&
numReceivedRows >= numLimitRows) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("no block query, return num >= limit rows, need
cancel");
- }
- cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH,
"query reach limit");
+ if (resultBatch.getBatch() != null) {
+ numReceivedRows += resultBatch.getBatch().getRowsSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("number received rows: {}, {}", numReceivedRows,
DebugUtil.printId(queryId));
}
- if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().dryRunQuery) {
+ }
+
+ if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().dryRunQuery) {
+ if (resultBatch.isEos()) {
numReceivedRows = 0;
numReceivedRows +=
resultBatch.getQueryStatistics().getReturnedRows();
}
- } else if (resultBatch.getBatch() != null) {
- numReceivedRows += resultBatch.getBatch().getRowsSize();
+ }
+
+ Long limitRows = fragments.get(0).getPlanRoot().getLimit();
+ if (limitRows > 0 && numReceivedRows >= limitRows) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("reach limit rows: {}, received rows: {}, cancel
query, {}",
+ limitRows, numReceivedRows,
DebugUtil.printId(queryId));
+ }
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
"reach limit");
+ resultBatch.setEos(true);
}
return resultBatch;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]