[ 
https://issues.apache.org/jira/browse/GOBBLIN-1930?focusedWorklogId=885138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-885138
 ]

ASF GitHub Bot logged work on GOBBLIN-1930:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Oct/23 21:58
            Start Date: 13/Oct/23 21:58
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3800:
URL: https://github.com/apache/gobblin/pull/3800#discussion_r1358869580


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -242,7 +242,6 @@ private void runRetentionOnArbitrationTable() {
     ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     Runnable retentionTask = () -> {
       try {
-        Thread.sleep(10000);

Review Comment:
   not concerned... more wondering: did this become redundant, given the scheduled thread-pool executor?
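For context on the question above: `ScheduledThreadPoolExecutor.scheduleAtFixedRate(task, initialDelay, period, unit)` already defers the first run by `initialDelay`. A `Thread.sleep` inside the task only blocks the pool's single worker. The diff doesn't show whether the removed sleep served some other purpose. The sketch below uses hypothetical timings and a placeholder task body, not the arbiter's real retention schedule. It measures the delay before the first run when no in-task sleep is present.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RetentionScheduleSketch {
  // Hypothetical timings for illustration only; not the arbiter's real retention schedule.
  static final long INITIAL_DELAY_MILLIS = 100;
  static final long PERIOD_MILLIS = 1_000;

  /** Schedules a periodic task and returns how long (ms) the executor waited before its first run. */
  static long firstRunDelayMillis() {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    CountDownLatch firstRun = new CountDownLatch(1);
    long[] observedDelayMillis = new long[1];
    long scheduledAtNanos = System.nanoTime();

    Runnable retentionTask = () -> {
      // No Thread.sleep here: initialDelay below already defers the first execution.
      if (firstRun.getCount() > 0) {
        observedDelayMillis[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scheduledAtNanos);
        firstRun.countDown();
      }
      // ... the actual retention work against the arbitration table would run here ...
    };

    ScheduledFuture<?> future = executor.scheduleAtFixedRate(
        retentionTask, INITIAL_DELAY_MILLIS, PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    try {
      firstRun.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      future.cancel(false);
      executor.shutdown();
    }
    return observedDelayMillis[0];
  }

  public static void main(String[] args) {
    System.out.println("First run after ~" + firstRunDelayMillis() + " ms");
  }
}
```

The `CountDownLatch` is there only so the demo can observe the first run. A long-lived service would simply keep the executor running.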



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
  public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
      ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
-  public static final String
-      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
+  public static final String DAG_ACTION_STORE_MONITOR_PREFIX = "dagActionStoreMonitor";

Review Comment:
   nit: this is no longer a *prefix*... but anyway, why do you prefer repeating `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "."` so many times?
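The reviewer's alternative folds the service prefix and its `"."` separator into a single monitor-level constant, then derives each metric name from that constant. This is a minimal sketch. The class name and the shortened constant names are hypothetical. `GOBBLIN_SERVICE_PREFIX` is assumed here to be the literal `"GobblinService"` rather than read from `ServiceMetricNames`. The metric suffixes come from the removed lines above.

```java
class DagActionStoreMonitorMetricNamesSketch {
  // Assumed stand-in for ServiceMetricNames.GOBBLIN_SERVICE_PREFIX.
  static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";

  // The service prefix and separator are written exactly once...
  static final String DAG_ACTION_STORE_MONITOR_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor";

  // ...and every monitor metric name is derived from the composed prefix.
  static final String KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".kills.invoked";
  static final String MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + ".message.processed";
  static final String RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + ".resumes.invoked";
  static final String FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + ".flows.launched";
  static final String UNEXPECTED_ERRORS = DAG_ACTION_STORE_MONITOR_PREFIX + ".unexpected.errors";
  static final String PRODUCE_TO_CONSUME_DELAY_MILLIS = DAG_ACTION_STORE_MONITOR_PREFIX + ".produce.to.consume.delay";

  public static void main(String[] args) {
    System.out.println(KILLS_INVOKED); // GobblinService.dagActionStoreMonitor.kills.invoked
  }
}
```

All of these names are compile-time constant expressions, provided the service prefix is itself a constant. javac therefore folds the concatenation, and composing the names this way costs nothing at runtime.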



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -130,30 +132,34 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
         produceTimestamp.toString())) {
+      this.messageFilteredOutMeter.mark();
       return;
     }
 
+    // Used to easily log information to identify the dag action
+    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+        dagActionType);
+
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
-          log.info("Received insert dag action and about to send resume flow request");
+          log.info("Received insert dag action and about to send resume flow request for: {}", dagAction);

Review Comment:
   nit: too conversational. How about:
   ```
   log.info("DagAction change ({}): {}", operation, dagAction);
   ```
   (i.e., won't the `resume/kill/launch` type already be logged as `dagAction.dagActionType`?)
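The reviewer's point is that a `DagAction` whose `toString()` already includes its action type makes a single, type-agnostic log line sufficient. This sketch assumes exactly that about `toString()`. The `DagAction` class below is a hypothetical stand-in for `DagActionStore.DagAction`, not its real definition. `String.format` substitutes for SLF4J's `{}` placeholders so that the sketch runs without a logging dependency.

```java
class DagActionLogSketch {
  enum FlowActionType { KILL, RESUME, LAUNCH }

  // Hypothetical stand-in for DagActionStore.DagAction; assumes toString() includes the action type.
  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final FlowActionType dagActionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType dagActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.dagActionType = dagActionType;
    }

    @Override
    public String toString() {
      return "DagAction(flowGroup=" + flowGroup + ", flowName=" + flowName
          + ", flowExecutionId=" + flowExecutionId + ", dagActionType=" + dagActionType + ")";
    }
  }

  // One message shape for every operation and action type; the type comes from dagAction itself.
  static String describeChange(String operation, DagAction dagAction) {
    return String.format("DagAction change (%s): %s", operation, dagAction);
  }

  public static void main(String[] args) {
    DagAction resume = new DagAction("myGroup", "myFlow", "1697234280000", FlowActionType.RESUME);
    // DagAction change (INSERT): DagAction(flowGroup=myGroup, flowName=myFlow, flowExecutionId=1697234280000, dagActionType=RESUME)
    System.out.println(describeChange("INSERT", resume));
  }
}
```

In the monitor itself, the equivalent would be the suggested `log.info("DagAction change ({}): {}", operation, dagAction);`, placed once before the per-type branching.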



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -43,6 +43,11 @@ public class ServiceMetricNames {
  public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
  public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
 
+  // Dag Action Handling Related Metrics
+  public static final String DAG_ACTION_HANDLING_PREFIX = "dagActionHandling";

Review Comment:
   wondering... couldn't this be a `dagManager.` metric? `dagManager.failedLaunchEventsOn...`?
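Suppose the failed-launch count moves under a `dagManager.` namespace as the reviewer suggests. The repeated `incrementFailedLaunchCount()` calls in the catch branches of `handleLaunchFlowEvent` could then collapse into a single multi-catch. This is a minimal sketch with several hypothetical stand-ins for the real DagManager metrics: the metric name, the `LaunchStep` interface, and the `AtomicLong` counter. It uses only JDK exception types, and Gobblin's `SpecNotFoundException` is omitted.

```java
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicLong;

class FailedLaunchCounterSketch {
  // Hypothetical metric name under a dagManager namespace; not the PR's actual name.
  static final String FAILED_LAUNCH_COUNT = "GobblinService.dagManager.failedLaunchCount";
  static final AtomicLong failedLaunchCount = new AtomicLong();

  // Hypothetical stand-in for the work handleLaunchFlowEvent performs (build URI, compile, submit).
  interface LaunchStep {
    void run() throws URISyntaxException, IOException;
  }

  /** Returns true on success; on an expected failure, logs once and increments the counter once. */
  static boolean handleLaunch(String flowId, LaunchStep step) {
    try {
      step.run();
      return true;
    } catch (URISyntaxException | IOException e) {
      System.err.printf("Failed to launch flowId %s: %s%n", flowId, e);
      failedLaunchCount.incrementAndGet();
      return false;
    }
  }

  public static void main(String[] args) {
    handleLaunch("flowA", () -> { });
    handleLaunch("flowB", () -> { throw new IOException("could not add job execution plan"); });
    handleLaunch("flowC", () -> { throw new URISyntaxException("bad:uri", "illegal character"); });
    System.out.println(FAILED_LAUNCH_COUNT + " = " + failedLaunchCount.get()); // ... = 2
  }
}
```

Branches that need distinct log messages, such as the `InterruptedException` case below, can still keep their own `catch` clause.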



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -510,13 +510,18 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
       this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
      log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (SpecNotFoundException e) {
      log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (IOException e) {
      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
           + "stacktrace) due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (InterruptedException e) {
-      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {} due to exception {}", flowId,
+          e);

Review Comment:
   I guess there's the possibility I may be misremembering... and pursuing this needlessly... but my expectation is that a stacktrace would only be written when calling this form:
   ```
   Logger::warn(String, Throwable)
   ```
   if you call the form:
   ```
   Logger::warn(String, Object, Object) // aka. Logger::warn(String, Object...)
   ```
   are you certain it will print the stacktrace when the last arg is a `Throwable` and there's no corresponding `{}` remaining for it in the initial `String` arg?
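For reference, the SLF4J FAQ describes how a trailing `Throwable` is handled since SLF4J 1.6.0. If no `{}` is left for that last argument, it is treated as the logged exception and its stack trace is printed. This holds even when the call goes through the `warn(String, Object, Object)` overload. The sketch below is a simplified, hypothetical model of that documented rule, not SLF4J's actual `MessageFormatter`. How a `Throwable` that *is* matched by a placeholder gets handled has varied across SLF4J versions and backends, so check the version actually on the classpath. The model contrasts the old message form (one `{}` for two args) with the new one (two `{}` for two args).

```java
class TrailingThrowableSketch {
  /** Rendered message plus the throwable (if any) whose stack trace would be logged. */
  static final class Formatted {
    final String message;
    final Throwable throwable;

    Formatted(String message, Throwable throwable) {
      this.message = message;
      this.throwable = throwable;
    }
  }

  /**
   * Simplified model: fill {} placeholders left to right; if the last argument is a Throwable
   * that no placeholder consumed, report it as the exception to log. Ignores escaping rules.
   */
  static Formatted format(String pattern, Object... args) {
    StringBuilder sb = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while (argIndex < args.length && (at = pattern.indexOf("{}", from)) != -1) {
      sb.append(pattern, from, at).append(args[argIndex++]);
      from = at + 2;
    }
    sb.append(pattern.substring(from));
    Object last = args.length == 0 ? null : args[args.length - 1];
    boolean lastUnconsumed = argIndex < args.length;
    Throwable t = (last instanceof Throwable && lastUnconsumed) ? (Throwable) last : null;
    return new Formatted(sb.toString(), t);
  }

  public static void main(String[] args) {
    Exception e = new InterruptedException("interrupted");
    // Old form: one {} for two args, so e is left over and treated as the exception.
    Formatted before = format("SpecCompiler failed ... of flowId {}. Exception: ", "myFlow", e);
    // New form: two {} for two args, so in this model e is rendered only via toString().
    Formatted after = format("SpecCompiler failed ... of flowId {} due to exception {}", "myFlow", e);
    System.out.println(before.message + " | stack trace: " + (before.throwable != null));
    System.out.println(after.message + " | stack trace: " + (after.throwable != null));
  }
}
```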



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -308,7 +307,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
         if (eventTimeMillis == dbEventTimestamp.getTime()) {
           // TODO: change this to a debug after fixing issue
          log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
-                  + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
+                  + " is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",

Review Comment:
   nit: I prefer the space at the end of the preceding string.
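The nit concerns split string literals. When a message is built from adjacent fragments joined with `+`, exactly one fragment must carry the separating space, or the words run together. This minimal illustration uses fragment text shortened from the diff above.

```java
class SplitLiteralSpacingSketch {
  // Missing space: the fragments run together as "timeis".
  static final String BROKEN = "Reminder event time" + "is the same as db event.";
  // Space at the end of the preceding fragment (the reviewer's preference).
  static final String TRAILING = "Reminder event time " + "is the same as db event.";
  // Space at the start of the following fragment (the form in the diff).
  static final String LEADING = "Reminder event time" + " is the same as db event.";

  public static void main(String[] args) {
    System.out.println(BROKEN);                   // Reminder event timeis the same as db event.
    System.out.println(TRAILING.equals(LEADING)); // true
  }
}
```

Both fixes yield the same string, so the choice between them is purely a matter of convention.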





Issue Time Tracking
-------------------

    Worklog Id:     (was: 885138)
    Time Spent: 20m  (was: 10m)

> Improve Logs & Metrics around Multi-active Launch Handling
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-1930
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1930
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Improve logging and metrics around multi-active launch flow event handling to 
> identify any missing events between the {{MysqlMultiActiveLeaseArbiter}} 
> committing the launch event to the {{dagActionStore}} and the 
> {{DagActionMonitor}} receiving events for processing. We want to be able to 
> distinguish between the following cases:
>  * events that are never received by the {{DagActionMonitor}}
>  * events incorrectly filtered out by the {{DagActionMonitor}}
>  * any failed submissions of dags to the {{DagManager}} either upon leader 
> change or from the {{DagActionChangeMonitor}}
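The cases above could be told apart by comparing counts taken at each hop of the monitor path. The hops are launches committed by the arbiter, events received by the monitor, events it filtered out, and failed DagManager submissions. This is a minimal sketch under a few assumptions: each committed launch produces exactly one INSERT event, and all counts cover the same time window. All counter names are hypothetical. Submissions made on leader change are not modeled.

```java
class LaunchPipelineReconciliationSketch {
  // Hypothetical per-window counts, one per hop in the launch pipeline.
  static final class Counts {
    final long committedByArbiter;   // launch actions written to the dagActionStore
    final long receivedByMonitor;    // launch events the monitor consumed
    final long filteredOutByMonitor; // events dropped by the monitor's shouldProcessMessage check
    final long failedSubmissions;    // forwarded launches the DagManager failed to submit

    Counts(long committed, long received, long filteredOut, long failed) {
      this.committedByArbiter = committed;
      this.receivedByMonitor = received;
      this.filteredOutByMonitor = filteredOut;
      this.failedSubmissions = failed;
    }

    /** Case 1: committed but never seen by the monitor. */
    long neverReceived() { return committedByArbiter - receivedByMonitor; }

    /** Events that passed the monitor's filter and were handed to the DagManager. */
    long forwarded() { return receivedByMonitor - filteredOutByMonitor; }

    /** Forwarded events that ended in a successful submission. */
    long submitted() { return forwarded() - failedSubmissions; }
  }

  public static void main(String[] args) {
    Counts c = new Counts(100, 97, 2, 1);
    // neverReceived=3 filteredOut=2 failed=1 submitted=94
    System.out.printf("neverReceived=%d filteredOut=%d failed=%d submitted=%d%n",
        c.neverReceived(), c.filteredOutByMonitor, c.failedSubmissions, c.submitted());
  }
}
```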



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
