phet commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1628818301


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+  "type": "record",
+  "name": "GaaSFlowObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a flow is executed.",

Review Comment:
   please clarify under what circumstances we anticipate emitting the event *during* flow execution



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+  "type": "record",
+  "name": "GaaSFlowObservabilityEvent",
+  "namespace": "org.apache.gobblin.metrics",
+  "doc": "An event schema for GaaS to emit during and after a flow is executed.",
+  "fields": [
+    {
+      "name": "eventTimestamp",
+      "type": "long",
+      "doc": "Time at which event was created in milliseconds from Unix Epoch"
+    },
+    {
+      "name": "flowGroup",
+      "type": "string",
+      "doc": "Flow group for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowName",
+      "type": "string",
+      "doc": "Flow name for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowExecutionId",
+      "type": "long",
+      "doc": "Flow execution id for the GaaS flow",
+      "compliance": "NONE"
+    },
+    {
+      "name": "lastFlowModificationTimestamp",
+      "type": "long",
+      "doc": "Timestamp in millis since Epoch when the flow config was last modified"
+    },
+    {
+      "name": "sourceNode",
+      "type": "string",
+      "doc": "Source node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "destinationNode",
+      "type": "string",
+      "doc": "Destination node for the flow edge",
+      "compliance": "NONE"
+    },
+    {
+      "name": "flowStatus",
+      "type": {
+        "type": "enum",
+        "name": "FlowStatus",
+        "symbols": [
+          "SUCCEEDED",
+          "COMPILATION_FAILURE",
+          "SUBMISSION_FAILURE",
+          "EXECUTION_FAILURE",
+          "CANCELLED"
+        ],
+        "doc": "Final flow status for the GaaS flow",
+        "compliance": "NONE"
+      }
+    },
+    {
+      "name": "effectiveUserUrn",
+      "type": [
+        "null",
+        "string"
+      ],
+      "doc": "User URN (if applicable) whose identity was used to run the underlying Gobblin job e.g. myGroup",
+      "compliance": "NONE"
+    },
+    {
+      "name": "gaasId",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null,
+      "doc": "The deployment ID of GaaS that is sending the event (if multiple GaaS instances are running)"
+    }
+  ]

Review Comment:
   any flow start/end timestamp?
   
   also, what about `flowProperties` - e.g. those supplied at flow-definition-time?
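If flow start/end timestamps and `flowProperties` were added, one option is to make them optional, following the `["null", "string"]` union with `"default": null` that this schema already uses for `gaasId`, so that readers on the newer schema can still decode records written before the fields existed. The Java sketch below models that shape with nullable fields; the class name, field names, and the property key in `main` are hypothetical and not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the optional flow-level fields suggested in this comment.
 * Nullable fields mirror Avro ["null", ...] unions with "default": null;
 * none of these names come from the PR.
 */
public final class FlowEventExtrasSketch {
  private final Long flowStartTimestamp; // millis since Unix epoch, or null if unknown
  private final Long flowEndTimestamp;   // millis since Unix epoch, or null if unknown
  private final Map<String, String> flowProperties; // properties given at flow-definition time

  public FlowEventExtrasSketch(Long flowStartTimestamp, Long flowEndTimestamp,
      Map<String, String> flowProperties) {
    this.flowStartTimestamp = flowStartTimestamp;
    this.flowEndTimestamp = flowEndTimestamp;
    // Defensive, read-only copy; a missing map is treated as empty.
    this.flowProperties = flowProperties == null
        ? Collections.<String, String>emptyMap()
        : Collections.unmodifiableMap(new HashMap<>(flowProperties));
  }

  /** Flow duration in millis, present only when both timestamps are known. */
  public OptionalLong durationMillis() {
    if (flowStartTimestamp == null || flowEndTimestamp == null) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(flowEndTimestamp - flowStartTimestamp);
  }

  public Map<String, String> getFlowProperties() {
    return flowProperties;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("exampleKey", "exampleValue"); // hypothetical property
    FlowEventExtrasSketch finished = new FlowEventExtrasSketch(1_000L, 4_500L, props);
    FlowEventExtrasSketch noEnd = new FlowEventExtrasSketch(1_000L, null, null);
    System.out.println(finished.durationMillis()); // OptionalLong[3500]
    System.out.println(noEnd.durationMillis());    // OptionalLong.empty
  }
}
```

Keeping the new fields optional means an event emitted for a flow that failed before starting (e.g. `COMPILATION_FAILURE`) can simply leave the timestamps null.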



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -619,6 +621,64 @@ public void testObservabilityEventSingleEmission() throws IOException, Reflectiv
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+        mockEventProducer);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent

Review Comment:
   Should this comment say `GaaSFlowObservabilityEvent`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -69,6 +71,7 @@ public abstract class GaaSJobObservabilityEventProducer implements Closeable {
   public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSJobObservabilityEventProducer.";
   public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
   public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSJobObservabilityEventProducer.class.getName();
+  public static final String EMIT_FLOW_OBSERVABILITY_EVENT = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowObservabilityEvent";

Review Comment:
   NBD, but the `GaaSJobObservabilityEventProducer.` prefix doesn't align well w/ the flow-level key name
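One way to address the naming mismatch is a producer-neutral prefix, in line with constant names such as `GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY` that already omit `JOB`. In the sketch below, only the job-level prefix string and the `emitFlowObservabilityEvent` suffix come from the diff; the `GaaSObservabilityEventProducer.` prefix and the constant names other than those in the diff are hypothetical.

```java
/**
 * Hypothetical naming sketch: a producer-neutral prefix for job- and flow-level keys.
 * Only the job-level prefix and the "emitFlowObservabilityEvent" suffix come from the diff.
 */
public final class ObservabilityConfigKeysSketch {
  // As in the diff.
  public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
      "GaaSJobObservabilityEventProducer.";

  // Hypothetical neutral prefix that no longer says "Job".
  public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
      "GaaSObservabilityEventProducer.";

  // The key as the diff builds it.
  public static final String EMIT_FLOW_OBSERVABILITY_EVENT_AS_IN_DIFF =
      GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowObservabilityEvent";

  // Hypothetical alternative built from the neutral prefix.
  public static final String EMIT_FLOW_OBSERVABILITY_EVENT =
      GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowObservabilityEvent";

  public static void main(String[] args) {
    System.out.println(EMIT_FLOW_OBSERVABILITY_EVENT_AS_IN_DIFF);
    // GaaSJobObservabilityEventProducer.emitFlowObservabilityEvent
    System.out.println(EMIT_FLOW_OBSERVABILITY_EVENT);
    // GaaSObservabilityEventProducer.emitFlowObservabilityEvent
  }
}
```

Because the key is user-facing configuration, renaming it before merge is cheaper than renaming it later, when existing configs would need a fallback to the old key.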



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java:
##########
@@ -195,6 +197,48 @@ public void testCreateGaaSObservabilityEventWithPartialMetadata() throws Excepti
     serializer.serializeRecord(event);
   }
 
+  @Test
+  public void testCreateGaaSObservabilityFlowEvent() throws Exception {
+    String flowGroup = "testFlowGroup3";
+    String flowName = "testFlowName3";
+    String jobName = JobStatusRetriever.NA_KEY;
+    String flowExecutionId = "1";
+    this.issueRepository.put(
+        TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+        createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+    );
+    MockGaaSJobObservabilityEventProducer
+        producer = new MockGaaSJobObservabilityEventProducer(new State(), this.issueRepository, false);

Review Comment:
   FSR, I thought that an empty `State()` would adopt the default of `false`, which would emit a job-schema event for the flow level (w/ `"NA"` values for some fields). I thought explicit enablement would be needed to actually convert to and produce a flow-schema event.
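The expected behaviour described in this comment can be sketched as a flag lookup that defaults to `false`. The key string matches the `EMIT_FLOW_OBSERVABILITY_EVENT` constant in the diff; `java.util.Properties` stands in for Gobblin's `State`, and the `Schema` enum and `schemaForFlowLevelStatus` method are hypothetical, illustrating the reviewer's expectation rather than the PR's actual code.

```java
import java.util.Properties;

/**
 * Sketch of the reviewer's expected default. Only the key string comes from the diff;
 * Properties stands in for Gobblin's State, and the other names are hypothetical.
 */
public final class FlowEventSchemaChoiceSketch {
  static final String EMIT_FLOW_OBSERVABILITY_EVENT =
      "GaaSJobObservabilityEventProducer.emitFlowObservabilityEvent";

  enum Schema { JOB_SCHEMA_WITH_NA_FIELDS, FLOW_SCHEMA }

  /** Chooses which schema a flow-level status would be emitted with. */
  static Schema schemaForFlowLevelStatus(Properties props) {
    // Absent key => "false" => keep the job-level schema, with "NA" job fields.
    boolean emitFlowEvent = Boolean.parseBoolean(
        props.getProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "false"));
    return emitFlowEvent ? Schema.FLOW_SCHEMA : Schema.JOB_SCHEMA_WITH_NA_FIELDS;
  }

  public static void main(String[] args) {
    Properties empty = new Properties();
    Properties enabled = new Properties();
    enabled.setProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "true");
    System.out.println(schemaForFlowLevelStatus(empty));   // JOB_SCHEMA_WITH_NA_FIELDS
    System.out.println(schemaForFlowLevelStatus(enabled)); // FLOW_SCHEMA
  }
}
```

Under this reading, a test that expects a flow-schema event would need to set the key explicitly rather than pass an empty `State()`.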



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.