phet commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1693006907


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -713,6 +715,64 @@ public void testObservabilityEventSingleEmission() throws 
IOException, Reflectiv
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+  public void testObservabilityEventFlowLevel() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic7");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobSucceededEvent(),
+        createFlowSucceededEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    MultiContextIssueRepository issueRepository = new 
InMemoryMultiContextIssueRepository();
+    MockGaaSJobObservabilityEventProducer mockEventProducer = new 
MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
+        issueRepository, false);
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = 
createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), 
ConfigFactory.empty(),
+        mockEventProducer, mock(DagManagementStateStore.class));
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord<byte[], byte[]>> recordIterator = 
Iterators.transform(
+        this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    State state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
"NA", "NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", 
"NA");
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPLETE.name());
+
+    // Only the COMPLETE event should create a GaaSJobObservabilityEvent

Review Comment:
   just wondering... should we verify an event is even emitted when the job 
fails or is cancelled, not just when successful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to