This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 121193a6d2f [fix](test)(dynamic-partition) fix some unstable test
cases and dynamic-partition logic (#63551)
121193a6d2f is described below
commit 121193a6d2f8c2c265793be8cd13beaf6d6cca8f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri May 22 23:00:46 2026 -0700
[fix](test)(dynamic-partition) fix some unstable test cases and
dynamic-partition logic (#63551)
## What
This PR bundles fixes for several flaky regression tests plus two small
FE code changes uncovered while debugging them.
### Code logic fixes
-
**`fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java`**
Avoid the race between `ALTER TABLE ... SET (...)` and
`DynamicPartitionScheduler`.
When the ALTER only changes catalog-level metadata (e.g.
`partition.retention_count`), the
per-partition meta-dispatch loop is unnecessary and races with the
scheduler dropping
partitions between the partition snapshot and the dispatch, surfacing as
`Partition[...] does not exist`. The fix skips the per-partition loop
entirely when no
partition-level property needs to be sent, and tolerates concurrent
partition drops inside
the loop when partition-level updates are still required.
-
**`fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java`**
and
**`fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java`**
Add diagnostic logs for the `InsertLoadJob` jobId-mismatch path.
`LoadManager.recordFinishedLoadJob`
has a silent fallback that creates a new `InsertLoadJob` with an empty
`LoadStatistic`
when the incoming jobId is unknown, which shows up in `SHOW LOAD` as
all-zero `JobDetails`
(see flaky `test_insert_statistic`). A `WARN` is added on that fallback,
plus `DEBUG` logs
in `LoadManager.updateJobProgress` and
`LoadProcessor.doProcessReportExecStatus` so the
jobId used on the BE-report path can be cross-checked against the finish
path.
### Unstable case fixes
| Case file | Root cause / fix |
|---|---|
| `regression-test/suites/manager/test_manager_interface_1.groovy` |
After #54762, `admin set frontend config` no longer forwards to master
for non-`masterOnly` keys, so on multi-FE/cloud setups the SET and the
following SHOW may land on different FEs. Switch to `admin set all
frontends config` for `query_metadata_name_ids_timeout` so the value is
broadcast to all FEs. |
| `regression-test/suites/temp_table_p0/test_temp_table.groovy` | `SHOW
TABLETS` returns one row per replica, so on multi-BE clusters where
`force_olap_table_replication_allocation` forces `replication_num=3` the
hardcoded `assertEquals(size, 3)` fails (3 partitions × 1 bucket × 3
replicas = 9). Derive the expected replica count from the FE config. |
| `regression-test/suites/query_profile/scanner_profile.groovy` | The
hardcoded `contains(\"actualRows=9\")` check on a 10-bucket hash
distribution is flaky: with only 9 distinct INT keys, runtime tablet
pruning plus scanner-yield counter timing on BE can drop the produced
row count to 8. Switch to a regex that locates `actualRows` on the
`PhysicalOlapScan` node and asserts it falls in `[1, 9]`. |
|
`regression-test/suites/backup_restore/test_backup_restore_colocate.groovy`
| After RESTORE creates a brand-new colocate group in the target DB
(test 6, `reserve_colocate=true` restoring to a new db), the group stays
in `unstableGroups` until `ColocateTableCheckerAndBalancer` scans it.
Nereids skips colocate join while the group is unstable, so the
immediate EXPLAIN can fall back to BROADCAST. Poll `SHOW PROC
/colocation_group` for `IsStable=true` before the EXPLAIN. |
|
`regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy`
| The helper iterates every BASE TABLE in every database and
interpolated identifiers without backticks, so any table whose name is a
SQL keyword (e.g. `order`, left behind by `test_doris_jdbc_catalog`)
caused `mismatched input 'order'` and failed the whole suite. Wrap
interpolated identifiers in backticks for `use`, `desc`, `show create
table`, `show partitions`, `show replica status`, and `crc32_internal`
queries. |
|
`regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy`
| Drop the leaked `order` table at the end of the suite so it does not
poison `check_hash_bucket_table`. |
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../apache/doris/alter/SchemaChangeHandler.java | 33 +++++++++++++++++++---
.../org/apache/doris/load/loadv2/LoadManager.java | 20 +++++++++++++
.../org/apache/doris/qe/runtime/LoadProcessor.java | 6 ++++
regression-test/pipeline/p0/conf/fe.conf | 1 +
.../test_backup_restore_colocate.groovy | 25 ++++++++++++++++
.../check_hash_bucket_table.groovy | 14 ++++-----
.../jdbc/test_doris_jdbc_catalog.groovy | 4 +++
.../suites/manager/test_manager_interface_1.groovy | 14 ++++-----
.../suites/query_profile/scanner_profile.groovy | 9 +++++-
.../suites/temp_table_p0/test_temp_table.groovy | 12 +++++++-
10 files changed, 118 insertions(+), 20 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 2a1c3fba031..659908873ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2963,10 +2963,35 @@ public class SchemaChangeHandler extends AlterHandler {
skip = Boolean.parseBoolean(skipWriteIndexOnLoad) ? 1 : 0;
}
- for (Partition partition : partitions) {
- updatePartitionProperties(db, olapTable.getName(),
partition.getName(), storagePolicyId, isInMemory,
- null, compactionPolicy,
timeSeriesCompactionConfig, enableSingleCompaction, skip,
- disableAutoCompaction,
verticalCompactionNumColumnsPerGroup);
+ // Only iterate partitions when there are properties that actually
need to be
+ // dispatched to each partition's tablets. Pure catalog-level metadata
properties
+ // such as partition.retention_count do not require per-partition
updates, and
+ // iterating over a stale partition snapshot can race with concurrent
partition
+ // drops (e.g., by DynamicPartitionScheduler when retention_count or
dynamic_partition
+ // is enabled) and fail with "Partition does not exist".
+ boolean needPerPartitionUpdate = isInMemory >= 0 || storagePolicyId >= 0
+ || compactionPolicy != null ||
!timeSeriesCompactionConfig.isEmpty()
+ || enableSingleCompaction >= 0 || skip >= 0 ||
disableAutoCompaction >= 0
+ || verticalCompactionNumColumnsPerGroup >= 0;
+ if (needPerPartitionUpdate) {
+ for (Partition partition : partitions) {
+ try {
+ updatePartitionProperties(db, olapTable.getName(),
partition.getName(),
+ storagePolicyId, isInMemory, null,
compactionPolicy, timeSeriesCompactionConfig,
+ enableSingleCompaction, skip,
disableAutoCompaction,
+ verticalCompactionNumColumnsPerGroup);
+ } catch (DdlException e) {
+ // The partition may have been dropped concurrently (e.g.,
by
+ // DynamicPartitionScheduler). It is safe to skip the meta
dispatch
+ // for a partition that no longer exists.
+ if (olapTable.getPartition(partition.getName()) == null) {
+ LOG.info("partition {} of table {} was dropped
concurrently, "
+ + "skip updating its properties",
partition.getName(), olapTable.getName());
+ continue;
+ }
+ throw e;
+ }
+ }
}
olapTable.writeLockOrDdlException();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 3af3529177e..f61efa05cbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -216,11 +216,25 @@ public class LoadManager implements Writable {
LoadJob loadJob;
if (idToLoadJob.containsKey(jobId)) {
loadJob = idToLoadJob.get(jobId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("recordFinishedLoadJob: reuse existing load job,
jobId={}, label={}, dbId={}, jobType={}",
+ jobId, label, db.getId(), jobType);
+ }
if (loadJob instanceof InsertLoadJob) {
((InsertLoadJob) loadJob).setJobProperties(transactionId,
tableId, createTimestamp,
failMsg, trackingUrl, firstErrorMsg, userInfo);
}
} else {
+ // The jobId received here does not exist in idToLoadJob. This
means the InsertLoadJob
+ // that was registered during executor construction (and that
accumulated BE-reported
+ // load statistics via updateJobProgress) is NOT the one we are
about to snapshot here.
+ // A brand-new InsertLoadJob will be created below with an empty
LoadStatistic, so
+ // SHOW LOAD's JobDetails (ScannedRows / LoadBytes / All backends)
will be all zero.
+ // Logging this at WARN so CI failures of the form
+ // "test_insert_statistic: expected:<N> but was:<0>" can be
diagnosed directly.
+ LOG.warn("recordFinishedLoadJob: jobId={} not found in
idToLoadJob, creating a new "
+ + "{} load job for label={}, dbId={}. JobDetails
statistics will be empty.",
+ jobId, jobType, label, db.getId());
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, transactionId,
db.getId(), tableId, createTimestamp, failMsg,
@@ -813,6 +827,12 @@ public class LoadManager implements Writable {
public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId,
TUniqueId fragmentId, long scannedRows,
long scannedBytes, boolean isDone) {
LoadJob job = idToLoadJob.get(jobId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updateJobProgress: jobId={}, beId={}, scannedRows={},
scannedBytes={}, isDone={}, "
+ + "found={}, jobIdMatched={}",
+ jobId, beId, scannedRows, scannedBytes, isDone,
+ idToLoadJob.containsKey(jobId), job == null ? -1L :
job.getId());
+ }
if (job != null) {
job.updateProgress(beId, loadId, fragmentId, scannedRows,
scannedBytes, isDone);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 45995a7aad7..38ba2ebbc79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -170,6 +170,12 @@ public class LoadProcessor extends AbstractJobProcessor {
@Override
protected void doProcessReportExecStatus(TReportExecStatusParams params,
SingleFragmentPipelineTask fragmentTask) {
if (params.isSetLoadedRows() && jobId != -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("doProcessReportExecStatus: forwarding load progress
to LoadManager, "
+ + "jobId={}, beId={}, queryId={},
loadedRows={}, loadedBytes={}, isDone={}",
+ jobId, params.getBackendId(),
DebugUtil.printId(params.getQueryId()),
+ params.getLoadedRows(), params.getLoadedBytes(),
params.isDone());
+ }
if (params.isSetFragmentInstanceReports()) {
for (TFragmentInstanceReport report :
params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index 57b5c932a2d..d426b488a7c 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -94,3 +94,4 @@ max_spilled_profile_num = 2000
check_table_lock_leaky=true
max_bucket_num_per_partition=512
enable_table_stream=true
+max_remote_file_system_cache_num=1000
diff --git
a/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy
b/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy
index 98262234edf..92e907fcb92 100644
--- a/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy
+++ b/regression-test/suites/backup_restore/test_backup_restore_colocate.groovy
@@ -376,6 +376,26 @@ suite("test_backup_restore_colocate_with_partition",
"backup_restore") {
assertTrue(result.ColocateMismatchNum as int == 0)
}
+ // Wait until the colocate group of `db_name`.`group_name` becomes stable.
+ // After RESTORE creates a brand-new colocate group in a new db, the group
+ // is unstable until ColocateTableCheckerAndBalancer scans it (default
every
+ // tablet_checker_interval_ms = 20s). Nereids skips colocate join while the
+ // group is unstable, so EXPLAIN right after RESTORE FINISHED can miss
COLOCATE.
+ def waitColocateGroupStable = { db_name, group_name ->
+ def fullName = "${db_name}.${group_name}".toString()
+ def deadline = System.currentTimeMillis() + 60_000
+ while (System.currentTimeMillis() < deadline) {
+ def groups = sql_return_maparray("SHOW PROC '/colocation_group'")
+ def g = groups.find { it.GroupName == fullName }
+ if (g != null && g.IsStable == "true") {
+ log.info("colocate group ${fullName} is stable")
+ return
+ }
+ sleep(1000)
+ }
+ log.warn("colocate group ${fullName} did not become stable within 60s")
+ }
+
def syncer = getSyncer()
syncer.createS3Repository(repoName)
@@ -624,6 +644,11 @@ suite("test_backup_restore_colocate_with_partition",
"backup_restore") {
query = "select * from ${newDbName}.${tableName1} as t1,
${newDbName}.${tableName2} as t2 where t1.id=t2.id;"
+ // RESTORE to a brand-new db creates a new colocate group that is initially
+ // unstable; wait for ColocateTableCheckerAndBalancer to mark it stable,
otherwise
+ // EXPLAIN below may fall back to BROADCAST/SHUFFLE.
+ waitColocateGroupStable(newDbName, groupName)
+
explain {
sql("${query}")
contains("COLOCATE")
diff --git
a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
index e1b38057da3..3fe6713f66b 100644
---
a/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
+++
b/regression-test/suites/check_hash_bucket_table/check_hash_bucket_table.groovy
@@ -36,12 +36,12 @@ suite("check_hash_bucket_table") {
def bucketColumns = info["DistributionKey"]
if (bucketColumns == "RANDOM") {return false}
- def columnsDetail = sql_return_maparray "desc ${tblName} all;"
+ def columnsDetail = sql_return_maparray "desc `${tblName}` all;"
def bucketCols = bucketColumns.split(",").collect { it.trim() }
def bucketColsStr = bucketCols.collect { "`${it}`" }.join(",")
def partitionName = info["PartitionName"]
try {
- def tabletIdList = sql_return_maparray(""" show replica status
from ${tblName} partition(${partitionName}); """).collect { it.TabletId
}.toList()
+ def tabletIdList = sql_return_maparray(""" show replica status
from `${tblName}` partition(`${partitionName}`); """).collect { it.TabletId
}.toList()
def tabletIds = tabletIdList.toSet()
int replicaNum = tabletIdList.stream().filter { it ==
tabletIdList[0] }.count()
logger.info("""===== [check] Begin to check partition:
${db}.${tblName}, partition name: ${partitionName}, bucket num: ${bucketNum},
replica num: ${replicaNum}, bucket columns: ${bucketColsStr}""")
@@ -50,7 +50,7 @@ suite("check_hash_bucket_table") {
tabletIds.each { it2 ->
def tabletId = it2
try {
- def res = sql "select crc32_internal(${bucketColsStr})
% ${bucketNum} from ${db}.${tblName} tablet(${tabletId}) group by
crc32_internal(${bucketColsStr}) % ${bucketNum};"
+ def res = sql "select crc32_internal(${bucketColsStr})
% ${bucketNum} from `${db}`.`${tblName}` tablet(${tabletId}) group by
crc32_internal(${bucketColsStr}) % ${bucketNum};"
if (res.size() > 1) {
logger.info("""===== [check] check failed:
${db}.${tblName}, partition name: ${partitionName}, tabletId: ${tabletId},
bucket columns: ${bucketColsStr}, res.size()=${res.size()}, res=${res}""")
assert res.size() == 1
@@ -73,9 +73,9 @@ suite("check_hash_bucket_table") {
}
def checkTable = { String db, String tblName ->
- sql "use ${db};"
- def showStmt = sql_return_maparray("show create table
${tblName}")[0]["Create Table"]
- def partitionInfo = sql_return_maparray """ show partitions from
${tblName}; """
+ sql "use `${db}`;"
+ def showStmt = sql_return_maparray("show create table
`${tblName}`")[0]["Create Table"]
+ def partitionInfo = sql_return_maparray """ show partitions from
`${tblName}`; """
int checkedPartition = 0
partitionInfo.each {
if (checkPartition(db, tblName, it)) {
@@ -88,7 +88,7 @@ suite("check_hash_bucket_table") {
}
def checkDb = { String db ->
- sql "use ${db};"
+ sql "use `${db}`;"
dbNum.incrementAndGet()
def tables = sql("show full tables").stream().filter{ it[1] == "BASE
TABLE" }.collect{ it[0] }.toList()
def asyncMVs = sql_return_maparray("""select * from
mv_infos("database"="${db}");""").collect{ it.Name }.toSet()
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
index e9b66363dee..6c05a2fa293 100644
---
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
+++
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
@@ -262,6 +262,10 @@ suite("test_doris_jdbc_catalog", "p0,external") {
order_qt_keywork_table_name """ select * from `order` order by test_col;
"""
+ // cleanup reserved-keyword table so downstream infra suites (e.g.
check_hash_bucket_table)
+ // do not trip over an unquoted `order` table name
+ sql """ drop table if exists
internal.regression_test_jdbc_catalog_p0.`order` """
+
// //clean
// qt_sql """select current_catalog()"""
// sql "switch internal"
diff --git a/regression-test/suites/manager/test_manager_interface_1.groovy
b/regression-test/suites/manager/test_manager_interface_1.groovy
index 1c9aa226200..5467c830866 100644
--- a/regression-test/suites/manager/test_manager_interface_1.groovy
+++ b/regression-test/suites/manager/test_manager_interface_1.groovy
@@ -594,13 +594,13 @@ suite('test_manager_interface_1',"p0") {
assertTrue(x == 1);
- sql """ admin set frontend config("query_metadata_name_ids_timeout"=
"${val}")"""
- result = sql """
- admin show frontend config
+ sql """ admin set all frontends
config("query_metadata_name_ids_timeout"= "${val}")"""
+ result = sql """
+ admin show frontend config
"""
logger.info("result = ${result}" )
- x = 0
+ x = 0
for(int i = 0 ;i<result.size();i++) {
if (result[i][0] == "query_metadata_name_ids_timeout"){
x = 1
@@ -611,9 +611,9 @@ suite('test_manager_interface_1',"p0") {
}
}
assertTrue(x == 1);
-
- val -= 2
- sql """ admin set frontend config("query_metadata_name_ids_timeout"=
"${val}")"""
+
+ val -= 2
+ sql """ admin set all frontends
config("query_metadata_name_ids_timeout"= "${val}")"""
logger.info("result = ${result}" )
diff --git a/regression-test/suites/query_profile/scanner_profile.groovy
b/regression-test/suites/query_profile/scanner_profile.groovy
index 200599315a2..72587ee923d 100644
--- a/regression-test/suites/query_profile/scanner_profile.groovy
+++ b/regression-test/suites/query_profile/scanner_profile.groovy
@@ -86,5 +86,12 @@ suite('scanner_profile') {
String profileWithFilter = getProfileByToken(token)
logger.info("${token} Profile Data: ${profileWithFilter}")
- assertTrue(profileWithFilter.toString().contains("actualRows=9"))
+ // Verify actualRows is backfilled onto the scan node. The exact value is
+ // unstable because 9 INT keys hash-distribute into 10 buckets and a few
+ // tablets may be pruned at runtime, so only assert it is in [1, 9].
+ def matcher = (profileWithFilter.toString() =~
/PhysicalOlapScan\[scanner_profile\][^\n]*actualRows=(\d+)/)
+ assertTrue(matcher.find(), "actualRows not found on
PhysicalOlapScan[scanner_profile] in profile")
+ int actualRows = matcher.group(1) as int
+ assertTrue(actualRows >= 1 && actualRows <= 9,
+ "expect actualRows in [1, 9], got ${actualRows}")
}
\ No newline at end of file
diff --git a/regression-test/suites/temp_table_p0/test_temp_table.groovy
b/regression-test/suites/temp_table_p0/test_temp_table.groovy
index 86989c17dc4..4f9f55151ad 100644
--- a/regression-test/suites/temp_table_p0/test_temp_table.groovy
+++ b/regression-test/suites/temp_table_p0/test_temp_table.groovy
@@ -273,7 +273,17 @@ suite('test_temp_table', 'p0') {
assertEquals(show_column_result.size(), 3)
def show_tablets_result = sql "show tablets from t_test_temp_table1"
- assertEquals(show_tablets_result.size(), 3)
+ // t_test_temp_table1 has 3 partitions x 1 bucket = 3 tablets. SHOW
TABLETS returns one row
+ // per replica, so the row count depends on the cluster's
force_olap_table_replication_allocation.
+ def forceReplicaAlloc =
getFeConfig('force_olap_table_replication_allocation')
+ def replicaNum = 1
+ if (forceReplicaAlloc != null && !forceReplicaAlloc.isEmpty()) {
+ def m = (forceReplicaAlloc =~ /:(\d+)/)
+ if (m.find()) {
+ replicaNum = m.group(1).toInteger()
+ }
+ }
+ assertEquals(3 * replicaNum, show_tablets_result.size())
def tablet_id = show_tablets_result[0][0]
// admin user will see temporary table's internal name
show_tablets_result = sql "show tablet ${tablet_id}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]