phet commented on code in PR #3983:
URL: https://github.com/apache/gobblin/pull/3983#discussion_r1676177633
##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -80,4 +80,27 @@ public class ServiceMetricNames {
public static final String DAG_COUNT_FS_DAG_STATE_COUNT = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
public static final String DAG_PROCESSING_EXCEPTION_METER = "DagProcessingException";
+ /* DagProcessingEngine & Multi-active Execution Related Metrics
+ * Note: metrics ending with the delimiter '.' will be suffixed by the specific {@link DagActionType} type for finer
+ * grained monitoring of each dagAction type in addition to the aggregation of all types.
+ */
+ public static final String DAG_PROCESSING_ENGINE_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "DagProcEngine.";
+ public static final String DAG_ACTIONS_STORED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsStored.";
+ public static final String DAG_ACTIONS_OBSERVED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsObserved.";
+ public static final String DAG_ACTIONS_LEASES_OBTAINED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsLeasesObtained.";
+ public static final String DAG_ACTIONS_NO_LONGER_LEASING = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsNoLongerLeasing.";
+ public static final String DAG_ACTION_LEASE_REMINDERS_SCHEDULED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionLeaseRemindersScheduled.";
+ public static final String DAG_ACTION_REMINDERS_PROCESSED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionRemindersProcessed.";
+ // TODO: implement dropping reminder event after exceed some time
+ public static final String DAG_ACTIONS_EXCEEDED_MAX_RETRY = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsExceededMaxRetry.";
+ public static final String DAG_ACTIONS_INIT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitFailed.";
+ public static final String DAG_ACTIONS_INIT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitSucceeded.";
+ public static final String DAG_ACTION_EXECUTIONS_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionExecutionsFailed.";
+ public static final String DAG_ACTION_EXECUTIONS_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionExecutionsSucceeded.";
+ public static final String DAG_ACTION_CONCLUSIONS_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionConclusionsFailed.";
+ public static final String DAG_ACTION_CONCLUSIONS_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionConclusionsSucceeded.";
Review Comment:
for consistency in pluralization, I believe you want
`dagActionsReminderProcessed` and `dagActionsExecutionFailed`, etc.
but, that said, let's mirror the method names where they exist. so for
`DagProc::act`, `DagProc::initialize`, and `DagTask::conclude`, the names would be
`dagActions{Initialize,Act,Conclude}{Succeeded,Failed}`
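A minimal sketch of what that naming could look like, assuming the stems come straight from the method names. The `"GobblinService."` prefix value, the `DagProcEngineMetricNames` holder, the `forType` helper, and the two-constant `DagActionType` enum are hypothetical stand-ins for `GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER`, `ServiceMetricNames`, and `DagActionStore.DagActionType`:

```java
// Hypothetical stand-in for DagActionStore.DagActionType (only the two constants used in this PR).
enum DagActionType { LAUNCH, KILL }

final class DagProcEngineMetricNames {
  private DagProcEngineMetricNames() {}

  // Assumed value; in the PR this is GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "DagProcEngine."
  static final String DAG_PROCESSING_ENGINE_PREFIX = "GobblinService.DagProcEngine.";

  // Names mirror DagProc::initialize, DagProc::act and DagTask::conclude.
  // The trailing '.' is where the DagActionType gets appended, as the existing Javadoc describes.
  static final String DAG_ACTIONS_INITIALIZE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeSucceeded.";
  static final String DAG_ACTIONS_INITIALIZE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsInitializeFailed.";
  static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
  static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
  static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded.";
  static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";

  // Full per-type meter name, e.g. "...dagActionsActFailed.KILL"; Enum.toString() defaults to the constant name.
  static String forType(String metricName, DagActionType dagActionType) {
    return metricName + dagActionType;
  }
}
```

Keeping the stems identical to the method names means anyone reading a dashboard can map each meter back to the code path that marks it.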
##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
+ public static final String DAG_ACTIONS_REMOVED_FROM_STORE = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsRemovedFromStore.";
+ public static final String DAG_ACTIONS_FAILING_REMOVAL = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsFailingRemoval.";
Review Comment:
`dagActionsDelete{Succeeded,Failed}`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -64,8 +68,10 @@ protected void handleDagAction(String operation, DagActionStore.DagAction dagAct
switch (operation) {
case "INSERT":
handleDagAction(dagAction, false);
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_OBSERVED, dagActionType);
break;
case "UPDATE":
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_OBSERVED, dagActionType);
Review Comment:
same concern here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -229,16 +233,20 @@ protected void processMessage(DecodeableKafkaRecord<String, DagActionStoreChange
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
- protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup, String flowName,
- long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+ protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup,
+ String flowName, long flowExecutionId, DagActionStore.DagActionType dagActionType) {
// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
switch (operation) {
case "INSERT":
handleDagAction(dagAction, false);
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_OBSERVED,
+ dagActionType);
break;
case "UPDATE":
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_OBSERVED,
+ dagActionType);
Review Comment:
since UPDATE is an error (one that's already counted by its own metric),
should it really increment the same DPE metric as INSERT?
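One way to act on this, sketched under simplifying assumptions: only INSERT marks the DPE "observed" meter, while UPDATE is left to the existing error metric (stood in for here by a plain counter). `ChangeMonitorSketch`, the `AtomicLong` meters, and throwing on unknown operations are hypothetical choices, not the monitor's actual API:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

// Hypothetical, simplified stand-in for the change monitor's handleDagAction(...).
class ChangeMonitorSketch {
  // Populated once in the constructor and never mutated afterwards; AtomicLong makes the counts thread-safe.
  final Map<DagActionType, AtomicLong> observedByType = new EnumMap<>(DagActionType.class);
  // Stands in for the metric that already counts unexpected UPDATEs.
  final AtomicLong unexpectedUpdates = new AtomicLong();

  ChangeMonitorSketch() {
    for (DagActionType type : DagActionType.values()) {
      observedByType.put(type, new AtomicLong());
    }
  }

  void handle(String operation, DagActionType type) {
    switch (operation) {
      case "INSERT":
        // Only genuinely new dag actions count toward the DPE "observed" metric.
        observedByType.get(type).incrementAndGet();
        break;
      case "UPDATE":
        // Unexpected: record it via the error metric only, not the DPE metric.
        unexpectedUpdates.incrementAndGet();
        break;
      case "DELETE":
        // DELETEs require no action.
        break;
      default:
        throw new IllegalArgumentException("Unexpected operation: " + operation);
    }
  }
}
```

That keeps `dagActionsObserved` a clean count of INSERTs, so it can be compared directly against `dagActionsStored`.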
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.ConcurrentMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+
+@Slf4j
+/**
+ * Used to track all metrics relating to processing dagActions when DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by time) at various points of the system,
+ * starting from addition to the DagActionStore to observation in the DagActionChangeMonitor through the
+ * DagProcessingEngine pipeline (DagManagement -> DagTaskStreamImpl -> MySqlMultiActiveLeaseArbiter -> DagProc).
+ */
+public class DagProcessingEngineMetrics {
+ MetricContext metricContext;
+ // Declare map of dagActionType to a ContextAwareMeter for each metric
+ private ConcurrentMap<String, ContextAwareMeter> dagActionsStoredMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsObservedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsLeasesObtainedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsNoLongerLeasingMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionLeaseRemindersScheduledMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionRemindersProcessedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsExceededMaxRetryMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsInitFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsInitSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionExecutionsFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionExecutionsSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionConclusionsFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionConclusionsSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsRemovedFromStoreMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsFailingRemovalMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionAverageProcessingDelayMillisMeters = new ConcurrentHashMap();
+
+ public DagProcessingEngineMetrics(MetricContext metricContext) {
+ this.metricContext = metricContext;
+ registerAllMetrics();
+ }
+
+ public void registerAllMetrics() {
+ registerMetricForEachDagAction(this.dagActionsStoredMeters, ServiceMetricNames.DAG_ACTIONS_STORED);
+ registerMetricForEachDagAction(this.dagActionsObservedMeters, ServiceMetricNames.DAG_ACTIONS_OBSERVED);
+ registerMetricForEachDagAction(this.dagActionsLeasesObtainedMeters, ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED);
+ registerMetricForEachDagAction(this.dagActionsNoLongerLeasingMeters, ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING);
+ registerMetricForEachDagAction(this.dagActionLeaseRemindersScheduledMeters, ServiceMetricNames.DAG_ACTION_LEASE_REMINDERS_SCHEDULED);
+ registerMetricForEachDagAction(this.dagActionRemindersProcessedMeters, ServiceMetricNames.DAG_ACTION_REMINDERS_PROCESSED);
+ registerMetricForEachDagAction(this.dagActionsExceededMaxRetryMeters, ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY);
+ registerMetricForEachDagAction(this.dagActionsInitFailedMeters, ServiceMetricNames.DAG_ACTIONS_INIT_FAILED);
+ registerMetricForEachDagAction(this.dagActionsInitSucceededMeters, ServiceMetricNames.DAG_ACTIONS_INIT_SUCCEEDED);
+ registerMetricForEachDagAction(this.dagActionExecutionsFailedMeters, ServiceMetricNames.DAG_ACTION_EXECUTIONS_FAILED);
+ registerMetricForEachDagAction(this.dagActionExecutionsSucceededMeters, ServiceMetricNames.DAG_ACTION_EXECUTIONS_SUCCEEDED);
+ registerMetricForEachDagAction(this.dagActionConclusionsFailedMeters, ServiceMetricNames.DAG_ACTION_CONCLUSIONS_FAILED);
+ registerMetricForEachDagAction(this.dagActionConclusionsSucceededMeters, ServiceMetricNames.DAG_ACTION_CONCLUSIONS_SUCCEEDED);
+ }
+
+ /**
+ * Create a meter of each dagActionType for the given metric, register it with the metric context, and store it in a
+ * concurrent map.
+ * @param metricMap
+ * @param metricName
+ */
+ private void registerMetricForEachDagAction(ConcurrentMap<String, ContextAwareMeter> metricMap, String metricName) {
+ for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+ metricMap.put(dagActionType.toString(),
+ this.metricContext.contextAwareMeter(metricName + dagActionType));
+ }
+ }
+
+ /**
+ * Updates the meter corresponding to the metricsName and dagActionType provided if they match an existing metric
+ * @param metricName
+ * @param dagActionType
+ */
+ public void updateMetricForDagAction(String metricName, DagActionStore.DagActionType dagActionType) {
+ switch (metricName) {
+ case ServiceMetricNames.DAG_ACTIONS_STORED:
+ updateMetricForDagAction(this.dagActionsStoredMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTIONS_OBSERVED:
+ updateMetricForDagAction(this.dagActionsObservedMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED:
+ updateMetricForDagAction(this.dagActionsLeasesObtainedMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING:
+ updateMetricForDagAction(this.dagActionsNoLongerLeasingMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_LEASE_REMINDERS_SCHEDULED:
+ updateMetricForDagAction(this.dagActionLeaseRemindersScheduledMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_REMINDERS_PROCESSED:
+ updateMetricForDagAction(this.dagActionRemindersProcessedMeters, dagActionType);
+ break;
+ // TODO: implement evaluating max retries later
+ case ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY:
+ updateMetricForDagAction(this.dagActionsExceededMaxRetryMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTIONS_INIT_FAILED:
+ updateMetricForDagAction(this.dagActionsInitFailedMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTIONS_INIT_SUCCEEDED:
+ updateMetricForDagAction(this.dagActionsInitSucceededMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_EXECUTIONS_FAILED:
+ updateMetricForDagAction(this.dagActionExecutionsFailedMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_EXECUTIONS_SUCCEEDED:
+ updateMetricForDagAction(this.dagActionExecutionsSucceededMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_CONCLUSIONS_FAILED:
+ updateMetricForDagAction(this.dagActionConclusionsFailedMeters, dagActionType);
+ break;
+ case ServiceMetricNames.DAG_ACTION_CONCLUSIONS_SUCCEEDED:
+ updateMetricForDagAction(this.dagActionConclusionsSucceededMeters, dagActionType);
+ break;
+ default:
+ log.warn("Skipping marking metric because no meter found to match it. Metric name: {}", metricName);
+ break;
+ }
+ }
+
+ /**
+ * Generic helper used to increment a metric corresponding to the dagActionType in the provided map. It assumes the
+ * meter for each dagActionType can be identified by its name.
+ */
+ private void updateMetricForDagAction(ConcurrentMap<String, ContextAwareMeter> metricMap,
+ DagActionStore.DagActionType dagActionType) {
+ if (metricMap.containsKey(dagActionType.toString())) {
+ metricMap.get(dagActionType.toString()).mark();
+ } else {
+ log.warn("Skipping metric. No meter exists for dagActionType {} in metricsMap {}");
+ }
Review Comment:
this feels like a massive "can't happen" situation (given the strong typing
of `dagActionType`), so it ought to either throw an exception or do a
JIT initialization of the `metricMap` for the new type
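A minimal sketch of the JIT option, assuming the map is keyed by the enum itself rather than its `toString()`. `PerTypeMeters` and its `Function`-based meter factory are hypothetical; `LongAdder` stands in for `ContextAwareMeter`, and the factory stands in for `metricContext.contextAwareMeter(...)`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

class PerTypeMeters {
  private final String metricName;
  // Stands in for metricContext::contextAwareMeter; LongAdder stands in for ContextAwareMeter.
  private final Function<String, LongAdder> meterFactory;
  private final ConcurrentMap<DagActionType, LongAdder> meterByDagActionType = new ConcurrentHashMap<>();

  PerTypeMeters(String metricName, Function<String, LongAdder> meterFactory) {
    this.metricName = metricName;
    this.meterFactory = meterFactory;
  }

  void mark(DagActionType dagActionType) {
    // ConcurrentHashMap.computeIfAbsent applies the factory at most once per key, so a type that was
    // never pre-registered still gets exactly one meter, even under concurrent marks.
    meterByDagActionType
        .computeIfAbsent(dagActionType, type -> meterFactory.apply(metricName + type))
        .increment();
  }

  long count(DagActionType dagActionType) {
    LongAdder meter = meterByDagActionType.get(dagActionType);
    return meter == null ? 0 : meter.sum();
  }
}
```

With this in place the `else { log.warn(...) }` branch disappears entirely, and eager registration in the constructor becomes optional.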
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
+ private ConcurrentMap<String, ContextAwareMeter> dagActionsStoredMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsObservedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsLeasesObtainedMeters = new ConcurrentHashMap();
Review Comment:
nit: all of these are missing a space between `>` and the var name. there
are also too many spaces between `= new`.
finally, naming them `XYZMeters` sounds like they're a list/set/sequence
collection, rather than a mapping.
hence, suggest: `leaseObtainedMeterByDagActionType` or
`initializeSucceededMeterByDagActionType`
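A sketch of the declarations with the spacing and naming applied. If meters are created eagerly for every type (the non-JIT route), the map is never mutated after construction, so an unmodifiable `EnumMap` keyed by the enum could replace `ConcurrentMap<String, ...>`. That is an assumption about the design, not something the PR does. `DagProcessingEngineMetricsSketch` is hypothetical and `LongAdder` stands in for `ContextAwareMeter`:

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

class DagProcessingEngineMetricsSketch {
  // "...MeterByDagActionType" reads as a mapping, not a collection of meters.
  private final Map<DagActionType, LongAdder> leaseObtainedMeterByDagActionType = newMeterByDagActionType();
  private final Map<DagActionType, LongAdder> initializeSucceededMeterByDagActionType = newMeterByDagActionType();

  // One meter per enum constant, created up front; final fields publish the maps safely to other threads.
  private static Map<DagActionType, LongAdder> newMeterByDagActionType() {
    Map<DagActionType, LongAdder> meters = new EnumMap<>(DagActionType.class);
    for (DagActionType type : DagActionType.values()) {
      meters.put(type, new LongAdder());
    }
    return Collections.unmodifiableMap(meters);
  }

  void markLeaseObtained(DagActionType type) {
    leaseObtainedMeterByDagActionType.get(type).increment();
  }

  long leaseObtainedCount(DagActionType type) {
    return leaseObtainedMeterByDagActionType.get(type).sum();
  }

  void markInitializeSucceeded(DagActionType type) {
    initializeSucceededMeterByDagActionType.get(type).increment();
  }
}
```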
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -211,6 +221,11 @@ private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.LeaseParams leaseP
*/
if (!(leaseAttemptStatus instanceof LeaseAttemptStatus.NoLongerLeasingStatus)) {
scheduleReminderForEvent(leaseAttemptStatus);
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTION_LEASE_REMINDERS_SCHEDULED,
+ leaseParams.getDagAction().getDagActionType());
+ } else {
+ this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING,
+ leaseParams.getDagAction().getDagActionType());
Review Comment:
gosh, everyone reaches in to pull out the same `DagActionType`... why not
encapsulate that within a `DagProcessingEngineMetrics` method taking either a
`LeaseParams` or an overall `DagAction`?
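A minimal sketch of that encapsulation via overloads, so call sites pass what they already hold. `DagAction` and `LeaseParams` below are hypothetical minimal stand-ins for the `DagActionStore` types, and the name-keyed `LongAdder` map is a simplification of the PR's per-metric maps:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

// Hypothetical minimal stand-ins for DagActionStore.DagAction and DagActionStore.LeaseParams.
final class DagAction {
  private final DagActionType dagActionType;
  DagAction(DagActionType dagActionType) { this.dagActionType = dagActionType; }
  DagActionType getDagActionType() { return dagActionType; }
}

final class LeaseParams {
  private final DagAction dagAction;
  LeaseParams(DagAction dagAction) { this.dagAction = dagAction; }
  DagAction getDagAction() { return dagAction; }
}

class DagProcessingEngineMetricsSketch {
  // Full meter name (metric name + type) -> meter; LongAdder stands in for ContextAwareMeter.
  private final ConcurrentMap<String, LongAdder> meterByName = new ConcurrentHashMap<>();

  void updateMetricForDagAction(String metricName, DagActionType dagActionType) {
    meterByName.computeIfAbsent(metricName + dagActionType, name -> new LongAdder()).increment();
  }

  // Overloads do the DagActionType extraction once, here, instead of at every call site.
  void updateMetricForDagAction(String metricName, DagAction dagAction) {
    updateMetricForDagAction(metricName, dagAction.getDagActionType());
  }

  void updateMetricForDagAction(String metricName, LeaseParams leaseParams) {
    updateMetricForDagAction(metricName, leaseParams.getDagAction());
  }

  long count(String fullMeterName) {
    LongAdder meter = meterByName.get(fullMeterName);
    return meter == null ? 0 : meter.sum();
  }
}
```

The call in `DagManagementTaskStreamImpl` would then shrink to `updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING, leaseParams)`.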
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -132,11 +138,17 @@ public void run() {
DagProc<?> dagProc = dagTask.host(dagProcFactory);
try {
// todo - add retries
- dagProc.process(dagManagementStateStore);
+ dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
dagTask.conclude();
+ // TODO: change this to determine the action type or make this a generic meter
+ dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTION_CONCLUSIONS_SUCCEEDED,
+ DagActionStore.DagActionType.LAUNCH);
Review Comment:
could you put these two metrics calls within `DagTask::conclude` (since that
should have the `DagAction`)?
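A minimal sketch of moving both conclusion metrics into the task, which already knows its `DagAction` and so removes the hard-coded `LAUNCH`. `DagTaskSketch`, the `doConclude()` hook, and the `ConcludeMetrics` interface are hypothetical; the real `DagTask::conclude` signature may differ (e.g. in its return type or checked exceptions):

```java
import java.util.Objects;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

// Hypothetical minimal stand-in for DagActionStore.DagAction.
final class DagAction {
  private final DagActionType dagActionType;
  DagAction(DagActionType dagActionType) { this.dagActionType = dagActionType; }
  DagActionType getDagActionType() { return dagActionType; }
}

// Hypothetical narrow view of DagProcessingEngineMetrics.
interface ConcludeMetrics {
  void concludeSucceeded(DagActionType dagActionType);
  void concludeFailed(DagActionType dagActionType);
}

abstract class DagTaskSketch {
  private final DagAction dagAction;
  private final ConcludeMetrics metrics;

  DagTaskSketch(DagAction dagAction, ConcludeMetrics metrics) {
    this.dagAction = Objects.requireNonNull(dagAction);
    this.metrics = Objects.requireNonNull(metrics);
  }

  // Hypothetical hook for whatever conclude() does today (e.g. completing the lease).
  protected abstract void doConclude();

  public final void conclude() {
    DagActionType type = dagAction.getDagActionType();
    try {
      doConclude();
    } catch (RuntimeException e) {
      metrics.concludeFailed(type);
      throw e;
    }
    metrics.concludeSucceeded(type);
  }
}
```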
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -44,18 +46,28 @@ public KillDagProc(KillDagTask killDagTask) {
}
@Override
- protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
- throws IOException {
- return dagManagementStateStore.getDag(getDagId());
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
+ try {
+ Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(getDagId());
+ dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_INIT_SUCCEEDED,
+ DagActionStore.DagActionType.KILL);
+ return dag;
+ } catch (Exception e) {
+ dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTIONS_INIT_FAILED,
+ DagActionStore.DagActionType.KILL);
+ throw e;
+ }
Review Comment:
this is far too much "bespoke" code in every one of the derived classes.
let's instead implement within the `DagProc` base class!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -117,6 +122,7 @@ static class DagProcEngineThread implements Runnable {
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private DagManagementStateStore dagManagementStateStore;
+ private DagProcessingEngineMetrics dagProcEngineMetrics;
Review Comment:
just wondering... would these ever change? why not `private final`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
+/**
+ * Used to track all metrics relating to processing dagActions when DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by time) at various points of the system,
Review Comment:
did you mean "broken down by time" or "broken down by type"?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
+@Slf4j
+/**
+ * Used to track all metrics relating to processing dagActions when DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by time) at various points of the system,
+ * starting from addition to the DagActionStore to observation in the DagActionChangeMonitor through the
+ * DagProcessingEngine pipeline (DagManagement -> DagTaskStreamImpl -> MySqlMultiActiveLeaseArbiter -> DagProc).
+ */
Review Comment:
the convention I'm familiar w/ is to put javadoc first and then annotations
"closer" to the def. see examples from the official docs -
https://docs.oracle.com/javase/1.5.0/docs/guide/language/annotations.html
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -63,13 +64,16 @@ public DagProc(DagTask dagTask) {
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
}
- public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
- T state = initialize(dagManagementStateStore); // todo - retry
- act(dagManagementStateStore, state); // todo - retry
+ public final void process(DagManagementStateStore dagManagementStateStore,
+ DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
+ T state = initialize(dagManagementStateStore, dagProcEngineMetrics); // todo - retry
+ act(dagManagementStateStore, state, dagProcEngineMetrics); // todo - retry
Review Comment:
these look like the two places to wrap the calls to increment metrics for
`{Initialize,Act}{Succeeded,Failed}`
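A minimal sketch of wrapping those two calls once in the base class, so derived classes like `KillDagProc` need no bespoke metrics code. To stay self-contained it drops the `DagManagementStateStore` parameter and uses a hypothetical `MetricsRecorder` interface shaped like the PR's `updateMetricForDagAction(String, DagActionType)`; `DagProcSketch`, `IOCallable`, and the metric-name constants are likewise hypothetical:

```java
import java.io.IOException;

// Hypothetical stand-in for DagActionStore.DagActionType.
enum DagActionType { LAUNCH, KILL }

// Hypothetical: same shape as DagProcessingEngineMetrics.updateMetricForDagAction(String, DagActionType).
@FunctionalInterface
interface MetricsRecorder {
  void updateMetricForDagAction(String metricName, DagActionType dagActionType);
}

@FunctionalInterface
interface IOCallable<R> {
  R call() throws IOException;
}

abstract class DagProcSketch<T> {
  static final String INITIALIZE_SUCCEEDED = "dagActionsInitializeSucceeded.";
  static final String INITIALIZE_FAILED = "dagActionsInitializeFailed.";
  static final String ACT_SUCCEEDED = "dagActionsActSucceeded.";
  static final String ACT_FAILED = "dagActionsActFailed.";

  private final DagActionType dagActionType;

  protected DagProcSketch(DagActionType dagActionType) {
    this.dagActionType = dagActionType;
  }

  // Subclasses implement only the business logic; no metrics code is needed there.
  protected abstract T initialize() throws IOException;

  protected abstract void act(T state) throws IOException;

  public final void process(MetricsRecorder metrics) throws IOException {
    T state = recordOutcome(this::initialize, INITIALIZE_SUCCEEDED, INITIALIZE_FAILED, metrics);
    recordOutcome(() -> { act(state); return null; }, ACT_SUCCEEDED, ACT_FAILED, metrics);
  }

  // Marks exactly one of the success/failure meters for this proc's DagActionType, then rethrows any failure.
  private <R> R recordOutcome(IOCallable<R> step, String succeeded, String failed, MetricsRecorder metrics)
      throws IOException {
    R result;
    try {
      result = step.call();
    } catch (IOException | RuntimeException e) {
      metrics.updateMetricForDagAction(failed, dagActionType);
      throw e;
    }
    metrics.updateMetricForDagAction(succeeded, dagActionType);
    return result;
  }
}
```

Because the type comes from the proc itself, the hard-coded `DagActionType.KILL` in each subclass goes away too.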
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
+ /**
+ * Create a meter of each dagActionType for the given metric, register it with the metric context, and store it in a
+ * concurrent map.
+ * @param metricMap
+ * @param metricName
+ */
+ private void registerMetricForEachDagAction(ConcurrentMap<String, ContextAwareMeter> metricMap, String metricName) {
Review Comment:
nit: this is registering NOT `ForEachDagAction` but `ForEachDagActionType`
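A minimal sketch of the rename the nit asks for, which also makes visible that the loop creates one meter per *type*, each named `metricName + type` (matching the trailing-delimiter convention in `ServiceMetricNames`). Assumptions, all hypothetical: `DagActionType` stands in for `DagActionStore.DagActionType` (its constants here are illustrative, not Gobblin's list), a plain map stands in for `MetricContext`, and `LongAdder` stands in for `ContextAwareMeter`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for DagActionStore.DagActionType; constants are illustrative only.
enum DagActionType { LAUNCH, KILL, RESUME }

class DagActionTypeMeterRegistry {
  // Stand-in for MetricContext: meters keyed by their full metric name.
  private final ConcurrentMap<String, LongAdder> metricContext = new ConcurrentHashMap<>();

  /**
   * Creates one meter per DagActionType (not per dag action) for the given metric, registers it
   * under metricName + type, and stores it in metricMap keyed by the type's name.
   */
  void registerMetricForEachDagActionType(ConcurrentMap<String, LongAdder> metricMap, String metricName) {
    for (DagActionType dagActionType : DagActionType.values()) {
      LongAdder meter = metricContext.computeIfAbsent(metricName + dagActionType, name -> new LongAdder());
      metricMap.put(dagActionType.toString(), meter);
    }
  }

  // Full names of every meter registered so far.
  Set<String> registeredNames() {
    return metricContext.keySet();
  }
}
```

With a prefix such as `"DagProcEngine.dagActionsStored."`, this registers `DagProcEngine.dagActionsStored.LAUNCH`, `...KILL`, and `...RESUME`.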
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -80,17 +81,17 @@ public void setUp() throws Exception {
doReturn(true).when(dagActionStore).deleteDagAction(any());
dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.of(mock(DagActionStore.class)),
- mock(MultiActiveLeaseArbiter.class), Optional.of(mock(DagActionReminderScheduler.class)), false,
- dagManagementStateStore);
+ mock(MultiActiveLeaseArbiter.class), Optional.of(mock(DagActionReminderScheduler.class)),
+ false, dagManagementStateStore, Mockito.mock(DagProcessingEngineMetrics.class));
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream, this.dagProcFactory,
- dagManagementStateStore, 0);
+ dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 0);
Review Comment:
Lots of tests are now being retrofitted to initialize w/ a mock of the DPE metrics.
Is it not possible to verify behavior, or what's invoked, on that mock?
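A stdlib-only sketch of what such a check could look like: give the code under test a narrow metrics dependency and assert afterwards on what it recorded. With Mockito, the equivalent assertion on the existing mock would be along the lines of `Mockito.verify(metricsMock, Mockito.times(1)).markReminderProcessed(...)`. `DagProcEngineMetricsSink`, `markReminderProcessed`, `ReminderHandler`, and the `DagActionType` constants are hypothetical names for illustration, not Gobblin's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DagActionStore.DagActionType; constants are illustrative only.
enum DagActionType { LAUNCH, KILL, RESUME }

// Hypothetical narrow metrics dependency the code under test would accept.
interface DagProcEngineMetricsSink {
  void markReminderProcessed(DagActionType dagActionType);
}

// Test double that records every call so a test can assert on them afterwards.
class RecordingMetricsSink implements DagProcEngineMetricsSink {
  final List<DagActionType> remindersProcessed = new ArrayList<>();

  @Override
  public void markReminderProcessed(DagActionType dagActionType) {
    remindersProcessed.add(dagActionType);
  }
}

// Hypothetical component under test: handles a reminder, then reports it.
class ReminderHandler {
  private final DagProcEngineMetricsSink metrics;

  ReminderHandler(DagProcEngineMetricsSink metrics) {
    this.metrics = metrics;
  }

  void onReminder(DagActionType dagActionType) {
    // ... the real reminder processing would happen here ...
    metrics.markReminderProcessed(dagActionType);
  }
}
```

A test then drives `ReminderHandler` and checks that exactly one `KILL` reminder was recorded, which turns the mock from inert plumbing into an assertion about behavior.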
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.ConcurrentMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+
+@Slf4j
+/**
+ * Used to track all metrics relating to processing dagActions when DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by time) at various points of the system,
+ * starting from addition to the DagActionStore to observation in the DagActionChangeMonitor through the
+ * DagProcessingEngine pipeline (DagManagement -> DagTaskStreamImpl -> MySqlMultiActiveLeaseArbiter -> DagProc).
+ */
+public class DagProcessingEngineMetrics {
+ MetricContext metricContext;
+ // Declare map of dagActionType to a ContextAwareMeter for each metric
+ private ConcurrentMap<String, ContextAwareMeter> dagActionsStoredMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsObservedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsLeasesObtainedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsNoLongerLeasingMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionLeaseRemindersScheduledMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionRemindersProcessedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsExceededMaxRetryMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsInitFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsInitSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionExecutionsFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionExecutionsSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionConclusionsFailedMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionConclusionsSucceededMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsRemovedFromStoreMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionsFailingRemovalMeters = new ConcurrentHashMap();
+ private ConcurrentMap<String, ContextAwareMeter>dagActionAverageProcessingDelayMillisMeters = new ConcurrentHashMap();
+
+ public DagProcessingEngineMetrics(MetricContext metricContext) {
+ this.metricContext = metricContext;
+ registerAllMetrics();
+ }
+
+ public void registerAllMetrics() {
+ registerMetricForEachDagAction(this.dagActionsStoredMeters, ServiceMetricNames.DAG_ACTIONS_STORED);
+ registerMetricForEachDagAction(this.dagActionsObservedMeters, ServiceMetricNames.DAG_ACTIONS_OBSERVED);
+ registerMetricForEachDagAction(this.dagActionsLeasesObtainedMeters, ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED);
+ registerMetricForEachDagAction(this.dagActionsNoLongerLeasingMeters, ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING);
+ registerMetricForEachDagAction(this.dagActionLeaseRemindersScheduledMeters, ServiceMetricNames.DAG_ACTION_LEASE_REMINDERS_SCHEDULED);
+ registerMetricForEachDagAction(this.dagActionRemindersProcessedMeters, ServiceMetricNames.DAG_ACTION_REMINDERS_PROCESSED);
+ registerMetricForEachDagAction(this.dagActionsExceededMaxRetryMeters, ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY);
+ registerMetricForEachDagAction(this.dagActionsInitFailedMeters, ServiceMetricNames.DAG_ACTIONS_INIT_FAILED);
+ registerMetricForEachDagAction(this.dagActionsInitSucceededMeters, ServiceMetricNames.DAG_ACTIONS_INIT_SUCCEEDED);
+ registerMetricForEachDagAction(this.dagActionExecutionsFailedMeters, ServiceMetricNames.DAG_ACTION_EXECUTIONS_FAILED);
+ registerMetricForEachDagAction(this.dagActionExecutionsSucceededMeters, ServiceMetricNames.DAG_ACTION_EXECUTIONS_SUCCEEDED);
+ registerMetricForEachDagAction(this.dagActionConclusionsFailedMeters, ServiceMetricNames.DAG_ACTION_CONCLUSIONS_FAILED);
+ registerMetricForEachDagAction(this.dagActionConclusionsSucceededMeters, ServiceMetricNames.DAG_ACTION_CONCLUSIONS_SUCCEEDED);
+ }
+
+ /**
+ * Create a meter of each dagActionType for the given metric, register it with the metric context, and store it in a
+ * concurrent map.
+ * @param metricMap
+ * @param metricName
+ */
+ private void registerMetricForEachDagAction(ConcurrentMap<String, ContextAwareMeter> metricMap, String metricName) {
+ for (DagActionStore.DagActionType dagActionType : DagActionStore.DagActionType.values()) {
+ metricMap.put(dagActionType.toString(),
+ this.metricContext.contextAwareMeter(metricName + dagActionType));
+ }
+ }
+
+ /**
+ * Updates the meter corresponding to the metricsName and dagActionType provided if they match an existing metric
+ * @param metricName
+ * @param dagActionType
+ */
+ public void updateMetricForDagAction(String metricName, DagActionStore.DagActionType dagActionType) {
Review Comment:
A set of type-safe methods would be far preferable: a lot can go wrong w/ passing in `String metricName`!
E.g., instead of having a massive switch statement buried within the impl, which also pushes complexity (and the potential for errors) onto callers by making them hard-code a reference to a well-known constant at each point of use:
```
this.dagProcEngineMetrics.updateMetricForDagAction(ServiceMetricNames.DAG_ACTION_REMINDERS_PROCESSED,
    leaseParams.getDagAction());
```
ask them instead to hard-code a type-safe and semantic method name:
```
this.dagProcEngineMetrics.markReminderProcessed(leaseParams.getDagAction());
```
or
```
this.dagProcEngineMetrics.incrementInitializeSucceeded(dagAction);
```
The impl should be trivial:
```
public void markReminderProcessed(DagAction dagAction) {
  updateMetric(this.reminderProcessedByDagActionType, dagAction.getDagActionType());
}
```
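A self-contained sketch of the type-safe shape suggested above, expanding the `markReminderProcessed` / `incrementInitializeSucceeded` / `updateMetric` snippets into a compilable class. Assumptions, all hypothetical: `DagActionType` and `DagAction` are stand-ins for the `DagActionStore` types (only `getDagActionType()` is taken from the comment), `DagProcEngineMetricsSketch` is an illustrative class name, and `LongAdder` stands in for `ContextAwareMeter`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for DagActionStore.DagActionType; constants are illustrative only.
enum DagActionType { LAUNCH, KILL, RESUME }

// Hypothetical stand-in for DagActionStore.DagAction; only its type matters for metrics.
final class DagAction {
  private final DagActionType dagActionType;

  DagAction(DagActionType dagActionType) {
    this.dagActionType = dagActionType;
  }

  DagActionType getDagActionType() {
    return dagActionType;
  }
}

class DagProcEngineMetricsSketch {
  // One meter per DagActionType for each metric; LongAdder stands in for ContextAwareMeter.
  private final Map<DagActionType, LongAdder> reminderProcessedByDagActionType = newMetersByType();
  private final Map<DagActionType, LongAdder> initializeSucceededByDagActionType = newMetersByType();

  // Eagerly creates a meter for every DagActionType so lookups never miss.
  private static Map<DagActionType, LongAdder> newMetersByType() {
    Map<DagActionType, LongAdder> meters = new EnumMap<>(DagActionType.class);
    for (DagActionType type : DagActionType.values()) {
      meters.put(type, new LongAdder());
    }
    return meters;
  }

  // Semantic, type-safe entry points: callers cannot pass the wrong metric name.
  public void markReminderProcessed(DagAction dagAction) {
    updateMetric(this.reminderProcessedByDagActionType, dagAction.getDagActionType());
  }

  public void incrementInitializeSucceeded(DagAction dagAction) {
    updateMetric(this.initializeSucceededByDagActionType, dagAction.getDagActionType());
  }

  // Single shared helper; a real meter would presumably be mark()ed here instead.
  private static void updateMetric(Map<DagActionType, LongAdder> metersByType, DagActionType type) {
    metersByType.get(type).increment();
  }

  // Read-only accessors, used by tests.
  long remindersProcessed(DagActionType type) {
    return reminderProcessedByDagActionType.get(type).sum();
  }

  long initializeSucceeded(DagActionType type) {
    return initializeSucceededByDagActionType.get(type).sum();
  }
}
```

One design note, under the sketch's assumptions: each `EnumMap` is fully populated in the constructor, held in a `final` field, and never structurally modified afterwards, so concurrent callers only read the maps and increment thread-safe counters. The `String`-keyed `ConcurrentHashMap`s are then not strictly needed, and a misspelled key can no longer silently miss a meter.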
--
This is an automated message from the Apache Git Service.