[https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928441&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928441]

ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/24 09:43
            Start Date: 02/Aug/24 09:43
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1701582523


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);
+        DagActionStore.LeaseParams leaseObject = new DagActionStore.LeaseParams(killDagAction, false,

Review Comment:
   nit: `leaseParams` rather than `leaseObject`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########


Review Comment:
   Let's keep PRs as small and as cohesive as possible. Next time, I'd strongly encourage you to put this and related modifications into a PR separate from something like the non-transient exceptions work.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception

Review Comment:
   Suggest logging here (though it would be better done within the predicate impl, i.e. the `Exception -> boolean` function).
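A minimal sketch of that suggestion, assuming the engine receives its non-retryable check as a `java.util.function.Predicate<Exception>`. The hypothetical `LoggingNonRetryablePredicate` decorator logs whenever the wrapped predicate classifies an exception as non-retryable, so the catch block in `DagProcEngineThread` would need no log statement of its own. It uses `java.util.logging` only to stay self-contained; the surrounding code logs through its own `log` field.

```java
import java.util.function.Predicate;
import java.util.logging.Logger;

/** Hypothetical decorator: logs at the point where an exception is classified. */
final class LoggingNonRetryablePredicate implements Predicate<Exception> {
  private static final Logger LOG =
      Logger.getLogger(LoggingNonRetryablePredicate.class.getName());

  private final Predicate<Exception> delegate;

  LoggingNonRetryablePredicate(Predicate<Exception> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean test(Exception e) {
    boolean nonRetryable = delegate.test(e);
    if (nonRetryable) {
      // Log once, inside the predicate, rather than in every caller.
      LOG.warning("Exception classified as non-retryable: " + e);
    }
    return nonRetryable;
  }
}
```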



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -122,6 +134,7 @@ static class DagProcEngineThread implements Runnable {
     private final DagProcFactory dagProcFactory;
     private final DagManagementStateStore dagManagementStateStore;
     private final DagProcessingEngineMetrics dagProcEngineMetrics;
+    private final List<Class<? extends Exception>> nonRetryableExceptions;

Review Comment:
   Let's use a predicate as the interface, i.e. a function of `Exception -> boolean`. You could certainly implement one instance of that Strategy interface in terms of a `List<Class<? extends Exception>>`.
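A sketch of that shape, with hypothetical names: `NonRetryableExceptionPredicate` is the Strategy interface (just a `Predicate<Exception>`), and `ClassListNonRetryablePredicate` is one implementation backed by a `List<Class<? extends Exception>>`. It checks only the exception itself, as the original `isThrowableInstanceOf` did; a cause-walking variant could be swapped in without touching the engine.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical Strategy interface: true means "do not retry this dag proc". */
@FunctionalInterface
interface NonRetryableExceptionPredicate extends Predicate<Exception> {}

/** One implementation, backed by a list of exception types (the type or any subclass). */
final class ClassListNonRetryablePredicate implements NonRetryableExceptionPredicate {
  private final List<Class<? extends Exception>> nonRetryableTypes;

  ClassListNonRetryablePredicate(List<Class<? extends Exception>> nonRetryableTypes) {
    // Immutable copy so the configured list cannot change underneath the engine.
    this.nonRetryableTypes = List.copyOf(nonRetryableTypes);
  }

  @Override
  public boolean test(Exception e) {
    // Checks only e itself, mirroring the original isThrowableInstanceOf.
    return nonRetryableTypes.stream().anyMatch(type -> type.isInstance(e));
  }
}
```

Because the interface has a single abstract method, callers and tests can also pass a plain lambda such as `e -> false`.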



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -426,7 +426,15 @@ public static long getExecutionIdFromTableName(String tableName) {
 
   protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
 
-  public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> typesList) {
-    return typesList.stream().anyMatch(e -> e.isInstance(exception));
+  public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> exceptionsList) {

Review Comment:
   This changes the semantics. Perhaps that's OK, but a more precise name along with some javadoc might help, e.g. `isThrowableOrCauseInstanceOf` or `isThrowableInstanceOfRecursive`.
   
   Also, this seems better suited to living within a utils class rather than here in KJSM, which was probably just the first place we realized we needed such functionality.
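One possible shape for that helper, assuming the new semantics are "the throwable or anything in its cause chain matches". `ExceptionUtilsSketch` is a hypothetical utils class, and `isThrowableOrCauseInstanceOf` is one of the names suggested above; the javadoc spells out the semantics. The identity-based visited set guards against cyclic cause chains, which `Throwable.initCause` rejects only for direct self-causation.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical utils class, as an alternative home to KafkaJobStatusMonitor. */
final class ExceptionUtilsSketch {
  private ExceptionUtilsSketch() {}

  /**
   * Returns true if {@code throwable}, or any throwable in its cause chain, is an
   * instance of one of {@code types}. A non-retryable exception wrapped in, for
   * example, a RuntimeException therefore still matches.
   */
  static boolean isThrowableOrCauseInstanceOf(Throwable throwable,
      List<Class<? extends Exception>> types) {
    // Identity set: stop if the cause chain loops back on itself.
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable t = throwable; t != null && seen.add(t); t = t.getCause()) {
      for (Class<? extends Exception> type : types) {
        if (type.isInstance(t)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

If a dependency on it is acceptable, Apache Commons Lang's `ExceptionUtils.indexOfType` offers similar cause-chain matching for a single type.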



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+            dagTask.conclude();
+            dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();

Review Comment:
   Couldn't this live entirely encapsulated within `DagProc::process`? That would give a normal return, which means the DPE would next call `DagTask::conclude`.
   
   For that approach, initialize the `DagProcFactory` with the exception predicate.
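A minimal sketch of that approach using simplified, hypothetical types (`DagProcSketch`, `DagProcFactorySketch` and `ThrowingAction` are not Gobblin classes). The factory hands the predicate to every proc it creates, and `process()` swallows non-retryable failures so that it returns normally, letting the engine proceed to `DagTask::conclude` as it would after success. Retryable failures still propagate, so the existing retry path is unchanged.

```java
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for DagProc. */
abstract class DagProcSketch {
  private final Predicate<Exception> isNonRetryable;

  DagProcSketch(Predicate<Exception> isNonRetryable) {
    this.isNonRetryable = isNonRetryable;
  }

  /** Subclass-specific work; may throw. */
  protected abstract void act() throws Exception;

  /**
   * Returns normally on success and on a non-retryable failure, so the caller
   * concludes the task either way. Retryable failures are rethrown.
   */
  final void process() throws Exception {
    try {
      act();
    } catch (Exception e) {
      if (!isNonRetryable.test(e)) {
        throw e;
      }
      // Non-retryable: logging and a meter such as
      // dagProcessingNonRetryableExceptionMeter.mark() would go here.
    }
  }
}

/** Hypothetical factory that injects the same predicate into every proc it builds. */
final class DagProcFactorySketch {
  @FunctionalInterface
  interface ThrowingAction {
    void run() throws Exception;
  }

  private final Predicate<Exception> isNonRetryable;

  DagProcFactorySketch(Predicate<Exception> isNonRetryable) {
    this.isNonRetryable = isNonRetryable;
  }

  DagProcSketch create(ThrowingAction action) {
    return new DagProcSketch(isNonRetryable) {
      @Override
      protected void act() throws Exception {
        action.run();
      }
    };
  }
}
```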



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java:
##########
@@ -68,7 +68,7 @@ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, DagMana
 
     if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(),
         dagAction.getFlowExecutionId(), dagAction.getJobName(), dagAction.getDagActionType())) {
-      log.warn("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
+      log.info("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
       return false;

Review Comment:
   Don't we already hold a lease by the time `act` is called? If so, this would be exceedingly rare (and really no different than for any other `DagProc::act` derived-class impl), so I'd suggest removing it. But if we really believe we need this, let's put the impl into the `DagProc` base class.
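If the check does stay, a template method in the base class is one way to share it. The sketch assumes hypothetical, simplified signatures (`LeasedDagProcSketch`, with a `Predicate<A>` standing in for a state-store lookup such as `existsJobDagAction`); the real `DagProc` API differs. Subclasses implement only their own `act` and inherit the guard.

```java
import java.util.function.Predicate;

/** Hypothetical, simplified DagProc base class with one shared presence check. */
abstract class LeasedDagProcSketch<A> {
  // Stands in for a state-store lookup of whether the dag action still exists.
  private final Predicate<A> stillPresent;

  LeasedDagProcSketch(Predicate<A> stillPresent) {
    this.stillPresent = stillPresent;
  }

  /** Subclass-specific work; only invoked while the action still exists. */
  protected abstract void act(A action);

  /** Template method: skips act() and returns false if the action is already gone. */
  final boolean process(A action) {
    if (!stillPresent.test(action)) {
      // Expected to be rare once the lease is held; nothing left to do.
      return false;
    }
    act(action);
    return true;
  }
}
```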





Issue Time Tracking
-------------------

    Worklog Id:     (was: 928441)
    Time Spent: 1.5h  (was: 1h 20m)

>  redirect kill requests to dag proc engine
> ------------------------------------------
>
>                 Key: GOBBLIN-2121
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
