[ 
https://issues.apache.org/jira/browse/GOBBLIN-2104?focusedWorklogId=926729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-926729
 ]

ASF GitHub Bot logged work on GOBBLIN-2104:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/24 19:47
            Start Date: 19/Jul/24 19:47
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3983:
URL: https://github.com/apache/gobblin/pull/3983#discussion_r1684914115


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.ConcurrentMap;
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricTagNames;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+
+/**
+ * Used to track all metrics relating to processing dagActions when 
DagProcessingEngine is enabled. The metrics can be
+ * used to trace the number of dagActions (which can be further broken down by 
type) at various points of the system,
+ * starting from addition to the DagActionStore to observation in the 
DagActionChangeMonitor through the
+ * DagProcessingEngine pipeline (DagManagement -> DagTaskStreamImpl -> 
MySqlMultiActiveLeaseArbiter -> DagProc).
+ */
+@Slf4j
+public class DagProcessingEngineMetrics {
+  MetricContext metricContext;
+  // Declare map of dagActionType to a ContextAwareMeter for each metric
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsStoredMeterByDagActionType = new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsObservedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsLeasesObtainedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsNoLongerLeasingMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsLeaseReminderScheduledMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsReminderProcessedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsExceededMaxRetryMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsInitializeFailedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsInitializeSucceededMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsActFailedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsActSucceededMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsConcludeFailedMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsConcludeSucceededMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsRemovedFromStoreMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionsFailingRemovalMeterByDagActionType =  new ConcurrentHashMap();
+  private ConcurrentMap<String, ContextAwareMeter> 
dagActionAverageProcessingDelayMillisMeterByDagActionType =  new 
ConcurrentHashMap();
+
+  public DagProcessingEngineMetrics(MetricContext metricContext) {
+    this.metricContext = metricContext;
+    registerAllMetrics();
+  }
+
+  @Inject
+  public DagProcessingEngineMetrics() {
+    // Create a new metric context for the DagProcessingEngineMetrics tagged 
appropriately
+    List<Tag<?>> tags = new ArrayList<>();
+    tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, 
GobblinMetrics.MetricType.COUNTER));
+    this.metricContext = Instrumented.getMetricContext(new State(), 
this.getClass(), tags);
+  }
+
+  public void registerAllMetrics() {
+    
registerMetricForEachDagActionType(this.dagActionsStoredMeterByDagActionType, 
ServiceMetricNames.DAG_ACTIONS_STORED);
+    
registerMetricForEachDagActionType(this.dagActionsObservedMeterByDagActionType, 
ServiceMetricNames.DAG_ACTIONS_OBSERVED);
+    
registerMetricForEachDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_LEASES_OBTAINED);
+    
registerMetricForEachDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_NO_LONGER_LEASING);
+    
registerMetricForEachDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_LEASE_REMINDER_SCHEDULED);
+    
registerMetricForEachDagActionType(this.dagActionsReminderProcessedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_REMINDER_PROCESSED);
+    
registerMetricForEachDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_EXCEEDED_MAX_RETRY);
+    
registerMetricForEachDagActionType(this.dagActionsInitializeFailedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_INITIALIZE_FAILED);
+    
registerMetricForEachDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_INITIALIZE_SUCCEEDED);
+    
registerMetricForEachDagActionType(this.dagActionsActFailedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_ACT_FAILED);
+    
registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
+    
registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
+    
registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
 ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+  }
+
+  /**
+   * Create a meter of each dagActionType for the given metric, register it 
with the metric context, and store it in a
+   * concurrent map.
+   * @param metricMap
+   * @param metricName
+   */
+  private void registerMetricForEachDagActionType(ConcurrentMap<String, 
ContextAwareMeter> metricMap, String metricName) {
+    for (DagActionStore.DagActionType dagActionType : 
DagActionStore.DagActionType.values()) {
+      metricMap.put(dagActionType.toString(),
+          this.metricContext.contextAwareMeter(metricName + dagActionType));
+    }
+  }
+
+  public void markDagActionsStored(DagActionStore.DagActionType dagActionType) 
{
+    updateMetricForDagActionType(this.dagActionsStoredMeterByDagActionType, 
dagActionType);
+  }
+
+  public void markDagActionsObserved(DagActionStore.DagActionType 
dagActionType) {
+    updateMetricForDagActionType(this.dagActionsObservedMeterByDagActionType, 
dagActionType);
+  }
+
+  public void markDagActionsLeasedObtained(DagActionStore.LeaseParams 
leaseParams) {
+    
updateMetricForDagActionType(this.dagActionsLeasesObtainedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsNoLongerLeasing(DagActionStore.LeaseParams 
leaseParams) {
+    
updateMetricForDagActionType(this.dagActionsNoLongerLeasingMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsLeaseReminderScheduled(DagActionStore.LeaseParams 
leaseParams) {
+    
updateMetricForDagActionType(this.dagActionsLeaseReminderScheduledMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  public void markDagActionsRemindersProcessed(DagActionStore.LeaseParams 
leaseParams) {
+    
updateMetricForDagActionType(this.dagActionsReminderProcessedMeterByDagActionType,
+        leaseParams.getDagAction().getDagActionType());
+  }
+
+  // TODO: implement evaluating max retries later
+  public void markDagActionsExceedingMaxRetry(DagActionStore.DagActionType 
dagActionType) {
+    
updateMetricForDagActionType(this.dagActionsExceededMaxRetryMeterByDagActionType,
 dagActionType);
+  }
+
+  public void markDagActionsInitialize(DagActionStore.DagActionType 
dagActionType, boolean succeeded) {
+    if (succeeded) {
+      
updateMetricForDagActionType(this.dagActionsInitializeSucceededMeterByDagActionType,
 dagActionType);
+    } else {
+      
updateMetricForDagActionType(this.dagActionsInitializeFailedMeterByDagActionType,
 dagActionType);
+    }
+  }
+
+  public void markDagActionsAct(DagActionStore.DagActionType dagActionType, 
boolean succeeded) {
+    if (succeeded) {
+      
updateMetricForDagActionType(this.dagActionsActSucceededMeterByDagActionType, 
dagActionType);
+    } else {
+      
updateMetricForDagActionType(this.dagActionsActFailedMeterByDagActionType, 
dagActionType);
+    }
+  }
+
+  public void markDagActionsConclude(DagActionStore.DagActionType 
dagActionType, boolean succeeded) {
+    if (succeeded) {
+      
updateMetricForDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType,
 dagActionType);
+    } else {
+      
updateMetricForDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, 
dagActionType);
+    }
+  }
+
+
+  /**
+   * Generic helper used to increment a metric corresponding to the 
dagActionType in the provided map. It assumes the
+   * meter for each dagActionType can be identified by its name.
+   */
+  private void updateMetricForDagActionType(ConcurrentMap<String, 
ContextAwareMeter> metricMap,
+      DagActionStore.DagActionType dagActionType) {
+      if (metricMap.containsKey(dagActionType.toString())) {
+        metricMap.get(dagActionType.toString()).mark();
+      } else {
+        throw new RuntimeException(String.format("No meter exists for 
dagActionType %s in metricsMap %s",

Review Comment:
   in that case, we should just replace `if ... else ` with  
`metricMap.get(dagActionType).mark();`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 926729)
    Time Spent: 4h  (was: 3h 50m)

> Initialize DagProcessingEngine Metrics
> --------------------------------------
>
>                 Key: GOBBLIN-2104
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2104
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to