[
https://issues.apache.org/jira/browse/GOBBLIN-1930?focusedWorklogId=885138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-885138
]
ASF GitHub Bot logged work on GOBBLIN-1930:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Oct/23 21:58
Start Date: 13/Oct/23 21:58
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3800:
URL: https://github.com/apache/gobblin/pull/3800#discussion_r1358869580
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -242,7 +242,6 @@ private void runRetentionOnArbitrationTable() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Runnable retentionTask = () -> {
try {
- Thread.sleep(10000);
Review Comment:
not concerned... more wondering... did this become redundant, given the
scheduled TP executor?
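
A minimal sketch of why the sleep may be redundant, assuming the retention task is submitted with something like `scheduleAtFixedRate` (the scheduling call is not visible in this hunk). The initial-delay argument already postpones the first run, so the task body needs no `Thread.sleep`. The class name, method name, and delay values below are hypothetical and not taken from the PR:
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration; not the MysqlMultiActiveLeaseArbiter code.
final class RetentionScheduleSketch {

  /**
   * Schedules a no-op "retention" task and reports whether it ran
   * expectedRuns times, with the first run no earlier than the initial delay.
   */
  static boolean runsAfterInitialDelay(long initialDelayMillis, long periodMillis, int expectedRuns)
      throws InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    CountDownLatch done = new CountDownLatch(expectedRuns);
    AtomicLong firstRunNanos = new AtomicLong(Long.MIN_VALUE);

    Runnable retentionTask = () -> {
      // Record only the first execution time; no Thread.sleep in the body.
      firstRunNanos.compareAndSet(Long.MIN_VALUE, System.nanoTime());
      done.countDown();
    };

    long startNanos = System.nanoTime();
    // The executor itself delays the first run and spaces out later runs.
    executor.scheduleAtFixedRate(retentionTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);

    boolean finished = done.await(5, TimeUnit.SECONDS);
    executor.shutdownNow();

    long elapsedNanos = firstRunNanos.get() - startNanos;
    return finished && elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(initialDelayMillis);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runsAfterInitialDelay(100, 20, 3));
  }
}
```
If the task is instead submitted with a zero initial delay, removing the sleep does change when the first retention pass happens, which may be worth confirming.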
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.message.processed";
public static final String
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specstoreMonitor.produce.to.consume.delay";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.kills.invoked";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.message.processed";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.resumes.invoked";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.flows.launched";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.unexpected.errors";
- public static final String
- GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.produce.to.consume.delay";
+ public static final String DAG_ACTION_STORE_MONITOR_PREFIX =
"dagActionStoreMonitor";
Review Comment:
nit: this is no longer a *prefix*... but why anyway do you prefer to repeat
so many times `SMNames.GOBBLIN_SERVICE_PREFIX + "."`?
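
One possible way to avoid the repetition, sketched under assumptions: fold the service prefix and the separator into the monitor prefix once, then append only the metric suffix. `SERVICE_PREFIX` below is a placeholder standing in for `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX` (its value here is assumed), and the class name is hypothetical:
```java
// Hypothetical illustration; not the RuntimeMetrics code from the PR.
final class MetricNameSketch {
  // Placeholder for ServiceMetricNames.GOBBLIN_SERVICE_PREFIX (value assumed).
  static final String SERVICE_PREFIX = "GobblinService";

  // A true prefix: includes the service prefix and the trailing separator.
  static final String DAG_ACTION_STORE_MONITOR_PREFIX =
      SERVICE_PREFIX + ".dagActionStoreMonitor.";

  // Each metric name appends only its own suffix.
  static final String KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "kills.invoked";
  static final String RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked";
  static final String FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";
  static final String MESSAGE_PROCESSED = DAG_ACTION_STORE_MONITOR_PREFIX + "message.processed";

  public static void main(String[] args) {
    System.out.println(KILLS_INVOKED);
  }
}
```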
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -130,30 +132,34 @@ protected void processMessage(DecodeableKafkaRecord
message) {
String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
dagActionsSeenCache, operation,
produceTimestamp.toString())) {
+ this.messageFilteredOutMeter.mark();
return;
}
+ // Used to easily log information to identify the dag action
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+ dagActionType);
+
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be
processed. DELETEs require no action.
try {
if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
- log.info("Received insert dag action and about to send resume flow
request");
+ log.info("Received insert dag action and about to send resume flow
request for: {}", dagAction);
Review Comment:
nit: too conversational. how about:
```
log.info("DagAction change ({}): {}", operation, dagAction)
```
(i.e. won't the `resume/kill/launch` be logged as `dagAction.dagActionType`)?
##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -43,6 +43,11 @@ public class ServiceMetricNames {
public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT =
GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".jobDoesNotExistInScheduler";
public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT
= GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX +
".failedToSetReminderCount";
+ // Dag Action Handling Related Metrics
+ public static final String DAG_ACTION_HANDLING_PREFIX = "dagActionHandling";
Review Comment:
wondering... couldn't this be a `dagManager.` metric?
`dagManager.failedLaunchEventsOn...`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -510,13 +510,18 @@ public void
handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {} due to exception
{}", flowId, e.getMessage());
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (IOException e) {
log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag
action from dagActionStore (check "
+ "stacktrace) due to exception {}", flowId, e.getMessage());
+ this.dagManagerMetrics.incrementFailedLaunchCount();
} catch (InterruptedException e) {
- log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
+ log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {} due to exception {}", flowId,
+ e);
Review Comment:
I guess there's the possibility I may be misremembering... and pursuing this
needlessly... but my expectation is that a stacktrace would only be written
when calling this form:
```
Logger::warn(String, Throwable)
```
if you call the form:
```
Logger::warn(String, Object, Object) // aka. Logger::warn(String, Object...)
```
are you certain it will print the ST, when the last arg is `Throwable` and
there's no corresponding `{}` remaining for it in the initial `String` arg?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -308,7 +307,7 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - Reminder event time"
- + "is the same as db event.", flowAction, isReminderEvent ?
"reminder" : "original",
+ + " is the same as db event.", flowAction, isReminderEvent ?
"reminder" : "original",
Review Comment:
nit: I prefer the space at the end of the string prior
Issue Time Tracking
-------------------
Worklog Id: (was: 885138)
Time Spent: 20m (was: 10m)
> Improve Logs & Metrics around Multi-active Launch Handling
> ----------------------------------------------------------
>
> Key: GOBBLIN-1930
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1930
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Improve logging and metrics around multi-active launch flow event handling to
> identify any missing events between the {{MysqlMultiActiveLeaseArbiter}}
> committing the launch event to the {{dagActionStore}} and the
> {{DagActionMonitor}} receiving events for processing. We want to be able to
> distinguish between the following cases of
> * events that are never received by the {{DagActionMonitor}}
> * events incorrectly filtered out by the {{DagActionMonitor}}
> * any failed submissions of dags to the {{DagManager}} either upon leader
> change or from the {{DagActionChangeMonitor}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)