umustafi commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1695933619


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -713,6 +715,126 @@ public void testObservabilityEventSingleEmission() throws 
IOException, Reflectiv
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(producerState,
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");

Review Comment:
   Is getNextJobStatusState used for both flow- and job-level events, with "NA" 
as the default for a flow-level event? Is the NA value codified somewhere that we can reference here?



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -713,6 +715,126 @@ public void testObservabilityEventSingleEmission() throws 
IOException, Reflectiv
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(producerState,
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the COMPLETE event should create a GaaSFlowObservabilityEvent
+    List<GaaSFlowObservabilityEvent> emittedFlowEvents = 
mockEventProducer.getTestEmittedFlowEvents();
+    Assert.assertEquals(emittedFlowEvents.size(), 1);
+    Iterator<GaaSFlowObservabilityEvent> flowIterator = 
emittedFlowEvents.iterator();
+    GaaSFlowObservabilityEvent flowEvent = flowIterator.next();
+    Assert.assertEquals(flowEvent.getFlowStatus(), FlowStatus.SUCCEEDED);
+    Assert.assertEquals(flowEvent.getFlowName(), this.flowName);
+    Assert.assertEquals(flowEvent.getFlowGroup(), this.flowGroup);
+
+    jobStatusMonitor.shutDown();
+  }
+
+  @Test (dependsOnMethods = "testObservabilityEventFlowLevel")
+  public void testObservabilityEventFlowFailed() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobFailedEvent(),
+        createFlowFailedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    State producerState = new State();
+    
producerState.setProp(GaaSJobObservabilityEventProducer.EMIT_FLOW_OBSERVABILITY_EVENT,
 "true");
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(producerState,
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+    List<GaaSJobObservabilityEvent> emittedEvents = 
mockEventProducer.getTestEmittedJobEvents();
+    Assert.assertEquals(emittedEvents.size(), 1);
+    Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+    GaaSJobObservabilityEvent event1 = iterator.next();
+    Assert.assertEquals(event1.getJobStatus(), JobStatus.EXECUTION_FAILURE);
+    Assert.assertEquals(event1.getFlowName(), this.flowName);
+    Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+    // Only the COMPLETE event should create a GaaSFlowObservabilityEvent

Review Comment:
   Update the comment here and on line 817 to say "FAILED" instead of "COMPLETE".



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -81,12 +85,14 @@ public abstract class GaaSJobObservabilityEventProducer 
implements Closeable {
   protected ObservableLongMeasurement jobStatusMetric;
   protected MultiContextIssueRepository issueRepository;
   protected boolean instrumentationEnabled;
+  protected boolean emitFlowObservabilityEvent;
   ContextAwareMeter getIssuesFailedMeter;
 
   public GaaSJobObservabilityEventProducer(State state, 
MultiContextIssueRepository issueRepository, boolean instrumentationEnabled) {
     this.state = state;
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = instrumentationEnabled;
+    this.emitFlowObservabilityEvent = 
this.state.getPropAsBoolean(EMIT_FLOW_OBSERVABILITY_EVENT, false);

Review Comment:
   Do we not want these emitted by default? Are job-level events always emitted?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -123,7 +129,11 @@ private void setupMetrics(State state) {
 
   public void emitObservabilityEvent(final State jobState) {
     GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
-    sendUnderlyingEvent(event);
+    if 
(jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD).equals(JobStatusRetriever.NA_KEY)
 && this.emitFlowObservabilityEvent) {

Review Comment:
   Why do we only emit flow-type events in some cases? Is this for backwards 
compatibility? What's the harm in emitting flow-level events going forward?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to