eolivelli commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r872336085


##########
conf/standalone.conf:
##########
@@ -1099,4 +1099,4 @@ configurationStoreServers=
 # than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
 # zookeeper.
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
-managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
\ No newline at end of file
+managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

Review Comment:
   nit: please revert this unnecessary change (it only adds a newline at the end of the file).



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -106,269 +112,270 @@ static void resetTypes() {
         metricWithTypeDefinition.clear();
     }
 
-    static void printTopicStats(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
+    static void printTopicStats(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,

Review Comment:
   It looks like this change is not only about the Function Worker.
   Please update the PR title accordingly.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -382,77 +389,102 @@ static void metricType(SimpleTextOutputStream stream, 
String name) {
 
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String name, double value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String name, double value, boolean 
splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-           String subscription, String name, long value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String subscription, String name, long value, 
boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
+    }
+
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String producerName, long produceId, String 
name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\",producer_name=\"").write(producerName)
+                    .write("\",producer_id=\"").write(produceId)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String producerName, long produceId, String name, double value, 
boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",producer_name=\"").write(producerName)
-                .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String subscription, String name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String name, double value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String subscription, String consumerName, long 
consumerId, String name, long value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String 
name, long value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
-                .write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String 
cluster, String namespace, String topic,
+                               String subscription, String consumerName, long 
consumerId, String name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
+                    .write(consumerId).write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String 
name, double value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
-                .write(consumerId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metricWithRemoteCluster(Map<String, List<String>> 
metrics, String cluster, String namespace,
+                                                String topic, String name, 
String remoteCluster, double value,
+                                                boolean 
splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel);
+            
stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} 
").write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, 
String cluster, String namespace,
-            String topic, String name, String remoteCluster, double value, 
boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} 
");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeMetric(Map<String, List<String>> metrics, String 
name,
+                                    Consumer<SimpleTextOutputStream> 
metricWriter) {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();

Review Comment:
   It looks like we are now doing more memory allocations (even if you call
   'buf.release()').
   Is it possible to go back to creating the SimpleTextOutputStream only once?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to