wuchong commented on code in PR #1548:
URL: https://github.com/apache/fluss/pull/1548#discussion_r2348155519


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -959,7 +984,8 @@ private void tryProcessCommitKvSnapshot(
                                 event.getAddCompletedSnapshotData().getCompletedSnapshot();
                         // add completed snapshot
                         CompletedSnapshotStore completedSnapshotStore =
-                                completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
+                                completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                                        coordinatorContext.getTablePathById(tb.getTableId()), tb);

Review Comment:
   Move the access to `coordinatorContext` out of the `ioExecutor` task; 
otherwise, it's not thread-safe.
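
   A minimal sketch of the pattern being asked for, assuming the context is 
only safe to read on the event-processing thread. `ContextSketch` and 
`CommitSnapshotSketch` are hypothetical stand-ins, not the Fluss classes: the 
lookup happens before the task is submitted, and only the captured value 
crosses into the IO thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for single-threaded coordinator state (not the Fluss class). */
class ContextSketch {
    // Plain HashMap: only safe when touched from the event-processing thread.
    private final Map<Long, String> tablePathById = new HashMap<>();

    void putTable(long tableId, String path) {
        tablePathById.put(tableId, path);
    }

    String getTablePathById(long tableId) {
        return tablePathById.get(tableId);
    }
}

class CommitSnapshotSketch {
    static CompletableFuture<String> commit(
            ContextSketch ctx, long tableId, ExecutorService ioExecutor) {
        // Read the non-thread-safe context on the calling (event) thread...
        final String tablePath = ctx.getTablePathById(tableId);
        // ...and hand only the captured immutable value to the IO thread.
        return CompletableFuture.supplyAsync(
                () -> "snapshot store for " + tablePath, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            ContextSketch ctx = new ContextSketch();
            ctx.putTable(1L, "db.orders");
            System.out.println(commit(ctx, 1L, ioExecutor).join());
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```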



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java:
##########
@@ -339,7 +342,22 @@ public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
         metricGroup.gauge(
                 MetricNames.LOG_NUM_SEGMENTS, () -> localLog.getSegments().numberOfSegments());
         metricGroup.gauge(MetricNames.LOG_END_OFFSET, localLog::getLocalLogEndOffset);
-        metricGroup.gauge(MetricNames.LOG_SIZE, () -> localLog.getSegments().sizeInBytes());
+    }
+
+    public long logSize() {
+        return localLog.getSegments().sizeInBytes();
+    }
+
+    public long logicalStorageSize() {
+        if (remoteLogEndOffset <= 0L) {
+            return localLog.getSegments().sizeInBytes();
+        } else {
+            AtomicLong logicalStorageSize = new AtomicLong(remoteLogSize);

Review Comment:
   Why use `AtomicLong` here? There should be no multi-threading problems here. 
Would `.reduce(remoteLogSize, Long::sum)` also work?
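
   A small sketch of the `reduce`-based form, under the assumption that the 
local sizes are available as a sequential stream of `Long` values. 
`LogicalSizeSketch` and its parameters are hypothetical. It reduces from `0L` 
and adds `remoteLogSize` afterwards, because the `Stream.reduce` contract 
expects the first argument to be an identity for the accumulator; on a 
sequential stream, seeding with `remoteLogSize` gives the same total.

```java
import java.util.Arrays;
import java.util.List;

class LogicalSizeSketch {
    /** Adds the remote size to the sum of local segment sizes; names are hypothetical. */
    static long logicalStorageSize(long remoteLogSize, List<Long> localSegmentSizes) {
        // No AtomicLong needed: reduce folds the stream into one value on the calling thread.
        long localSize = localSegmentSizes.stream().reduce(0L, Long::sum);
        return remoteLogSize + localSize;
    }

    public static void main(String[] args) {
        System.out.println(logicalStorageSize(100L, Arrays.asList(10L, 20L, 30L)));
    }
}
```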



##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java:
##########
@@ -51,4 +66,128 @@ protected final void putVariables(Map<String, String> variables) {
         variables.put("host", hostname);
         variables.put("server_id", serverId);
     }
+
+    // ------------------------------------------------------------------------
+    //  table buckets groups
+    // ------------------------------------------------------------------------
+
+    public @Nullable BucketMetricGroup getTableBucketMetricGroup(
+            TablePath tablePath, TableBucket tableBucket) {
+        SimpleTableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath);
+        if (tableMetricGroup == null) {
+            return null;
+        }
+        return tableMetricGroup.buckets.get(tableBucket);
+    }
+
+    public void addTableBucketMetricGroup(
+            PhysicalTablePath physicalTablePath,
+            long tableId,
+            @Nullable Long partitionId,
+            Set<Integer> assignments) {

Review Comment:
   `assignments` -> `buckets`?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -512,6 +513,14 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
                     null,
                     null,
                     tableBuckets);
+
+            // register table metrics.
+            coordinatorMetricGroup.addTableBucketMetricGroup(

Review Comment:
   If the Coordinator restarts, how do the existing tables register their table 
and bucket metrics?



##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java:
##########
@@ -51,4 +66,128 @@ protected final void putVariables(Map<String, String> variables) {
         variables.put("host", hostname);
         variables.put("server_id", serverId);
     }
+
+    // ------------------------------------------------------------------------
+    //  table buckets groups
+    // ------------------------------------------------------------------------
+
+    public @Nullable BucketMetricGroup getTableBucketMetricGroup(
+            TablePath tablePath, TableBucket tableBucket) {
+        SimpleTableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath);
+        if (tableMetricGroup == null) {
+            return null;
+        }
+        return tableMetricGroup.buckets.get(tableBucket);
+    }
+
+    public void addTableBucketMetricGroup(
+            PhysicalTablePath physicalTablePath,
+            long tableId,
+            @Nullable Long partitionId,
+            Set<Integer> assignments) {
+        TablePath tablePath = physicalTablePath.getTablePath();
+        SimpleTableMetricGroup tableMetricGroup =
+                metricGroupByTable.computeIfAbsent(
+                        tablePath, table -> new SimpleTableMetricGroup(registry, tablePath, this));
+        assignments.forEach(
+                bucket ->
+                        tableMetricGroup.addBucketMetricGroup(
+                                physicalTablePath.getPartitionName(),
+                                new TableBucket(tableId, partitionId, bucket)));
+    }
+
+    public void removeTableMetricGroup(TablePath tablePath, long tableId) {
+        SimpleTableMetricGroup tableMetricGroup = metricGroupByTable.remove(tablePath);
+        if (tableMetricGroup != null) {
+            tableMetricGroup.removeBucketMetricsGroupForTable(tableId);
+            tableMetricGroup.close();
+        }
+    }
+
+    public void removeTablePartitionMetricsGroup(
+            TablePath tablePath, long tableId, long partitionId) {
+        SimpleTableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath);
+        if (tableMetricGroup != null) {
+            tableMetricGroup.removeBucketMetricsGroupForPartition(tableId, partitionId);
+        }
+    }
+
+    /** The metric group for table. */
+    public static class SimpleTableMetricGroup extends AbstractMetricGroup {

Review Comment:
   This can be `private`.



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java:
##########
@@ -66,6 +66,22 @@ public SharedKvFileRegistry(Executor asyncDisposalExecutor) {
         this.open = true;
     }
 
+    public long getFileSize() {
+        synchronized (registeredKvEntries) {
+            long size = 0;
+            for (SharedKvEntry entry : registeredKvEntries.values()) {
+                size += entry.kvFileHandle.getSize();
+            }
+            return size;
+        }
+    }
+
+    public long getNumSnapshots() {

Review Comment:
   This is not used and can be removed.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java:
##########
@@ -104,6 +136,18 @@ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(TableBucket tabl
                                 "Created snapshot store for table bucket {} in {} ms.",
                                 bucket,
                                 end - start);
+
+                        BucketMetricGroup bucketMetricGroup =
+                                coordinatorMetricGroup.getTableBucketMetricGroup(
+                                        tablePath, tableBucket);
+                        if (bucketMetricGroup != null) {

Review Comment:
   Will we miss reporting some snapshot results if the `bucketMetricGroup` is 
not registered yet? It would be better to add a warning log for this case.



##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -338,6 +353,33 @@ private int writerIdCount() {
         return onlineReplicas().map(Replica::writerIdCount).reduce(0, Integer::sum);
     }
 
+    private long logicalStorageLogSize() {
+        return onlineReplicas().map(Replica::logicalStorageLogSize).reduce(0L, Long::sum);
+    }
+
+    private long logicalStorageKvSize() {
+        return onlineReplicas().map(Replica::logicalStorageKvSize).reduce(0L, Long::sum);
+    }
+
+    private long physicalStorageLocalSize() {
+        AtomicLong localSize = new AtomicLong(0L);

Review Comment:
   Why use `AtomicLong` rather than `.reduce` for this metric?



##########
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java:
##########
@@ -37,7 +37,7 @@ public BucketMetricGroup(
             MetricRegistry registry,
             @Nullable String partitionName,
             int bucket,
-            TableMetricGroup parent) {
+            AbstractMetricGroup parent) {

Review Comment:
   This is problematic, because `BucketMetricGroup#getTableMetricGroup` returns 
the parent as `TableMetricGroup`.
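
   To illustrate the concern, here is a reduced sketch with hypothetical 
classes (the `*Sketch` names are not the Fluss types). It assumes the getter 
downcasts the stored parent, which is a guess about the implementation: once 
the constructor accepts any abstract group, a non-table parent compiles but 
fails at the getter.

```java
/** Hypothetical reduced hierarchy; stands in for the metric group classes. */
abstract class GroupSketch {}

class TableGroupSketch extends GroupSketch {}

class SimpleTableGroupSketch extends GroupSketch {}

class BucketGroupSketch {
    private final GroupSketch parent;

    // Widened parameter type: any group is now accepted as the parent.
    BucketGroupSketch(GroupSketch parent) {
        this.parent = parent;
    }

    // Assumed shape of the getter: it still promises the narrower type.
    TableGroupSketch getTableMetricGroup() {
        return (TableGroupSketch) parent;
    }

    public static void main(String[] args) {
        // Works: the parent really is a table group.
        new BucketGroupSketch(new TableGroupSketch()).getTableMetricGroup();
        try {
            // Compiles, but the cast fails at runtime.
            new BucketGroupSketch(new SimpleTableGroupSketch()).getTableMetricGroup();
        } catch (ClassCastException e) {
            System.out.println("parent is not a table group");
        }
    }
}
```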



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java:
##########
@@ -66,6 +66,22 @@ public SharedKvFileRegistry(Executor asyncDisposalExecutor) {
         this.open = true;
     }
 
+    public long getFileSize() {
+        synchronized (registeredKvEntries) {

Review Comment:
   It's very heavy to acquire a lock just for metrics. We can maintain 
volatile variables for the file size and numSnapshots, and update them whenever 
`registeredKvEntries` is updated. This is an incremental computation that only 
happens when a new snapshot is created, so it should be more lightweight.
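
   A rough sketch of the idea, with a hypothetical `SharedFileRegistrySketch` 
that maps keys to file sizes instead of real entries. Writers already hold the 
map's lock, so they can adjust a `volatile` running total there, and the 
metric read becomes a plain field read with no lock and no iteration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry; keys and sizes stand in for the real entries. */
class SharedFileRegistrySketch {
    private final Map<String, Long> registeredEntries = new HashMap<>();

    // Written only while holding the map lock; read lock-free by the metric gauge.
    private volatile long fileSize = 0L;

    void register(String key, long size) {
        synchronized (registeredEntries) {
            Long previous = registeredEntries.put(key, size);
            // Incremental update: add the new size, subtract any replaced entry.
            fileSize += size - (previous == null ? 0L : previous);
        }
    }

    void unregister(String key) {
        synchronized (registeredEntries) {
            Long removed = registeredEntries.remove(key);
            if (removed != null) {
                fileSize -= removed;
            }
        }
    }

    /** Metric read path: no lock, no loop over the entries. */
    long getFileSize() {
        return fileSize;
    }
}
```

   The compound update on the volatile field is safe in this sketch only 
because every writer is serialized by the same lock.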



