This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28b050c07ff61e13682912b35ba38c28db983e0f
Author: Stefan Richter <[email protected]>
AuthorDate: Mon May 13 14:25:33 2024 +0200

    [FLINK-38353][spans] Add support for nested spans to metrics reporting.
    
    As proposed and described in:
    
https://cwiki.apache.org/confluence/display/FLINK/FLIP-483%3A+Add+support+for+children+Spans
---
 .../java/org/apache/flink/traces/SimpleSpan.java   |  15 +-
 .../main/java/org/apache/flink/traces/Span.java    |   7 +-
 .../java/org/apache/flink/traces/SpanBuilder.java  |  21 ++-
 .../traces/otel/OpenTelemetryTraceReporter.java    |   7 +-
 .../otel/OpenTelemetryTraceReporterITCase.java     | 170 +++++++++++++++++++++
 5 files changed, 211 insertions(+), 9 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java
index 8e35e3c43ca..d94c42d2117 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SimpleSpan.java
@@ -20,7 +20,10 @@ package org.apache.flink.traces;
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Basic implementation of {@link Span}. */
@@ -31,6 +34,7 @@ public class SimpleSpan implements Span {
     private final String name;
 
     private final HashMap<String, Object> attributes = new HashMap<>();
+    private final List<Span> children = new ArrayList<>();
     private final long startTsMillis;
     private final long endTsMillis;
 
@@ -39,13 +43,15 @@ public class SimpleSpan implements Span {
             String name,
             long startTsMillis,
             long endTsMillis,
-            HashMap<String, Object> attributes) {
+            HashMap<String, Object> attributes,
+            List<Span> children) {
 
         this.scope = scope;
         this.name = name;
         this.startTsMillis = startTsMillis;
         this.endTsMillis = endTsMillis;
         this.attributes.putAll(attributes);
+        this.children.addAll(children);
     }
 
     @Override
@@ -73,6 +79,11 @@ public class SimpleSpan implements Span {
         return attributes;
     }
 
+    @Override
+    public List<Span> getChildren() {
+        return Collections.unmodifiableList(children);
+    }
+
     @Override
     public String toString() {
         return SimpleSpan.class.getSimpleName()
@@ -87,6 +98,8 @@ public class SimpleSpan implements Span {
                 + endTsMillis
                 + ", attributes="
                 + attributes
+                + ", children="
+                + children
                 + "}";
     }
 }
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
index 699e48fd5c4..d530b5c3784 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
@@ -20,14 +20,12 @@ package org.apache.flink.traces;
 
 import org.apache.flink.annotation.Experimental;
 
+import java.util.List;
 import java.util.Map;
 
 /**
  * Span represents something that happened in Flink at certain point of time, 
that will be reported
  * to a {@link org.apache.flink.traces.reporter.TraceReporter}.
- *
- * <p>Currently we don't support traces with multiple spans. Each span is 
self-contained and
- * represents things like a checkpoint or recovery.
  */
 @Experimental
 public interface Span {
@@ -49,4 +47,7 @@ public interface Span {
      * added in the future.
      */
     Map<String, Object> getAttributes();
+
+    /** Returns the child spans (= nested). */
+    List<Span> getChildren();
 }
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
index e2c14738485..5f1c1c35b27 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
@@ -21,14 +21,18 @@ package org.apache.flink.traces;
 import org.apache.flink.AttributeBuilder;
 import org.apache.flink.annotation.Experimental;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** Builder used to construct {@link Span}. See {@link Span#builder(Class, 
String)}. */
 @Experimental
 public class SpanBuilder implements AttributeBuilder {
     private final HashMap<String, Object> attributes = new HashMap<>();
+    private final List<SpanBuilder> children = new ArrayList<>();
     private final Class<?> classScope;
     private final String name;
     private long startTsMillis;
@@ -68,7 +72,10 @@ public class SpanBuilder implements AttributeBuilder {
                 name,
                 startTsMillisToBuild,
                 endTsMillisToBuild,
-                attributes);
+                attributes,
+                children.stream()
+                        .map(childBuilder -> 
childBuilder.build(additionalVariables))
+                        .collect(Collectors.toList()));
     }
 
     /**
@@ -120,4 +127,16 @@ public class SpanBuilder implements AttributeBuilder {
     public String getName() {
         return name;
     }
+
+    /** Adds child spans (= nested). */
+    public SpanBuilder addChildren(List<SpanBuilder> children) {
+        this.children.addAll(children);
+        return this;
+    }
+
+    /** Adds child span (= nested). */
+    public SpanBuilder addChild(SpanBuilder child) {
+        this.children.add(child);
+        return this;
+    }
 }
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index 248e54b1aa3..d8798fbec4b 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -124,10 +124,9 @@ public class OpenTelemetryTraceReporter extends 
OpenTelemetryReporterBase implem
                         .startSpan();
 
         // Recursively add child spans to this parent
-        // TODO: not yet supported
-        // for (Span childSpan : span.getChildren()) {
-        //    notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
-        // }
+        for (Span childSpan : span.getChildren()) {
+            notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
+        }
 
         currentOtelSpan.end(span.getEndTsMillis(), TimeUnit.MILLISECONDS);
     }
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
index 88b9871787e..7d452a90d5a 100644
--- 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
@@ -33,7 +33,11 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -125,4 +129,170 @@ public class OpenTelemetryTraceReporterITCase extends 
OpenTelemetryTestBase {
                             });
                 });
     }
+
+    @Test
+    public void testReportNestedSpan() throws Exception {
+        String scope = this.getClass().getCanonicalName();
+
+        String attribute1KeyRoot = "foo";
+        String attribute1ValueRoot = "bar";
+        String attribute2KeyRoot = "<variable>";
+        String attribute2ValueRoot = "value";
+        String spanRoot = "root";
+
+        String spanL1N1 = "1_1";
+        String attribute1KeyL1N1 = "foo_" + spanL1N1;
+        String attribute1ValueL1N1 = "bar_" + spanL1N1;
+
+        String spanL1N2 = "1_2";
+        String attribute1KeyL1N2 = "foo_" + spanL1N2;
+        String attribute1ValueL1N2 = "bar_" + spanL1N2;
+
+        String spanL2N1 = "2_1";
+        String attribute1KeyL2N1 = "foo_" + spanL2N1;
+        String attribute1ValueL2N1 = "bar_" + spanL2N1;
+
+        reporter.open(createMetricConfig());
+        try {
+            SpanBuilder childLeveL2N1 =
+                    Span.builder(this.getClass(), spanL2N1)
+                            .setAttribute(attribute1KeyL2N1, 
attribute1ValueL2N1)
+                            .setStartTsMillis(44)
+                            .setEndTsMillis(46);
+
+            SpanBuilder childL1N1 =
+                    Span.builder(this.getClass(), spanL1N1)
+                            .setAttribute(attribute1KeyL1N1, 
attribute1ValueL1N1)
+                            .setStartTsMillis(43)
+                            .setEndTsMillis(48)
+                            .addChild(childLeveL2N1);
+
+            SpanBuilder childL1N2 =
+                    Span.builder(this.getClass(), spanL1N2)
+                            .setAttribute(attribute1KeyL1N2, 
attribute1ValueL1N2)
+                            .setStartTsMillis(44)
+                            .setEndTsMillis(46);
+
+            SpanBuilder rootSpan =
+                    Span.builder(this.getClass(), spanRoot)
+                            .setAttribute(attribute1KeyRoot, 
attribute1ValueRoot)
+                            .setAttribute(attribute2KeyRoot, 
attribute2ValueRoot)
+                            .setStartTsMillis(42)
+                            .setEndTsMillis(64)
+                            .addChildren(Arrays.asList(childL1N1, childL1N2));
+
+            reporter.notifyOfAddedSpan(rootSpan.build());
+        } finally {
+            reporter.close();
+        }
+
+        eventuallyConsumeJson(
+                (json) -> {
+                    JsonNode scopeSpans = 
json.findPath("resourceSpans").findPath("scopeSpans");
+                    
assertThat(scopeSpans.findPath("scope").findPath("name").asText())
+                            .isEqualTo(scope);
+                    JsonNode spans = scopeSpans.findPath("spans");
+
+                    Map<String, ActualSpan> actualSpanSummaries = 
convertToSummaries(spans);
+
+                    assertThat(actualSpanSummaries.keySet())
+                            .containsExactlyInAnyOrder(spanRoot, spanL1N1, 
spanL1N2, spanL2N1);
+
+                    ActualSpan root = actualSpanSummaries.get(spanRoot);
+                    ActualSpan l1n1 = actualSpanSummaries.get(spanL1N1);
+                    ActualSpan l1n2 = actualSpanSummaries.get(spanL1N2);
+                    ActualSpan l2n1 = actualSpanSummaries.get(spanL2N1);
+
+                    assertThat(root.parentSpanId).isEmpty();
+                    assertThat(root.attributes)
+                            .containsEntry(attribute1KeyRoot, 
attribute1ValueRoot);
+                    assertThat(root.attributes)
+                            .containsEntry(
+                                    
VariableNameUtil.getVariableName(attribute2KeyRoot),
+                                    attribute2ValueRoot);
+                    assertThat(l1n1.attributes)
+                            .containsEntry(attribute1KeyL1N1, 
attribute1ValueL1N1);
+                    assertThat(l1n2.attributes)
+                            .containsEntry(attribute1KeyL1N2, 
attribute1ValueL1N2);
+                    assertThat(l2n1.attributes)
+                            .containsEntry(attribute1KeyL2N1, 
attribute1ValueL2N1);
+
+                    assertThat(root.traceId).isEqualTo(l1n1.traceId);
+                    assertThat(root.traceId).isEqualTo(l1n2.traceId);
+                    assertThat(root.traceId).isEqualTo(l2n1.traceId);
+                    assertThat(root.spanId).isNotEmpty();
+                    assertThat(root.spanId).isEqualTo(l1n1.parentSpanId);
+                    assertThat(root.spanId).isEqualTo(l1n2.parentSpanId);
+
+                    assertThat(root.children).containsExactlyInAnyOrder(l1n1, 
l1n2);
+                    assertThat(l1n1.children).containsExactlyInAnyOrder(l2n1);
+                    assertThat(l1n2.children).isEmpty();
+                    assertThat(l2n1.children).isEmpty();
+                });
+    }
+
+    private Map<String, ActualSpan> convertToSummaries(JsonNode spans) {
+        Map<String, ActualSpan> spanIdToSpan = new HashMap<>();
+        for (int i = 0; spans.get(i) != null; i++) {
+            ActualSpan actualSpan = convertToActualSpan(spans.get(i));
+            spanIdToSpan.put(actualSpan.spanId, actualSpan);
+        }
+
+        Map<String, ActualSpan> nameToSpan = new HashMap<>();
+
+        spanIdToSpan.forEach(
+                (spanId, actualSpan) -> {
+                    if (!actualSpan.parentSpanId.isEmpty()) {
+                        ActualSpan parentSpan = 
spanIdToSpan.get(actualSpan.parentSpanId);
+                        parentSpan.addChild(actualSpan);
+                    }
+                    nameToSpan.put(actualSpan.name, actualSpan);
+                });
+
+        return nameToSpan;
+    }
+
+    private ActualSpan convertToActualSpan(JsonNode span) {
+        String name = span.findPath("name").asText();
+        String traceId = span.findPath("traceId").asText();
+        String spanId = span.findPath("spanId").asText();
+        String parentSpanId = span.findPath("parentSpanId").asText();
+
+        Map<String, String> attributeMap = new HashMap<>();
+        JsonNode attributes = span.findPath("attributes");
+
+        for (int j = 0; attributes.get(j) != null; j++) {
+            JsonNode attribute = attributes.get(j);
+            String key = attribute.get("key").asText();
+            String value = attribute.at("/value/stringValue").asText();
+            attributeMap.put(key, value);
+        }
+        return new ActualSpan(traceId, spanId, name, parentSpanId, 
attributeMap);
+    }
+
+    private static class ActualSpan {
+        final String traceId;
+        final String spanId;
+        final String name;
+        final String parentSpanId;
+        final Map<String, String> attributes;
+        final List<ActualSpan> children = new ArrayList<>();
+
+        public ActualSpan(
+                String traceId,
+                String spanId,
+                String name,
+                String parentSpanId,
+                Map<String, String> attributes) {
+            this.traceId = traceId;
+            this.spanId = spanId;
+            this.name = name;
+            this.parentSpanId = parentSpanId;
+            this.attributes = attributes;
+        }
+
+        public void addChild(ActualSpan child) {
+            this.children.add(child);
+        }
+    }
 }

Reply via email to