phet commented on code in PR #3940:
URL: https://github.com/apache/gobblin/pull/3940#discussion_r1591942093
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MockGaaSObservabilityEventProducer.java:
##########
Review Comment:
Shouldn't we rename this, since we expect another such mock producer for
flow-level observability events too?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java:
##########
@@ -117,77 +122,93 @@ private void setupMetrics(State state) {
}
public void emitObservabilityEvent(final State jobState) {
- GaaSObservabilityEventExperimental event = createGaaSObservabilityEvent(jobState);
+ GaaSJobObservabilityEvent event = createGaaSObservabilityEvent(jobState);
sendUnderlyingEvent(event);
this.eventCollector.add(event);
}
- public Attributes getEventAttributes(GaaSObservabilityEventExperimental event) {
+ public Attributes getEventAttributes(GaaSJobObservabilityEvent event) {
Attributes tags = Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, event.getFlowName())
.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, event.getFlowGroup())
.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName())
.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, event.getFlowExecutionId())
.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, event.getExecutorId())
- .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, event.getFlowGraphEdgeId())
+ .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, event.getFlowEdgeId())
.build();
return tags;
}
/**
- * Emits the GaaSObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
+ * Emits the GaaSJobObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka
* @param event
*/
- abstract protected void sendUnderlyingEvent(GaaSObservabilityEventExperimental event);
+ abstract protected void sendUnderlyingEvent(GaaSJobObservabilityEvent event);
/**
- * Creates a GaaSObservabilityEvent which is derived from a final GaaS job pipeline state, which is combination of GTE job states in an ordered fashion
+ * Creates a GaaSJobObservabilityEvent which is derived from a final GaaS job pipeline state, which is combination of GTE job states in an ordered fashion
* @param jobState
- * @return GaaSObservabilityEvent
+ * @return GaaSJobObservabilityEvent
*/
- private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final State jobState) {
+ private GaaSJobObservabilityEvent createGaaSObservabilityEvent(final State jobState) {
Long jobStartTime = jobState.contains(TimingEvent.JOB_START_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_START_TIME) : null;
Long jobEndTime = jobState.contains(TimingEvent.JOB_END_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_END_TIME) : null;
Long jobOrchestratedTime = jobState.contains(TimingEvent.JOB_ORCHESTRATED_TIME) ? jobState.getPropAsLong(TimingEvent.JOB_ORCHESTRATED_TIME) : null;
Long jobPlanningPhaseStartTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_START_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_START_TIME) : null;
Long jobPlanningPhaseEndTime = jobState.contains(TimingEvent.WORKUNIT_PLAN_END_TIME) ? jobState.getPropAsLong(TimingEvent.WORKUNIT_PLAN_END_TIME) : null;
+ Properties jobProperties = new Properties();
+ try {
+ jobProperties = PropertiesUtils.deserialize(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY, ""));
+ } catch (IOException e) {
Review Comment:
Do we have the flowGroup and flowName on hand, so we could include them in
the error message?
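
To make the suggestion concrete, here is a minimal, self-contained sketch of a catch block whose message identifies the flow. It does not use the Gobblin classes from the diff: `java.util.Properties.load` stands in for `PropertiesUtils.deserialize`, and the class name, the `describeFailure` helper, and the message wording are all hypothetical. It also assumes the flow group and flow name are already available as plain strings at the call site, which is exactly what the question above asks.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class JobPropsErrorMessageSketch {

  // Hypothetical helper: builds an error message that names the flow,
  // so a failed deserialization can be traced back to a specific flow.
  static String describeFailure(String flowGroup, String flowName, Exception e) {
    return String.format(
        "Could not deserialize job properties for flowGroup=%s, flowName=%s: %s",
        flowGroup, flowName, e.getMessage());
  }

  // Stand-in for the PropertiesUtils.deserialize(...) call in the diff;
  // java.util.Properties.load is used here only to keep the sketch runnable.
  static Properties parseJobProps(String serialized, String flowGroup, String flowName) {
    Properties jobProperties = new Properties();
    try {
      jobProperties.load(new StringReader(serialized));
    } catch (IOException e) {
      // Fall back to empty properties, but say which flow was affected.
      System.err.println(describeFailure(flowGroup, flowName, e));
    }
    return jobProperties;
  }

  public static void main(String[] args) {
    Properties props = parseJobProps("job.name=myJob", "myGroup", "myFlow");
    System.out.println(props.getProperty("job.name"));
    System.out.println(describeFailure("myGroup", "myFlow", new IOException("truncated input")));
  }
}
```

In the real producer the message would presumably go through the class's logger rather than `System.err`, and the two values would come from wherever the job state exposes them.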
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/NoopGaaSObservabilityEventProducer.java:
##########
Review Comment:
Same thought here about renaming to explicitly identify this as job-level.