[
https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928441&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928441
]
ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Aug/24 09:43
Start Date: 02/Aug/24 09:43
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1701582523
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
}
public void remove(Spec spec, Properties headers) throws IOException {
+ URI uri = spec.getUri();
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
- this.dagManager.stopDag(spec.getUri());
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
+ if (this.flowLaunchHandler.isPresent()) {
+ List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+ DagActionStore.DagActionType.KILL);
+ DagActionStore.LeaseParams leaseObject = new DagActionStore.LeaseParams(killDagAction, false,
Review Comment:
nit: `leaseParams` rather than `leaseObject`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
Review Comment:
let's keep the PRs as small and as cohesive as possible. so next time, I'd
strongly encourage you to put this and related mods into a PR separate from
something like the non-transient exceptions work
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
Review Comment:
suggest to log (although better would be within the predicate impl, i.e.
`Exception -> boolean`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -122,6 +134,7 @@ static class DagProcEngineThread implements Runnable {
private final DagProcFactory dagProcFactory;
private final DagManagementStateStore dagManagementStateStore;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
+ private final List<Class<? extends Exception>> nonRetryableExceptions;
Review Comment:
let's use a predicate as the interface - i.e. a function of `Exception ->
boolean`, but you could certainly implement one instance of that Strategy
interface in terms of a `List<Class<? extends Exception>>`
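To illustrate the suggestion above, here is a minimal sketch of one way a class list could back an `Exception -> boolean` predicate. The class and method names (`NonRetryablePredicates`, `fromTypes`) are hypothetical and not part of Gobblin; only `java.util.function.Predicate` and `Class.isInstance` from the JDK are relied upon.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper (not a Gobblin class): adapts a list of exception types to a predicate. */
final class NonRetryablePredicates {
  private NonRetryablePredicates() {}

  /** Returns a predicate that is true when the exception is an instance of any listed type. */
  static Predicate<Exception> fromTypes(List<Class<? extends Exception>> types) {
    return e -> types.stream().anyMatch(type -> type.isInstance(e));
  }
}
```

Callers would then depend only on `Predicate<Exception>`, so a different strategy (for example, one matching on message contents) could be swapped in without touching them.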
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -426,7 +426,15 @@ public static long getExecutionIdFromTableName(String tableName) {
protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
- public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> typesList) {
- return typesList.stream().anyMatch(e -> e.isInstance(exception));
+ public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> exceptionsList) {
Review Comment:
this changes the semantics. perhaps that's OK, but a more precise name
along w/ some javadoc might help. e.g. `isThrowableOrCauseInstanceOf` or
`isThrowableInstanceOfRecursive`
also, seems better suited to living within a utils class, rather than here
in KJSM which was probably just the first place we realized we needed such
functionality.
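As a rough sketch of what a recursively checking variant in a utils class might look like: the new method body is not shown in this hunk, so the cause-chain walk below is an assumption about the intended semantics, and `ExceptionUtilsSketch` is a hypothetical class name.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical utils class (not a Gobblin class). */
final class ExceptionUtilsSketch {
  private ExceptionUtilsSketch() {}

  /**
   * Returns true if the throwable, or any throwable in its cause chain,
   * is an instance of one of the given types.
   */
  static boolean isThrowableInstanceOfRecursive(Throwable throwable, List<Class<? extends Exception>> types) {
    // Identity-based set guards against a cyclic cause chain.
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable t = throwable; t != null && seen.add(t); t = t.getCause()) {
      for (Class<? extends Exception> type : types) {
        if (type.isInstance(t)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

The javadoc states explicitly that causes are inspected, which is the semantic difference from the original single-level `isInstance` check.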
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+ dagTask.conclude();
+ dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
Review Comment:
couldn't this live entirely encapsulated within `DagProc::process`? that
would give a normal return, which means the DPE would next call
`DagTask::conclude`
for that approach, initialize the `DagProcFactory` w/ the exception predicate
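A minimal sketch of that encapsulation, under heavy assumptions: `SketchDagProc` is a hypothetical stand-in and does not mirror the real `DagProc` signatures. It only shows the control flow, where a non-retryable exception is absorbed inside `process` so the caller sees a normal return and proceeds to conclude.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for a DagProc; the real Gobblin class has a different shape. */
abstract class SketchDagProc {
  private final Predicate<Exception> isNonRetryable;

  /** The predicate would be supplied by whatever factory constructs the proc. */
  SketchDagProc(Predicate<Exception> isNonRetryable) {
    this.isNonRetryable = isNonRetryable;
  }

  /** Subclass-specific work. */
  protected abstract void act() throws Exception;

  /** Rethrows retryable failures; absorbs non-retryable ones so the caller returns normally. */
  final void process() throws Exception {
    try {
      act();
    } catch (Exception e) {
      if (!isNonRetryable.test(e)) {
        throw e; // transient: let the caller skip conclude so the work is retried
      }
      // Non-transient: log (and mark a metric, in a real impl) here, then return normally.
      System.err.println("Non-retryable exception, not rethrowing: " + e);
    }
  }
}
```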
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java:
##########
@@ -68,7 +68,7 @@ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, DagMana
if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(),
dagAction.getFlowExecutionId(), dagAction.getJobName(), dagAction.getDagActionType())) {
- log.warn("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
+ log.info("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
return false;
Review Comment:
don't we already hold a lease by the time `act` is called? if so, it would
be exceedingly rare (and really no different than any other `DagProc::act`
derived class impl). hence, I'd suggest to remove. but if we really believe
we need this, let's put the impl into the `DagProc` base class
Issue Time Tracking
-------------------
Worklog Id: (was: 928441)
Time Spent: 1.5h (was: 1h 20m)
> redirect kill requests to dag proc engine
> ------------------------------------------
>
> Key: GOBBLIN-2121
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>