OneSizeFitsQuorum commented on code in PR #14219:
URL: https://github.com/apache/iotdb/pull/14219#discussion_r1859826216


##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java:
##########
@@ -33,66 +34,50 @@
  * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of 
this class.
  */
 public class PipeConsensusSyncLagManager {
-  long userWriteProgress = 0;
-  long minReplicateProgress = Long.MAX_VALUE;
+  long syncLag = Long.MIN_VALUE;
+  ReentrantLock lock = new ReentrantLock();
   List<ConsensusPipeConnector> consensusPipeConnectorList = new 
CopyOnWriteArrayList<>();
 
-  private void updateReplicateProgress() {
-    minReplicateProgress = Long.MAX_VALUE;
-    // if there isn't a consensus pipe task, replicate progress is 
Long.MAX_VALUE.
-    if (consensusPipeConnectorList.isEmpty()) {
-      return;
-    }
-    // else we find the minimum progress in all consensus pipe task.
-    consensusPipeConnectorList.forEach(
-        consensusPipeConnector ->
-            minReplicateProgress =
-                Math.min(
-                    minReplicateProgress,
-                    
consensusPipeConnector.getConsensusPipeReplicateProgress()));
-  }
-
-  private void updateUserWriteProgress() {
-    // if there isn't a consensus pipe task, user write progress is 0.
-    if (consensusPipeConnectorList.isEmpty()) {
-      userWriteProgress = 0;
-      return;
-    }
-    // since the user write progress of different consensus pipes on the same 
DataRegion is the
-    // same, we only need to take out one Connector to calculate
-    try {
-      ConsensusPipeConnector connector = consensusPipeConnectorList.get(0);
-      userWriteProgress = connector.getConsensusPipeCommitProgress();
-    } catch (Exception e) {
-      // if removing the last connector happens after empty check, we may 
encounter
-      // OutOfBoundsException, in this case, we set userWriteProgress to 0.
-      userWriteProgress = 0;
-    }
+  private long getSyncLagForSpecificConsensusPipe(ConsensusPipeConnector 
consensusPipeConnector) {
+    long userWriteProgress = 
consensusPipeConnector.getConsensusPipeCommitProgress();
+    long replicateProgress = 
consensusPipeConnector.getConsensusPipeReplicateProgress();
+    return Math.max(userWriteProgress - replicateProgress, 0);
   }
 
   public void addConsensusPipeConnector(ConsensusPipeConnector 
consensusPipeConnector) {
     consensusPipeConnectorList.add(consensusPipeConnector);

Review Comment:
   lock?



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java:
##########
@@ -33,66 +34,50 @@
  * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of 
this class.
  */
 public class PipeConsensusSyncLagManager {
-  long userWriteProgress = 0;
-  long minReplicateProgress = Long.MAX_VALUE;
+  long syncLag = Long.MIN_VALUE;
+  ReentrantLock lock = new ReentrantLock();
   List<ConsensusPipeConnector> consensusPipeConnectorList = new 
CopyOnWriteArrayList<>();

Review Comment:
   do we need copy write list anymore if we use lock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to