Copilot commented on code in PR #61870:
URL: https://github.com/apache/doris/pull/61870#discussion_r3007875323
##########
fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java:
##########
@@ -395,4 +417,100 @@ private long getMemoryUsedPercent() {
public ReentrantReadWriteLock getLock() {
return lock;
}
+
+ private void postProcessCloudMetadata() {
+ if (Config.isNotCloudMode()) {
+ return;
+ }
+ Env servingEnv = Env.getServingEnv();
+ if (servingEnv == null) {
+ LOG.warn("serving env is null, skip process cloud metadata for checkpoint");
+ return;
+ }
+ long start = System.currentTimeMillis();
+ for (Database db : env.getInternalCatalog().getDbs()) {
+ Database servingDb = servingEnv.getInternalCatalog().getDbNullable(db.getId());
+ if (servingDb == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("serving db is null. dbId: {}, dbName: {}", db.getId(), db.getFullName());
+ }
+ continue;
+ }
+
+ for (Table table : db.getTables()) {
+ Table servingTable = servingDb.getTableNullable(table.getId());
+ if (servingTable == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table);
+ }
+ continue;
+ }
+ if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ OlapTable servingOlapTable = (OlapTable) servingTable;
+
+ List<Partition> partitions = olapTable.getAllPartitions();
+ for (Partition partition : partitions) {
+ Partition servingPartition = servingOlapTable.getPartition(partition.getId());
+ if (servingPartition == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(),
+ partition.getId());
+ }
+ continue;
+ }
+ // set tablet stats
+ setTabletStats(table.getId(), partition, servingPartition);
+ }
Review Comment:
`postProcessCloudMetadata()` traverses `servingDb/servingTable/...` without
acquiring any catalog/table read locks. Since DDL, load, or stats updates can
concurrently mutate partitions/tablets/replicas, this risks inconsistent reads
or `ConcurrentModificationException` (e.g., `Tablet#getReplicas()` exposes the
mutable internal list). Consider locking the serving `OlapTable` (and/or using
defensive copies of collections) while reading from `servingOlapTable` to make
the checkpoint post-processing robust under concurrent metadata changes.
```suggestion
servingDb.readLock();
try {
    for (Table table : db.getTables()) {
        Table servingTable = servingDb.getTableNullable(table.getId());
        if (servingTable == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table);
            }
            continue;
        }
        if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) {
            continue;
        }
        OlapTable olapTable = (OlapTable) table;
        OlapTable servingOlapTable = (OlapTable) servingTable;
        servingOlapTable.readLock();
        try {
            List<Partition> partitions = olapTable.getAllPartitions();
            for (Partition partition : partitions) {
                Partition servingPartition = servingOlapTable.getPartition(partition.getId());
                if (servingPartition == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(),
                                partition.getId());
                    }
                    continue;
                }
                // set tablet stats
                setTabletStats(table.getId(), partition, servingPartition);
            }
        } finally {
            servingOlapTable.readUnlock();
        }
    }
} finally {
    servingDb.readUnlock();
```
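The comment offers two remedies: hold a read lock for the whole traversal, or take defensive copies. A minimal, self-contained sketch of both follows. It assumes only a `ReentrantReadWriteLock` guarding a mutable list. `GuardedCatalogSketch` is a hypothetical stand-in and does not use Doris types. The suggestion above calls `readLock()`/`readUnlock()` on the serving `Database` and `OlapTable` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a catalog object (e.g. a table) whose partition list
// may be mutated concurrently by DDL or load threads.
public class GuardedCatalogSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<Long> partitionIds = new ArrayList<>();

    // Writers take the write lock, so readers never observe a half-applied change.
    public void addPartition(long id) {
        lock.writeLock().lock();
        try {
            partitionIds.add(id);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Option 1: iterate while holding the read lock; unlocking in finally means an
    // exception during traversal cannot leak the lock.
    public long sumUnderReadLock() {
        lock.readLock().lock();
        try {
            long sum = 0;
            for (long id : partitionIds) {
                sum += id;
            }
            return sum;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Option 2: copy under the read lock, then iterate the copy without the lock.
    // The critical section is shorter, but the copy may be slightly stale.
    public List<Long> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(partitionIds);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedCatalogSketch t = new GuardedCatalogSketch();
        t.addPartition(1);
        t.addPartition(2);
        System.out.println(t.sumUnderReadLock()); // 3
        System.out.println(t.snapshot());         // [1, 2]
    }
}
```

Option 1 matches the suggestion's approach. Option 2 trades freshness for a shorter lock hold, which can matter when the per-item work (such as copying tablet stats) is expensive.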
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java:
##########
@@ -111,32 +209,46 @@ protected void runAfterCatalogReady() {
} // end for dbs
if (builder.getTabletIdxCount() > 0) {
- reqList.add(builder.build());
+ futures.add(submitGetTabletStatsTask(builder.build(), filter == null));
}
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error waiting for get tablet stats tasks to complete", e);
+ }
Review Comment:
These `catch (InterruptedException | ExecutionException ...)` blocks swallow
`InterruptedException` without restoring the interrupt flag. Please call
`Thread.currentThread().interrupt()` when catching `InterruptedException` (and
consider aborting the wait) so that higher-level shutdown/interrupt logic works
correctly.
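Here is a minimal sketch of "restore the flag and abort the wait". It assumes a `List<Future<Void>>` like the `futures` list in the diff. `InterruptAwareWait` and `awaitAll` are hypothetical names, and printing to stderr stands in for `LOG.error`. Cancelling the outstanding futures is one possible reading of "aborting the wait", not something the PR currently does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterruptAwareWait {
    // Waits for each future in order and returns how many completed successfully.
    // On interrupt: restores the interrupt flag, cancels the current and remaining
    // futures, and returns early instead of continuing to block.
    static int awaitAll(List<Future<Void>> futures) {
        int succeeded = 0;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();
                succeeded++;
            } catch (InterruptedException e) {
                // get() cleared the flag when it threw; set it again so callers still see it.
                Thread.currentThread().interrupt();
                for (int j = i; j < futures.size(); j++) {
                    futures.get(j).cancel(true);
                }
                break;
            } catch (ExecutionException e) {
                // One task failed; keep waiting for the others.
                System.err.println("get tablet stats task failed: " + e.getCause());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<Void> noop = () -> null;
        List<Future<Void>> futures = new ArrayList<>();
        for (int k = 0; k < 3; k++) {
            futures.add(pool.submit(noop));
        }
        System.out.println(awaitAll(futures)); // 3
        pool.shutdown();
    }
}
```

Splitting the multi-catch into two `catch` clauses is what makes it possible to treat interruption differently from an individual task failure.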
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java:
##########
@@ -146,23 +258,51 @@ protected void runAfterCatalogReady() {
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error waiting for get tablet stats tasks to complete", e);
Review Comment:
Same as above: this block catches `InterruptedException` but doesn't restore
the thread interrupt status. Please call `Thread.currentThread().interrupt()`
when interrupted to avoid losing cancellation signals.
```suggestion
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.error("Error waiting for get tablet stats tasks to complete", e);
} catch (ExecutionException e) {
    LOG.error("Error waiting for get tablet stats tasks to complete", e);
```
##########
fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java:
##########
@@ -104,11 +113,23 @@ public synchronized void doCheckpoint() throws CheckpointException {
storage = new Storage(imageDir);
// get max image version
imageVersion = storage.getLatestImageSeq();
+ long latestImageCreateTime = storage.getLatestImageCreateTime();
// get max finalized journal id
checkPointVersion = editLog.getFinalizedJournalId();
- LOG.info("last checkpoint journal id: {}, current finalized journal id: {}",
- imageVersion, checkPointVersion);
- if (imageVersion >= checkPointVersion) {
+ LOG.info("last checkpoint journal id: {}, create timestamp: {}. current finalized journal id: {}",
+ imageVersion, latestImageCreateTime, checkPointVersion);
+ if (imageVersion < checkPointVersion) {
+ LOG.info("Trigger checkpoint since last checkpoint journal id: {} is less than "
+ + "current finalized journal id: {}", imageVersion, checkPointVersion);
+ } else if (Config.isCloudMode() && Config.cloud_checkpoint_image_stale_threshold_seconds > 0
+ && latestImageCreateTime > 0 && ((System.currentTimeMillis() - latestImageCreateTime)
+ >= Config.cloud_checkpoint_image_stale_threshold_seconds * 1000L)) {
+ // No new finalized journals beyond the latest image.
+ // But in cloud mode, we may still want to force a checkpoint if the latest image file is expired.
+ // This helps that image can keep the newer table version, partition version, tablet stats.
+ LOG.info("Trigger checkpoint in cloud mode because latest image is expired. "
+ + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime);
Review Comment:
When forcing a checkpoint due to
`cloud_checkpoint_image_stale_threshold_seconds`, `imageVersion` can equal
`checkPointVersion` (no new finalized journals). In that case `Env.saveImage()`
will try to rename `image.ckpt` to `image.<replayedJournalId>` where the target
file already exists, and `File.renameTo` may fail on some
platforms/filesystems. Consider explicitly handling the "same version" case
(e.g., delete/replace the existing image file safely before rename, or ensure a
new image sequence is used) to avoid repeated checkpoint failures.
```suggestion
        + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime);
// In this cloud-mode stale-image case, imageVersion can equal checkPointVersion (no new finalized
// journals). Env.saveImage() will later try to rename image.ckpt to image.<checkPointVersion>.
// If the target image.<checkPointVersion> already exists, File.renameTo may fail on some platforms.
// To avoid this, ensure we use a strictly newer image sequence when forcing the checkpoint.
if (imageVersion == checkPointVersion) {
    long newCheckPointVersion = checkPointVersion + 1;
    LOG.info("Adjust checkpoint version from {} to {} to avoid duplicate image sequence when "
            + "forcing checkpoint in cloud mode (stale image).",
            checkPointVersion, newCheckPointVersion);
    checkPointVersion = newCheckPointVersion;
}
```
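The comment also mentions an alternative to bumping the sequence: replace the existing image file safely instead of relying on `File.renameTo`. The sketch below shows that option with `java.nio.file.Files.move`. It also includes the stale-image predicate from the diff, isolated so it can be tested. `ImageReplaceSketch`, `isImageStale`, and `replaceImage` are hypothetical and are not Doris's `Env.saveImage()`.

One caveat applies to `ATOMIC_MOVE`. The `Files.move` Javadoc says it is implementation-specific whether an existing target is replaced. On POSIX systems it typically maps to `rename(2)`, which does replace the target. Hence the fallback only covers file systems that cannot move atomically at all.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ImageReplaceSketch {
    // Same shape as the stale-image condition in the diff: a threshold <= 0 disables the
    // check, and an unknown create time (<= 0) never counts as stale. Multiplying by 1000L
    // keeps the product in long arithmetic.
    static boolean isImageStale(long nowMs, long imageCreateTimeMs, long thresholdSeconds) {
        return thresholdSeconds > 0
                && imageCreateTimeMs > 0
                && (nowMs - imageCreateTimeMs) >= thresholdSeconds * 1000L;
    }

    // Moves the freshly written checkpoint onto the final image name, even if a file with
    // that name already exists. Tries an atomic move first; if the file system cannot move
    // atomically, falls back to a plain move that replaces the existing target.
    static void replaceImage(Path ckpt, Path target) throws IOException {
        try {
            Files.move(ckpt, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(ckpt, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("image-sketch");
        Path target = dir.resolve("image.100"); // same sequence as the existing image
        Path ckpt = dir.resolve("image.ckpt");
        Files.write(target, "old".getBytes());
        Files.write(ckpt, "new".getBytes());
        replaceImage(ckpt, target);
        System.out.println(new String(Files.readAllBytes(target))); // new
        System.out.println(isImageStale(10_000L, 1_000L, 5));        // true
        Files.delete(target);
        Files.delete(dir);
    }
}
```

Replacing the file keeps the image sequence aligned with the last finalized journal id. The suggestion's version bump instead produces an image whose sequence has no matching journal. Which trade-off is right depends on how the rest of the checkpoint path uses `checkPointVersion`.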
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3554,6 +3561,13 @@ public static int metaServiceRpcRetryTimes() {
"Maximal concurrent num of get tablet stat job."})
public static int max_get_tablet_stat_task_threads_num = 4;
+ @ConfField(mutable = true, description = {"Version of getting tablet stats in cloud mode. "
+ + "Version 1: get all tablets; Version 2: get active and interval expired tablets"})
+ public static int cloud_get_tablet_stats_version = 2;
+
+ @ConfField(description = {"Maximum concurrent number of get tablet stat jobs."})
Review Comment:
The `cloud_sync_tablet_stats_task_threads_num` description currently says
"Maximum concurrent number of get tablet stat jobs", which is misleading (this
config controls FE->FE sync/push tasks). Please update the description to match
the actual purpose to avoid operator confusion.
```suggestion
@ConfField(description = {
        "存算分离模式下 Master FE 发送给其它 FE 同步 tablet stats 任务的最大并发数",
        "Maximal concurrent number of tablet stats sync/push tasks between Master FE and other FEs in cloud mode."})
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java:
##########
@@ -180,8 +180,10 @@ public static class ReplicaContext {
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
+ @SerializedName(value = "sc")
private long segmentCount = 0L;
- private long rowsetCount = 0L;
+ @SerializedName(value = "rsc")
+ private long rowsetCount = 1L; // [0-1] rowset
Review Comment:
`segmentCount`/`rowsetCount` were previously serialized under their field
names. Adding `@SerializedName("sc")`/`@SerializedName("rsc")` without
`alternate` values means Gson will no longer recognize the old keys in existing
images/journals, so these fields silently fall back to their defaults on upgrade.
Use `@SerializedName(value = "sc", alternate = {"segmentCount"})` and
`@SerializedName(value = "rsc", alternate = {"rowsetCount"})` (and keep the
defaults consistent) to preserve backward compatibility when loading existing
metadata.
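The block below illustrates what `alternate` buys on upgrade. To stay dependency-free it does not use Gson. Instead it hand-rolls the equivalent lookup over a `Map` standing in for a decoded JSON object: try the new key, then the legacy key, then the field default. `LegacyFieldNameSketch` and `readLong` are hypothetical. The defaults `0L` and `1L` are taken from the diff above. The sketch only models the case where one of the two keys is present, as in a purely old or purely new image.

```java
import java.util.HashMap;
import java.util.Map;

public class LegacyFieldNameSketch {
    // Resolves a field from a decoded JSON object: new key first, legacy key second,
    // and the field's default only if neither key is present.
    static long readLong(Map<String, Long> json, String name, String legacyName, long defaultValue) {
        Long v = json.get(name);
        if (v == null) {
            v = json.get(legacyName); // image/journal written before the rename
        }
        return v != null ? v : defaultValue;
    }

    static long segmentCount(Map<String, Long> json) {
        return readLong(json, "sc", "segmentCount", 0L);
    }

    static long rowsetCount(Map<String, Long> json) {
        return readLong(json, "rsc", "rowsetCount", 1L);
    }

    public static void main(String[] args) {
        Map<String, Long> oldImage = new HashMap<>();
        oldImage.put("segmentCount", 7L);
        oldImage.put("rowsetCount", 3L);
        Map<String, Long> newImage = new HashMap<>();
        newImage.put("sc", 7L);
        newImage.put("rsc", 3L);
        System.out.println(segmentCount(oldImage) + " " + rowsetCount(oldImage)); // 7 3
        System.out.println(segmentCount(newImage) + " " + rowsetCount(newImage)); // 7 3
        System.out.println(rowsetCount(new HashMap<>()));                       // 1
    }
}
```

Without the legacy-key step, `oldImage` would read as `0 1`, which is exactly the silent reset the comment warns about.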
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.