yunfengzhou-hub commented on code in PR #7117:
URL: https://github.com/apache/paimon/pull/7117#discussion_r2785910945


##########
docs/content/maintenance/metrics.md:
##########
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
             <td>Gauge</td>
             <td>Time difference between reading the data file and file creation.</td>
         </tr>
+        <tr>
+            <td>sourceParallelismUpperBound</td>
+            <td>Flink Source Enumerator</td>
+            <td>Gauge</td>
+            <td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism. Note: This is a recommendation, not a hard limit.</td>

Review Comment:
   nit: equals -> equals to



##########
docs/content/maintenance/metrics.md:
##########
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
             <td>Gauge</td>
             <td>Time difference between reading the data file and file creation.</td>
         </tr>
+        <tr>
+            <td>sourceParallelismUpperBound</td>
+            <td>Flink Source Enumerator</td>
+            <td>Gauge</td>
+            <td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism. Note: This is a recommendation, not a hard limit.</td>

Review Comment:
   > For fixed bucket tables, this equals the bucket number. For dynamic or postpone bucket tables (bucket < 0), this equals the flink max parallelism.
   
   We might not need to describe these details. They are internal implementation details rather than part of the API, and they may change once we can derive a more accurate recommended parallelism for each bucket mode.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java:
##########
@@ -85,6 +94,12 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
         }
         StreamTableScan scan = readBuilder.newStreamScan();
         if (metricGroup(context) != null) {
+            int bucketNum = CoreOptions.fromMap(options).bucket();
+            int sourceParallelismUpperBound = bucketNum < 0 ? MAX_PARALLELISM_OF_SOURCE : bucketNum;
+
+            context.metricGroup()
+                    .gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> sourceParallelismUpperBound);

Review Comment:
   Better to trigger this registration in the open() or init() method of the enumerator or split assigner.
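
A minimal sketch of what this suggestion could look like, assuming the gauge is registered when the enumerator starts rather than when the source constructs it. `SketchMetricGroup` and `SketchEnumerator` are hypothetical stand-ins, not Paimon or Flink classes, and `maxSourceParallelism` is passed in because the value of `MAX_PARALLELISM_OF_SOURCE` is not shown in this hunk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metric group; it only stores integer gauges by name. */
class SketchMetricGroup {
    private final Map<String, Supplier<Integer>> gauges = new HashMap<>();

    void gauge(String name, Supplier<Integer> gauge) {
        gauges.put(name, gauge);
    }

    /** Returns the current gauge value, or null if no gauge has that name. */
    Integer read(String name) {
        Supplier<Integer> gauge = gauges.get(name);
        return gauge == null ? null : gauge.get();
    }
}

/** Hypothetical enumerator that owns the metric registration. */
class SketchEnumerator {
    static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";

    private final SketchMetricGroup metricGroup; // may be null when metrics are unavailable
    private final int bucketNum;
    private final int maxSourceParallelism; // assumption: supplied by the caller

    SketchEnumerator(SketchMetricGroup metricGroup, int bucketNum, int maxSourceParallelism) {
        this.metricGroup = metricGroup;
        this.bucketNum = bucketNum;
        this.maxSourceParallelism = maxSourceParallelism;
    }

    /** Same rule as the diff: a negative bucket number falls back to the maximum. */
    static int upperBound(int bucketNum, int maxSourceParallelism) {
        return bucketNum < 0 ? maxSourceParallelism : bucketNum;
    }

    /** Registers the gauge as part of start-up, so the source class stays free of metric wiring. */
    void start() {
        if (metricGroup == null) {
            return;
        }
        int bound = upperBound(bucketNum, maxSourceParallelism);
        metricGroup.gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> bound);
    }
}
```

With this shape, nothing is registered until `start()` runs, and both the source's create and restore paths would get the metric without repeating the registration code.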



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java:
##########
@@ -46,6 +46,15 @@ public class ContinuousFileStoreSource extends FlinkSource {
     protected final Map<String, String> options;
     protected final boolean unordered;
 
+    /**
+     * Metric name for source scaling max parallelism. This metric provides a recommended upper
+     * bound of parallelism for auto-scaling systems.
+     */
+    public static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";
+
+    /** refer org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM. */

Review Comment:
   nit: refer -> refer to



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

