This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 72a3e59ca [improve] Prometheus streaming parsing optimization (#3752)
72a3e59ca is described below
commit 72a3e59ca63945b8799340daa1d736ff3345f7e1
Author: Duansg <[email protected]>
AuthorDate: Sun Sep 7 23:20:23 2025 +0800
[improve] Prometheus streaming parsing optimization (#3752)
Co-authored-by: Tom <[email protected]>
---
.../collect/prometheus/parser/OnlineParser.java | 19 ++-
.../prometheus/parser/OnlineParserTest.java | 172 +++++++++++++++++++++
2 files changed, 187 insertions(+), 4 deletions(-)
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
index 60571ee78..a4858c0f8 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParser.java
@@ -69,6 +69,10 @@ public class OnlineParser {
parseMetric(inputStream, metricFamilyMap, stringBuilder);
}
i = getChar(inputStream);
+ // To address the `\n\r` scenario, the stray '\r' must be skipped here
+ if (i == '\r') {
+ i = getChar(inputStream);
+ }
}
} catch (FormatException e) {
log.error("prometheus parser failed because of wrong input format. {}", e.getMessage());
@@ -223,20 +227,27 @@ public class OnlineParser {
}
private static CharChecker parseLabelValue(InputStream inputStream, StringBuilder stringBuilder) throws IOException, FormatException {
- int i = getChar(inputStream);
+ int i = inputStream.read();
while (i != '"' && i != -1) {
if (i == '\\') {
- i = getChar(inputStream);
+ i = inputStream.read();
switch (i) {
case 'n' -> stringBuilder.append('\n');
case '\\' -> stringBuilder.append('\\');
case '\"' -> stringBuilder.append('\"');
- default -> throw new FormatException();
+ default -> {
+ // Unknown escape, keep as-is
+ // https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/lib/protoparser/prometheus/parser.go#L419
+ stringBuilder.append('\\');
+ if (i != -1) {
+ stringBuilder.append((char) i);
+ }
+ }
}
} else {
stringBuilder.append((char) i);
}
- i = getChar(inputStream);
+ i = inputStream.read();
}
return new CharChecker(i);
}
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
index de8775135..4f515b869 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/prometheus/parser/OnlineParserTest.java
@@ -43,6 +43,16 @@ class OnlineParserTest {
assertNotNull(metricFamilyMap);
}
+ @Disabled // disabled unless test_file.txt has already been saved locally as a test resource
+ @Test
+ void parseTestFile() throws Exception {
+ InputStream inputStream =
this.getClass().getClassLoader().getResourceAsStream("test_file.txt");
+ assertNotNull(inputStream, "Failed to load test_file.txt resource");
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+ assertNotNull(metricFamilyMap);
+ }
+
+
@Test
void parseMetrics2() throws Exception {
String str = """
@@ -287,4 +297,166 @@ class OnlineParserTest {
assertEquals("jvm_gc_pause_seconds_sum", metricFamily1.getName());
assertEquals(0.139, metricFamily1.getMetricList().get(0).getValue());
}
+
+ @Test
+ void testParseMetricsWithLfCrLineEnding() throws Exception {
+ String str = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\n\r"
+ + "# TYPE go_gc_duration_seconds summary\n\r"
+ + "jvm_gc_pause_seconds_count 1.0\n\r"
+ + "jvm_gc_pause_seconds_sum{} 0.139\n\r";
+
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(2, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("jvm_gc_pause_seconds_count");
+ assertEquals("jvm_gc_pause_seconds_count", metricFamily.getName());
+ assertEquals(1.0, metricFamily.getMetricList().get(0).getValue());
+
+ MetricFamily metricFamily1 =
metricFamilyMap.get("jvm_gc_pause_seconds_sum");
+ assertEquals("jvm_gc_pause_seconds_sum", metricFamily1.getName());
+ assertEquals(0.139, metricFamily1.getMetricList().get(0).getValue());
+ }
+
+ @Test
+ void testParseMetricsWithMixedLineEndingsIncludingLfCr() throws Exception {
+ String str = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\r\n"
+ + "# TYPE go_gc_duration_seconds summary\n"
+ + "jvm_gc_pause_seconds_count 1.0\n\r"
+ + "jvm_gc_pause_seconds_sum{} 0.139\r\n"
+ + "jvm_gc_pause_seconds_max{} 0.139\n";
+
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(3, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("jvm_gc_pause_seconds_count");
+ assertEquals("jvm_gc_pause_seconds_count", metricFamily.getName());
+ assertEquals(1.0, metricFamily.getMetricList().get(0).getValue());
+
+ MetricFamily metricFamily1 =
metricFamilyMap.get("jvm_gc_pause_seconds_sum");
+ assertEquals("jvm_gc_pause_seconds_sum", metricFamily1.getName());
+ assertEquals(0.139, metricFamily1.getMetricList().get(0).getValue());
+
+ MetricFamily metricFamily2 =
metricFamilyMap.get("jvm_gc_pause_seconds_max");
+ assertEquals("jvm_gc_pause_seconds_max", metricFamily2.getName());
+ assertEquals(0.139, metricFamily2.getMetricList().get(0).getValue());
+ }
+
+ @Test
+ void testParseMetricsWithOnlyLfCr() throws Exception {
+ String str = "jvm_gc_pause_seconds_sum 42.0\n\r";
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(1, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("jvm_gc_pause_seconds_sum");
+ assertEquals("jvm_gc_pause_seconds_sum", metricFamily.getName());
+ assertEquals(42.0, metricFamily.getMetricList().get(0).getValue());
+ }
+
+
+ @Test
+ void testParseMetricEscape1() throws Exception {
+ // test escape '\\'
+ String str = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\r\n"
+ + "# TYPE go_gc_duration_seconds summary\n"
+ + "windows_service_info{display_name=\"\\\\Application Layer \nGateway Service\",name=\"alg\\\\\",process_id=\"0\",run_as=\"NT AUTHORITY\\\\LocalService\"} 1\n";
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(1, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("windows_service_info");
+ assertEquals("windows_service_info", metricFamily.getName());
+ assertNotNull(metricFamily.getMetricList().get(0).getLabels());
+ assertEquals(4,
metricFamily.getMetricList().get(0).getLabels().size());
+ assertEquals(1.0, metricFamily.getMetricList().get(0).getValue());
+
+ assertEquals("display_name",
metricFamily.getMetricList().get(0).getLabels().get(0).getName());
+ assertEquals("\\Application Layer \nGateway Service",
metricFamily.getMetricList().get(0).getLabels().get(0).getValue());
+
+ assertEquals("name",
metricFamily.getMetricList().get(0).getLabels().get(1).getName());
+ assertEquals("alg\\",
metricFamily.getMetricList().get(0).getLabels().get(1).getValue());
+
+ assertEquals("process_id",
metricFamily.getMetricList().get(0).getLabels().get(2).getName());
+ assertEquals("0",
metricFamily.getMetricList().get(0).getLabels().get(2).getValue());
+
+ assertEquals("run_as",
metricFamily.getMetricList().get(0).getLabels().get(3).getName());
+ assertEquals("NT AUTHORITY\\LocalService",
metricFamily.getMetricList().get(0).getLabels().get(3).getValue());
+ }
+
+ @Test
+ void testParseMetricEscape2() throws Exception {
+ // test escape '\"'
+ String str = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\r\n"
+ + "# TYPE go_gc_duration_seconds summary\n"
+ + "windows_service_info{display_name=\"\\\"Application Layer \\\"Gateway Service\",name=\"alg\\\"\",process_id=\"0\",run_as=\"NT AUTHORITY\\\"LocalService\"} 1\n";
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(1, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("windows_service_info");
+ assertEquals("windows_service_info", metricFamily.getName());
+ assertNotNull(metricFamily.getMetricList().get(0).getLabels());
+ assertEquals(4,
metricFamily.getMetricList().get(0).getLabels().size());
+ assertEquals(1.0, metricFamily.getMetricList().get(0).getValue());
+
+ assertEquals("display_name",
metricFamily.getMetricList().get(0).getLabels().get(0).getName());
+ assertEquals("\"Application Layer \"Gateway Service",
metricFamily.getMetricList().get(0).getLabels().get(0).getValue());
+
+ assertEquals("name",
metricFamily.getMetricList().get(0).getLabels().get(1).getName());
+ assertEquals("alg\"",
metricFamily.getMetricList().get(0).getLabels().get(1).getValue());
+
+ assertEquals("process_id",
metricFamily.getMetricList().get(0).getLabels().get(2).getName());
+ assertEquals("0",
metricFamily.getMetricList().get(0).getLabels().get(2).getValue());
+
+ assertEquals("run_as",
metricFamily.getMetricList().get(0).getLabels().get(3).getName());
+ assertEquals("NT AUTHORITY\"LocalService",
metricFamily.getMetricList().get(0).getLabels().get(3).getValue());
+
+
+ }
+
+ @Test
+ void testParseMetricEscape3() throws Exception {
+ // test escape '\n'
+ String str = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\r\n"
+ + "# TYPE go_gc_duration_seconds summary\n"
+ + "windows_service_info{display_name=\"\nApplication Layer \nGateway Service\",name=\"alg\n\",process_id=\"0\",run_as=\"NT AUTHORITY\nLocalService\"} 1\n";
+ InputStream inputStream = new
ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+
+ Map<String, MetricFamily> metricFamilyMap =
OnlineParser.parseMetrics(inputStream);
+
+ assertNotNull(metricFamilyMap);
+ assertEquals(1, metricFamilyMap.values().size());
+
+ MetricFamily metricFamily =
metricFamilyMap.get("windows_service_info");
+ assertEquals("windows_service_info", metricFamily.getName());
+ assertNotNull(metricFamily.getMetricList().get(0).getLabels());
+ assertEquals(4,
metricFamily.getMetricList().get(0).getLabels().size());
+ assertEquals(1.0, metricFamily.getMetricList().get(0).getValue());
+
+ assertEquals("display_name",
metricFamily.getMetricList().get(0).getLabels().get(0).getName());
+ assertEquals("\nApplication Layer \nGateway Service",
metricFamily.getMetricList().get(0).getLabels().get(0).getValue());
+
+ assertEquals("name",
metricFamily.getMetricList().get(0).getLabels().get(1).getName());
+ assertEquals("alg\n",
metricFamily.getMetricList().get(0).getLabels().get(1).getValue());
+
+ assertEquals("process_id",
metricFamily.getMetricList().get(0).getLabels().get(2).getName());
+ assertEquals("0",
metricFamily.getMetricList().get(0).getLabels().get(2).getValue());
+
+ assertEquals("run_as",
metricFamily.getMetricList().get(0).getLabels().get(3).getName());
+ assertEquals("NT AUTHORITY\nLocalService",
metricFamily.getMetricList().get(0).getLabels().get(3).getValue());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]