This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 28b050c07ff61e13682912b35ba38c28db983e0f Author: Stefan Richter <[email protected]> AuthorDate: Mon May 13 14:25:33 2024 +0200 [FLINK-38353][spans] Add support for nested spans to metrics reporting. As proposed and described in: https://cwiki.apache.org/confluence/display/FLINK/FLIP-483%3A+Add+support+for+children+Spans --- .../java/org/apache/flink/traces/SimpleSpan.java | 15 +- .../main/java/org/apache/flink/traces/Span.java | 7 +- .../java/org/apache/flink/traces/SpanBuilder.java | 21 ++- .../traces/otel/OpenTelemetryTraceReporter.java | 7 +- .../otel/OpenTelemetryTraceReporterITCase.java | 170 +++++++++++++++++++++ 5 files changed, 211 insertions(+), 9 deletions(-) diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java index 8e35e3c43ca..d94c42d2117 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java @@ -20,7 +20,10 @@ package org.apache.flink.traces; import org.apache.flink.annotation.Internal; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Basic implementation of {@link Span}. */ @@ -31,6 +34,7 @@ public class SimpleSpan implements Span { private final String name; private final HashMap<String, Object> attributes = new HashMap<>(); + private final List<Span> children = new ArrayList<>(); private final long startTsMillis; private final long endTsMillis; @@ -39,13 +43,15 @@ public class SimpleSpan implements Span { String name, long startTsMillis, long endTsMillis, - HashMap<String, Object> attributes) { + HashMap<String, Object> attributes, + List<Span> children) { this.scope = scope; this.name = name; this.startTsMillis = startTsMillis; this.endTsMillis = endTsMillis; this.attributes.putAll(attributes); + this.children.addAll(children); } @Override @@ -73,6 +79,11 @@ public class SimpleSpan implements Span { return attributes; } + @Override + public List<Span> getChildren() { + return Collections.unmodifiableList(children); + } + @Override public String toString() { return SimpleSpan.class.getSimpleName() @@ -87,6 +98,8 @@ public class SimpleSpan implements Span { + endTsMillis + ", attributes=" + attributes + + ", children=" + + children + "}"; } } diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java index 699e48fd5c4..d530b5c3784 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java @@ -20,14 +20,12 @@ package org.apache.flink.traces; import org.apache.flink.annotation.Experimental; +import java.util.List; import java.util.Map; /** * Span represents something that happened in Flink at certain point of time, that will be reported * to a {@link org.apache.flink.traces.reporter.TraceReporter}. - * - * <p>Currently we don't support traces with multiple spans. Each span is self-contained and - * represents things like a checkpoint or recovery. */ @Experimental public interface Span { @@ -49,4 +47,7 @@ public interface Span { * added in the future. */ Map<String, Object> getAttributes(); + + /** Returns the child spans (= nested). */ + List<Span> getChildren(); } diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java index e2c14738485..5f1c1c35b27 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java @@ -21,14 +21,18 @@ package org.apache.flink.traces; import org.apache.flink.AttributeBuilder; import org.apache.flink.annotation.Experimental; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Builder used to construct {@link Span}. See {@link Span#builder(Class, String)}. */ @Experimental public class SpanBuilder implements AttributeBuilder { private final HashMap<String, Object> attributes = new HashMap<>(); + private final List<SpanBuilder> children = new ArrayList<>(); private final Class<?> classScope; private final String name; private long startTsMillis; @@ -68,7 +72,10 @@ public class SpanBuilder implements AttributeBuilder { name, startTsMillisToBuild, endTsMillisToBuild, - attributes); + attributes, + children.stream() + .map(childBuilder -> childBuilder.build(additionalVariables)) + .collect(Collectors.toList())); } /** @@ -120,4 +127,16 @@ public class SpanBuilder implements AttributeBuilder { public String getName() { return name; } + + /** Adds child spans (= nested). */ + public SpanBuilder addChildren(List<SpanBuilder> children) { + this.children.addAll(children); + return this; + } + + /** Adds child span (= nested). */ + public SpanBuilder addChild(SpanBuilder child) { + this.children.add(child); + return this; + } } diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java index 248e54b1aa3..d8798fbec4b 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java @@ -124,10 +124,9 @@ public class OpenTelemetryTraceReporter extends OpenTelemetryReporterBase implem .startSpan(); // Recursively add child spans to this parent - // TODO: not yet supported - // for (Span childSpan : span.getChildren()) { - // notifyOfAddedSpanInternal(childSpan, currentOtelSpan); - // } + for (Span childSpan : span.getChildren()) { + notifyOfAddedSpanInternal(childSpan, currentOtelSpan); + } currentOtelSpan.end(span.getEndTsMillis(), TimeUnit.MILLISECONDS); } diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java index 88b9871787e..7d452a90d5a 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java @@ -33,7 +33,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -125,4 +129,170 @@ public class OpenTelemetryTraceReporterITCase extends OpenTelemetryTestBase { }); }); } + + @Test + public void testReportNestedSpan() throws Exception { + String scope = this.getClass().getCanonicalName(); + + String attribute1KeyRoot = "foo"; + String attribute1ValueRoot = "bar"; + String attribute2KeyRoot = "<variable>"; + String attribute2ValueRoot = "value"; + String spanRoot = "root"; + + String spanL1N1 = "1_1"; + String attribute1KeyL1N1 = "foo_" + spanL1N1; + String attribute1ValueL1N1 = "bar_" + spanL1N1; + + String spanL1N2 = "1_2"; + String attribute1KeyL1N2 = "foo_" + spanL1N2; + String attribute1ValueL1N2 = "bar_" + spanL1N2; + + String spanL2N1 = "2_1"; + String attribute1KeyL2N1 = "foo_" + spanL2N1; + String attribute1ValueL2N1 = "bar_" + spanL2N1; + + reporter.open(createMetricConfig()); + try { + SpanBuilder childLeveL2N1 = + Span.builder(this.getClass(), spanL2N1) + .setAttribute(attribute1KeyL2N1, attribute1ValueL2N1) + .setStartTsMillis(44) + .setEndTsMillis(46); + + SpanBuilder childL1N1 = + Span.builder(this.getClass(), spanL1N1) + .setAttribute(attribute1KeyL1N1, attribute1ValueL1N1) + .setStartTsMillis(43) + .setEndTsMillis(48) + .addChild(childLeveL2N1); + + SpanBuilder childL1N2 = + Span.builder(this.getClass(), spanL1N2) + .setAttribute(attribute1KeyL1N2, attribute1ValueL1N2) + .setStartTsMillis(44) + .setEndTsMillis(46); + + SpanBuilder rootSpan = + Span.builder(this.getClass(), spanRoot) + .setAttribute(attribute1KeyRoot, attribute1ValueRoot) + .setAttribute(attribute2KeyRoot, attribute2ValueRoot) + .setStartTsMillis(42) + .setEndTsMillis(64) + .addChildren(Arrays.asList(childL1N1, childL1N2)); + + reporter.notifyOfAddedSpan(rootSpan.build()); + } finally { + reporter.close(); + } + + eventuallyConsumeJson( + (json) -> { + JsonNode scopeSpans = json.findPath("resourceSpans").findPath("scopeSpans"); + assertThat(scopeSpans.findPath("scope").findPath("name").asText()) + .isEqualTo(scope); + JsonNode spans = scopeSpans.findPath("spans"); + + Map<String, ActualSpan> actualSpanSummaries = convertToSummaries(spans); + + assertThat(actualSpanSummaries.keySet()) + .containsExactlyInAnyOrder(spanRoot, spanL1N1, spanL1N2, spanL2N1); + + ActualSpan root = actualSpanSummaries.get(spanRoot); + ActualSpan l1n1 = actualSpanSummaries.get(spanL1N1); + ActualSpan l1n2 = actualSpanSummaries.get(spanL1N2); + ActualSpan l2n1 = actualSpanSummaries.get(spanL2N1); + + assertThat(root.parentSpanId).isEmpty(); + assertThat(root.attributes) + .containsEntry(attribute1KeyRoot, attribute1ValueRoot); + assertThat(root.attributes) + .containsEntry( + VariableNameUtil.getVariableName(attribute2KeyRoot), + attribute2ValueRoot); + assertThat(l1n1.attributes) + .containsEntry(attribute1KeyL1N1, attribute1ValueL1N1); + assertThat(l1n2.attributes) + .containsEntry(attribute1KeyL1N2, attribute1ValueL1N2); + assertThat(l2n1.attributes) + .containsEntry(attribute1KeyL2N1, attribute1ValueL2N1); + + assertThat(root.traceId).isEqualTo(l1n1.traceId); + assertThat(root.traceId).isEqualTo(l1n2.traceId); + assertThat(root.traceId).isEqualTo(l2n1.traceId); + assertThat(root.spanId).isNotEmpty(); + assertThat(root.spanId).isEqualTo(l1n1.parentSpanId); + assertThat(root.spanId).isEqualTo(l1n2.parentSpanId); + + assertThat(root.children).containsExactlyInAnyOrder(l1n1, l1n2); + assertThat(l1n1.children).containsExactlyInAnyOrder(l2n1); + assertThat(l1n2.children).isEmpty(); + assertThat(l2n1.children).isEmpty(); + }); + } + + private Map<String, ActualSpan> convertToSummaries(JsonNode spans) { + Map<String, ActualSpan> spanIdToSpan = new HashMap<>(); + for (int i = 0; spans.get(i) != null; i++) { + ActualSpan actualSpan = convertToActualSpan(spans.get(i)); + spanIdToSpan.put(actualSpan.spanId, actualSpan); + } + + Map<String, ActualSpan> nameToSpan = new HashMap<>(); + + spanIdToSpan.forEach( + (spanId, actualSpan) -> { + if (!actualSpan.parentSpanId.isEmpty()) { + ActualSpan parentSpan = spanIdToSpan.get(actualSpan.parentSpanId); + parentSpan.addChild(actualSpan); + } + nameToSpan.put(actualSpan.name, actualSpan); + }); + + return nameToSpan; + } + + private ActualSpan convertToActualSpan(JsonNode span) { + String name = span.findPath("name").asText(); + String traceId = span.findPath("traceId").asText(); + String spanId = span.findPath("spanId").asText(); + String parentSpanId = span.findPath("parentSpanId").asText(); + + Map<String, String> attributeMap = new HashMap<>(); + JsonNode attributes = span.findPath("attributes"); + + for (int j = 0; attributes.get(j) != null; j++) { + JsonNode attribute = attributes.get(j); + String key = attribute.get("key").asText(); + String value = attribute.at("/value/stringValue").asText(); + attributeMap.put(key, value); + } + return new ActualSpan(traceId, spanId, name, parentSpanId, attributeMap); + } + + private static class ActualSpan { + final String traceId; + final String spanId; + final String name; + final String parentSpanId; + final Map<String, String> attributes; + final List<ActualSpan> children = new ArrayList<>(); + + public ActualSpan( + String traceId, + String spanId, + String name, + String parentSpanId, + Map<String, String> attributes) { + this.traceId = traceId; + this.spanId = spanId; + this.name = name; + this.parentSpanId = parentSpanId; + this.attributes = attributes; + } + + public void addChild(ActualSpan child) { + this.children.add(child); + } + } }
