phet commented on code in PR #3962:
URL: https://github.com/apache/gobblin/pull/3962#discussion_r1628818301
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+ "type": "record",
+ "name": "GaaSFlowObservabilityEvent",
+ "namespace": "org.apache.gobblin.metrics",
+ "doc": "An event schema for GaaS to emit during and after a flow is executed.",
Review Comment:
Please clarify under what circumstances we anticipate emitting the event
*during* flow execution.
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GaaSFlowObservabilityEvent.avsc:
##########
@@ -0,0 +1,82 @@
+{
+ "type": "record",
+ "name": "GaaSFlowObservabilityEvent",
+ "namespace": "org.apache.gobblin.metrics",
+ "doc": "An event schema for GaaS to emit during and after a flow is executed.",
+ "fields": [
+ {
+ "name": "eventTimestamp",
+ "type": "long",
+ "doc": "Time at which event was created in milliseconds from Unix Epoch"
+ },
+ {
+ "name": "flowGroup",
+ "type": "string",
+ "doc": "Flow group for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowName",
+ "type": "string",
+ "doc": "Flow name for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowExecutionId",
+ "type": "long",
+ "doc": "Flow execution id for the GaaS flow",
+ "compliance": "NONE"
+ },
+ {
+ "name": "lastFlowModificationTimestamp",
+ "type": "long",
+ "doc": "Timestamp in millis since Epoch when the flow config was last modified"
+ },
+ {
+ "name": "sourceNode",
+ "type": "string",
+ "doc": "Source node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "destinationNode",
+ "type": "string",
+ "doc": "Destination node for the flow edge",
+ "compliance": "NONE"
+ },
+ {
+ "name": "flowStatus",
+ "type": {
+ "type": "enum",
+ "name": "FlowStatus",
+ "symbols": [
+ "SUCCEEDED",
+ "COMPILATION_FAILURE",
+ "SUBMISSION_FAILURE",
+ "EXECUTION_FAILURE",
+ "CANCELLED"
+ ],
+ "doc": "Final flow status for the GaaS flow",
+ "compliance": "NONE"
+ }
+ },
+ {
+ "name": "effectiveUserUrn",
+ "type": [
+ "null",
+ "string"
+ ],
+ "doc": "User URN (if applicable) whose identity was used to run the underlying Gobblin job e.g. myGroup",
+ "compliance": "NONE"
+ },
+ {
+ "name": "gaasId",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null,
+ "doc": "The deployment ID of GaaS that is sending the event (if multiple GaaS instances are running)"
+ }
+ ]
Review Comment:
Any flow start/end timestamp?
Also, what about `flowProperties`, e.g. those supplied at
flow-definition time?
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -619,6 +621,64 @@ public void testObservabilityEventSingleEmission() throws IOException, Reflectiv
jobStatusMonitor.shutDown();
}
+ @Test (dependsOnMethods = "testObservabilityEventSingleEmission")
+ public void testObservabilityEventFlowLevel() throws IOException, ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic7");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobSucceededEvent(),
+ createFlowSucceededEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
+ MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
+ issueRepository, false);
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
+ mockEventProducer);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ State state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+ state = getNextJobStatusState(jobStatusMonitor, recordIterator, "NA", "NA");
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
+ List<GaaSJobObservabilityEvent> emittedEvents = mockEventProducer.getTestEmittedJobEvents();
+ Assert.assertEquals(emittedEvents.size(), 1);
+ Iterator<GaaSJobObservabilityEvent> iterator = emittedEvents.iterator();
+ GaaSJobObservabilityEvent event1 = iterator.next();
+ Assert.assertEquals(event1.getJobStatus(), JobStatus.SUCCEEDED);
+ Assert.assertEquals(event1.getFlowName(), this.flowName);
+ Assert.assertEquals(event1.getFlowGroup(), this.flowGroup);
+
+ // Only the COMPLETE event should create a GaaSJobObservabilityEvent
Review Comment:
This comment should say `GaaSFlowObservabilityEvent` rather than
`GaaSJobObservabilityEvent`.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -69,6 +71,7 @@ public abstract class GaaSJobObservabilityEventProducer implements Closeable {
public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = "GaaSJobObservabilityEventProducer.";
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSJobObservabilityEventProducer.class.getName();
+ public static final String EMIT_FLOW_OBSERVABILITY_EVENT = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "emitFlowObservabilityEvent";
Review Comment:
Not a big deal, but the job-level prefix doesn't align well with the flow-level name.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java:
##########
@@ -195,6 +197,48 @@ public void testCreateGaaSObservabilityEventWithPartialMetadata() throws Excepti
serializer.serializeRecord(event);
}
+ @Test
+ public void testCreateGaaSObservabilityFlowEvent() throws Exception {
+ String flowGroup = "testFlowGroup3";
+ String flowName = "testFlowName3";
+ String jobName = JobStatusRetriever.NA_KEY;
+ String flowExecutionId = "1";
+ this.issueRepository.put(
+ TroubleshooterUtils.getContextIdForJob(flowGroup, flowName, flowExecutionId, jobName),
+ createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
+ );
+ MockGaaSJobObservabilityEventProducer
+ producer = new MockGaaSJobObservabilityEventProducer(new State(), this.issueRepository, false);
Review Comment:
For some reason, I thought that an empty `State()` would adopt the default of
`false`, which would mean emitting a job-schema event for the flow level (with
`"NA"` values for some fields). I thought explicit enablement would be needed to
actually convert to and produce a flow-schema event.
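
To make the expectation concrete, here is a minimal sketch of the opt-in behavior I had in mind. It is only an illustration: `FlowEventFlagSketch` and `shouldEmitFlowEvent` are hypothetical names, `java.util.Properties` stands in for Gobblin's `State`, and the default of `false` is my assumption rather than something confirmed by this PR. Only the key string is taken from the diff above.

```java
import java.util.Properties;

public class FlowEventFlagSketch {
  // Key assembled from the prefix and suffix shown in the diff.
  static final String EMIT_FLOW_OBSERVABILITY_EVENT =
      "GaaSJobObservabilityEventProducer.emitFlowObservabilityEvent";

  // Hypothetical helper: true only when the flag is explicitly set to "true".
  // An absent key falls back to the assumed default of false.
  static boolean shouldEmitFlowEvent(Properties props) {
    return Boolean.parseBoolean(props.getProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "false"));
  }

  public static void main(String[] args) {
    Properties empty = new Properties(); // analogous to `new State()` in the test
    Properties enabled = new Properties();
    enabled.setProperty(EMIT_FLOW_OBSERVABILITY_EVENT, "true");

    System.out.println("empty config emits flow event: " + shouldEmitFlowEvent(empty));
    System.out.println("enabled config emits flow event: " + shouldEmitFlowEvent(enabled));
  }
}
```

Under that assumption, a test that builds the producer from an empty `State()` would exercise the job-schema path, and a flow-schema test would need to set the flag explicitly.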