Copilot commented on code in PR #61870:
URL: https://github.com/apache/doris/pull/61870#discussion_r3007875323


##########
fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java:
##########
@@ -395,4 +417,100 @@ private long getMemoryUsedPercent() {
     public ReentrantReadWriteLock getLock() {
         return lock;
     }
+
+    private void postProcessCloudMetadata() {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        Env servingEnv = Env.getServingEnv();
+        if (servingEnv == null) {
+            LOG.warn("serving env is null, skip process cloud metadata for checkpoint");
+            return;
+        }
+        long start = System.currentTimeMillis();
+        for (Database db : env.getInternalCatalog().getDbs()) {
+            Database servingDb = servingEnv.getInternalCatalog().getDbNullable(db.getId());
+            if (servingDb == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("serving db is null. dbId: {}, dbName: {}", db.getId(), db.getFullName());
+                }
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                Table servingTable = servingDb.getTableNullable(table.getId());
+                if (servingTable == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table);
+                    }
+                    continue;
+                }
+                if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                OlapTable servingOlapTable = (OlapTable) servingTable;
+
+                List<Partition> partitions = olapTable.getAllPartitions();
+                for (Partition partition : partitions) {
+                    Partition servingPartition = servingOlapTable.getPartition(partition.getId());
+                    if (servingPartition == null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(),
+                                    partition.getId());
+                        }
+                        continue;
+                    }
+                    // set tablet stats
+                    setTabletStats(table.getId(), partition, servingPartition);
+                }

Review Comment:
   `postProcessCloudMetadata()` traverses the serving databases, tables, and 
partitions without acquiring any database or table read locks. Because DDL, 
load, or stats updates can mutate partitions, tablets, and replicas 
concurrently, this risks inconsistent reads or a 
`ConcurrentModificationException` (e.g., `Tablet#getReplicas()` exposes the 
mutable internal list). Consider read-locking the serving `OlapTable` (and/or 
taking defensive copies of collections) while reading from `servingOlapTable`, 
so that checkpoint post-processing stays robust under concurrent metadata 
changes.
   ```suggestion
               servingDb.readLock();
               try {
                   for (Table table : db.getTables()) {
                       Table servingTable = servingDb.getTableNullable(table.getId());
                       if (servingTable == null) {
                           if (LOG.isDebugEnabled()) {
                               LOG.debug("serving table is null. dbId: {}, table: {}", db.getId(), table);
                           }
                           continue;
                       }
                       if (!(table instanceof OlapTable) || !(servingTable instanceof OlapTable)) {
                           continue;
                       }
                       OlapTable olapTable = (OlapTable) table;
                       OlapTable servingOlapTable = (OlapTable) servingTable;
   
                       servingOlapTable.readLock();
                       try {
                           List<Partition> partitions = olapTable.getAllPartitions();
                           for (Partition partition : partitions) {
                               Partition servingPartition = servingOlapTable.getPartition(partition.getId());
                               if (servingPartition == null) {
                                   if (LOG.isDebugEnabled()) {
                                       LOG.debug("serving partition is null. tableId: {}, partitionId: {}", table.getId(),
                                               partition.getId());
                                   }
                                   continue;
                               }
                               // set tablet stats
                               setTabletStats(table.getId(), partition, servingPartition);
                           }
                       } finally {
                           servingOlapTable.readUnlock();
                       }
                   }
               } finally {
                   servingDb.readUnlock();
   ```
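
For reference, the "defensive copies" alternative mentioned in the comment above can be sketched on its own. This is a minimal illustration and not Doris code: `GuardedReplicaList` and its methods are hypothetical stand-ins for a metadata object that guards a mutable list with a `ReentrantReadWriteLock`. The idea is to hold the read lock only long enough to copy the collection, then iterate over the copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a metadata object that owns a mutable list.
final class GuardedReplicaList {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<Long> replicaIds = new ArrayList<>();

    // Writers mutate the list only while holding the write lock.
    void add(long replicaId) {
        lock.writeLock().lock();
        try {
            replicaIds.add(replicaId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers copy the list under the read lock and iterate over the copy,
    // so later writes cannot invalidate the reader's iterator.
    List<Long> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(replicaIds);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The copy costs an allocation per read, but it keeps the lock hold time short, which may matter when the checkpoint thread walks many tablets.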



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java:
##########
@@ -111,32 +209,46 @@ protected void runAfterCatalogReady() {
         } // end for dbs
 
         if (builder.getTabletIdxCount() > 0) {
-            reqList.add(builder.build());
+            futures.add(submitGetTabletStatsTask(builder.build(), filter == null));
         }
 
+        try {
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error waiting for get tablet stats tasks to complete", e);
+        }

Review Comment:
   These `catch (InterruptedException | ExecutionException ...)` blocks swallow 
`InterruptedException` without restoring the interrupt flag. Please call 
`Thread.currentThread().interrupt()` when catching `InterruptedException` (and 
consider aborting the wait) so that higher-level shutdown/interrupt logic works 
correctly.
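
A self-contained sketch of that pattern, assuming a helper that waits on a list of futures. `FutureWaiter` and `awaitAll` are hypothetical names, not part of the PR. The loop restores the interrupt flag and stops waiting on interruption, while a failed task does not prevent waiting for the remaining ones.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper illustrating interrupt-safe waiting on futures.
final class FutureWaiter {
    // Returns how many futures completed normally before the wait ended.
    static int awaitAll(List<? extends Future<?>> futures) {
        int completed = 0;
        for (Future<?> future : futures) {
            try {
                future.get();
                completed++;
            } catch (InterruptedException e) {
                // Restore the flag so callers can still observe the interrupt,
                // then abort the wait instead of blocking on further futures.
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                // The task itself failed; real code would log e.getCause() here.
            }
        }
        return completed;
    }
}
```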



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java:
##########
@@ -146,23 +258,51 @@ protected void runAfterCatalogReady() {
         } catch (InterruptedException | ExecutionException e) {
             LOG.error("Error waiting for get tablet stats tasks to complete", e);

Review Comment:
   Same as above: this block catches `InterruptedException` but doesn't restore 
the thread interrupt status. Please call `Thread.currentThread().interrupt()` 
when interrupted to avoid losing cancellation signals.
   ```suggestion
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               LOG.error("Error waiting for get tablet stats tasks to complete", e);
           } catch (ExecutionException e) {
               LOG.error("Error waiting for get tablet stats tasks to complete", e);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java:
##########
@@ -104,11 +113,23 @@ public synchronized void doCheckpoint() throws CheckpointException {
             storage = new Storage(imageDir);
             // get max image version
             imageVersion = storage.getLatestImageSeq();
+            long latestImageCreateTime = storage.getLatestImageCreateTime();
             // get max finalized journal id
             checkPointVersion = editLog.getFinalizedJournalId();
-            LOG.info("last checkpoint journal id: {}, current finalized journal id: {}",
-                    imageVersion, checkPointVersion);
-            if (imageVersion >= checkPointVersion) {
+            LOG.info("last checkpoint journal id: {}, create timestamp: {}. current finalized journal id: {}",
+                    imageVersion, latestImageCreateTime, checkPointVersion);
+            if (imageVersion < checkPointVersion) {
+                LOG.info("Trigger checkpoint since last checkpoint journal id: {} is less than "
+                        + "current finalized journal id: {}", imageVersion, checkPointVersion);
+            } else if (Config.isCloudMode() && Config.cloud_checkpoint_image_stale_threshold_seconds > 0
+                    && latestImageCreateTime > 0 && ((System.currentTimeMillis() - latestImageCreateTime)
+                    >= Config.cloud_checkpoint_image_stale_threshold_seconds * 1000L)) {
+                // No new finalized journals beyond the latest image.
+                // But in cloud mode, we may still want to force a checkpoint if the latest image file is expired.
+                // This helps that image can keep the newer table version, partition version, tablet stats.
+                LOG.info("Trigger checkpoint in cloud mode because latest image is expired. "
+                        + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime);

Review Comment:
   When forcing a checkpoint due to 
`cloud_checkpoint_image_stale_threshold_seconds`, `imageVersion` can equal 
`checkPointVersion` (no new finalized journals). In that case `Env.saveImage()` 
will try to rename `image.ckpt` to `image.<replayedJournalId>` where the target 
file already exists, and `File.renameTo` may fail on some 
platforms/filesystems. Consider explicitly handling the "same version" case 
(e.g., delete/replace the existing image file safely before rename, or ensure a 
new image sequence is used) to avoid repeated checkpoint failures.
   ```suggestion
                           + "latestImageSeq: {}, latestImageCreateTime: {}", imageVersion, latestImageCreateTime);
                   // In this cloud-mode stale-image case, imageVersion can equal checkPointVersion (no new finalized
                   // journals). Env.saveImage() will later try to rename image.ckpt to image.<checkPointVersion>.
                   // If the target image.<checkPointVersion> already exists, File.renameTo may fail on some platforms.
                   // To avoid this, ensure we use a strictly newer image sequence when forcing the checkpoint.
                   if (imageVersion == checkPointVersion) {
                       long newCheckPointVersion = checkPointVersion + 1;
                       LOG.info("Adjust checkpoint version from {} to {} to avoid duplicate image sequence when "
                               + "forcing checkpoint in cloud mode (stale image).",
                               checkPointVersion, newCheckPointVersion);
                       checkPointVersion = newCheckPointVersion;
                   }
   ```
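
The other option named in the comment, replacing the existing image file safely, could look roughly like the sketch below. It is an illustration only: `ImageFileSwap` is a hypothetical helper, and nothing here is taken from `Env.saveImage()`. It relies on `java.nio.file.Files.move`, which reports failures as exceptions, whereas `File.renameTo` only returns `false`.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: move a freshly written temp image over an existing one.
final class ImageFileSwap {
    static void replace(Path tmpImage, Path targetImage) throws IOException {
        try {
            // Prefer an atomic rename. Whether an existing target is replaced
            // under ATOMIC_MOVE is implementation specific; on POSIX file
            // systems the rename typically replaces it.
            Files.move(tmpImage, targetImage,
                    StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback: non-atomic move that explicitly replaces the target.
            Files.move(tmpImage, targetImage, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

Unlike bumping the checkpoint version, this keeps the image sequence equal to the finalized journal id, at the cost of overwriting the previous image with the same sequence.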



##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3554,6 +3561,13 @@ public static int metaServiceRpcRetryTimes() {
         "Maximal concurrent num of get tablet stat job."})
     public static int max_get_tablet_stat_task_threads_num = 4;
 
+    @ConfField(mutable = true, description = {"Version of getting tablet stats in cloud mode. "
+            + "Version 1: get all tablets; Version 2: get active and interval expired tablets"})
+    public static int cloud_get_tablet_stats_version = 2;
+
+    @ConfField(description = {"Maximum concurrent number of get tablet stat jobs."})

Review Comment:
   The `cloud_sync_tablet_stats_task_threads_num` description currently says 
"Maximum concurrent number of get tablet stat jobs", which is misleading (this 
config controls FE->FE sync/push tasks). Please update the description to match 
the actual purpose to avoid operator confusion.
   ```suggestion
       @ConfField(description = {
               "存算分离模式下 Master FE 发送给其它 FE 同步 tablet stats 任务的最大并发数",
               "Maximal concurrent number of tablet stats sync/push tasks between Master FE and other FEs in cloud mode."})
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java:
##########
@@ -180,8 +180,10 @@ public static class ReplicaContext {
      */
     private long preWatermarkTxnId = -1;
     private long postWatermarkTxnId = -1;
+    @SerializedName(value = "sc")
     private long segmentCount = 0L;
-    private long rowsetCount = 0L;
+    @SerializedName(value = "rsc")
+    private long rowsetCount = 1L; // [0-1] rowset

Review Comment:
   `segmentCount`/`rowsetCount` were previously serialized under their field 
names. Adding `@SerializedName("sc")`/`@SerializedName("rsc")` without 
`alternate` values means Gson no longer matches the old keys when loading older 
images/journals, so these fields silently reset to their defaults on upgrade. 
Add `alternate = {"segmentCount"}` and `alternate = {"rowsetCount"}` (and keep 
the defaults consistent) to preserve backward compatibility when loading 
existing metadata.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]