phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343113772


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask {
+
+  protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus;
+
+  /**
+   * Currently, conclusion of {@link DagTask} marks and records a successful release of lease.
+   * It is invoked after {@link DagProc#process(DagManagementStateStore)} is completed successfully.
+   * @param multiActiveLeaseArbiter
+   * @throws IOException
+   */
+  public void conclude(MultiActiveLeaseArbiter multiActiveLeaseArbiter) throws IOException {
+    multiActiveLeaseArbiter.recordLeaseSuccess(leaseObtainedStatusStatus);

Review Comment:
   doesn't this return a boolean (indicating whether it succeeded)?  if so, be sure not to 
swallow the return value
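
A minimal sketch of what propagating the result could look like, assuming `recordLeaseSuccess` does return a `boolean` (the comment above only asks whether it does). `LeaseArbiterSketch` and `DagTaskSketch` are hypothetical stand-ins for illustration, not the actual Gobblin types:

```java
import java.io.IOException;

// Hypothetical stand-in for MultiActiveLeaseArbiter; assumes recordLeaseSuccess returns a boolean.
interface LeaseArbiterSketch {
  boolean recordLeaseSuccess(String leaseStatus) throws IOException;
}

// Hypothetical, simplified DagTask that propagates the arbiter's result instead of dropping it.
class DagTaskSketch {
  private final String leaseStatus;

  DagTaskSketch(String leaseStatus) {
    this.leaseStatus = leaseStatus;
  }

  /** @return true only if the arbiter confirmed that the lease was recorded as complete */
  boolean conclude(LeaseArbiterSketch arbiter) throws IOException {
    return arbiter.recordLeaseSuccess(this.leaseStatus);
  }
}
```

With this shape the caller decides what a `false` result means (log, retry, or fail) rather than never seeing it.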



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here

Review Comment:
   is the trailing comment (`// may be pass state store too here`) obsolete?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again.

Review Comment:
   I agree with the advice!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow request");
           dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
+
+          if(isMultiLeaderDagManagerEnabled) {
+            DagActionStore.DagAction killAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL);

Review Comment:
   is `killFlow` called in any other place?  if we don't actually have a 
`DagAction` already on hand in any of the callsites, not sure it makes sense to 
use that in the API.  perhaps it should be:
   ```
   killFlow(flowGroup, flowName, flowExecutionId, produceTimestamp)
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask {
+
+  protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus;

Review Comment:
   why not `private final`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;

Review Comment:
   not 100% sure if this is in line w/ the coding standard... but I always put 
`public` before `protected`, whether or not `abstract`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    // instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
+//    this.dagTaskStream = new DagTaskStream();

Review Comment:
   so this isn't initialized anywhere?  won't we get an NPE when using it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.

Review Comment:
   suggestion: `Encapsulates task-specific handling appropriate to a given {@link DagTask} derived type.`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.

Review Comment:
   "job" already has other connotations as what we run on executors.  maybe 
"defines a singular task in the lifecycle of a managed Dag"?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);
+    }
+  }
+
+  protected final S initializeWithRetries(DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws IOException {
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      try {
+        return this.initialize(dagManagementStateStore);
+      } catch (MaybeRetryableException e) {
+        if (retryCount < maxRetryCount - 1) { // Don't wait before the last retry
+          waitBeforeRetry(delayRetryMillis);
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag");
+  }

Review Comment:
   1. try reworking more generically as:
   ```
   protected final <T> T execWithRetries(Supplier<T> exec, int maxRetryCount, long delayRetryMillis) {
   ...
   }
   ```
   and call like:
   ```
   R result = this.execWithRetries(() -> this.act(state, dagManagementStateStore), maxRetryCount, delayRetryMillis)
   ```
   
   2. probably don't wrap exceptions, given that's what `process` already plans 
to do... unless you're merely trying to tunnel them past the `Supplier`'s 
exception signature.  if so, then create a `CheckedExceptionSupplier` in the 
vein of - 
https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L31
   
   3. the final missing piece is to remain aware of the timeframe for the 
task's lease expiration.  when it's close (or already passed) do not try (and 
certainly don't retry).  instead throw a particular exception to mark the 
situation
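
The three points above could be combined roughly as follows. This is only a sketch under stated assumptions: `CheckedExceptionSupplier`, `RetryableSketchException` (standing in for `MaybeRetryableException`), `LeaseExpiringException`, and the `leaseDeadlineMillis` parameter are all hypothetical names introduced for illustration, and treating "close to expiration" as "within one retry delay" is an arbitrary choice, not something the PR defines:

```java
// Hypothetical no-arg analogue of Gobblin's CheckedExceptionFunction: a supplier that may throw.
@FunctionalInterface
interface CheckedExceptionSupplier<T> {
  T get() throws Exception;
}

// Hypothetical stand-in for MaybeRetryableException.
class RetryableSketchException extends Exception {
  RetryableSketchException(String message) {
    super(message);
  }
}

// Hypothetical marker exception: the lease is too close to (or past) expiration to attempt work.
class LeaseExpiringException extends RuntimeException {
  LeaseExpiringException(String message) {
    super(message);
  }
}

final class RetrySketch {
  private RetrySketch() {}

  /**
   * Runs exec up to maxRetryCount times, retrying only on RetryableSketchException.
   * Other exceptions propagate unwrapped, leaving any wrapping to the caller.
   * leaseDeadlineMillis is an assumed epoch-millis timestamp of the lease expiration.
   */
  static <T> T execWithRetries(CheckedExceptionSupplier<T> exec, int maxRetryCount,
      long delayRetryMillis, long leaseDeadlineMillis) throws Exception {
    RetryableSketchException lastFailure = null;
    for (int attempt = 0; attempt < maxRetryCount; attempt++) {
      // Do not try (or retry) when the lease has expired or would expire within one retry delay.
      if (System.currentTimeMillis() + delayRetryMillis >= leaseDeadlineMillis) {
        throw new LeaseExpiringException("Lease expired or about to expire; giving up");
      }
      try {
        return exec.get();
      } catch (RetryableSketchException e) {
        lastFailure = e;
        if (attempt < maxRetryCount - 1) { // don't wait after the final attempt
          Thread.sleep(delayRetryMillis);
        }
      }
    }
    throw new IllegalStateException("Max retry attempts reached", lastFailure);
  }
}
```

A call site would then look like `RetrySketch.execWithRetries(() -> this.act(state, dagManagementStateStore), maxRetryCount, delayRetryMillis, leaseDeadlineMillis)`, with `process` remaining the single place that wraps whatever escapes.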



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @return optionally leaseObtainedStatus if acquired; otherwise schedule reminder to check back again.
+   * @throws IOException
+   */
+  public MultiActiveLeaseArbiter.LeaseAttemptStatus getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException {
+
+    if (multiActiveLeaseArbiter.isPresent()) {
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        this.leaseObtainedCount.inc();
+        log.info("Successfully acquired lease for dag action: {}", flowAction);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+        this.leasedToAnotherStatusCount.inc();
+        scheduleReminderForEvent(jobProps,
+            (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, eventTimeMillis);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+        this.noLongerLeasingStatusCount.inc();
+        log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(),
+            eventTimeMillis);
+      }
+      return leaseAttemptStatus;
+    } else {
+      throw new RuntimeException(String.format("Multi-active scheduler is not enabled so trigger event should not be "
+          + "handled with this method."));
+    }
+  }
+
+  public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+    return this.multiActiveLeaseArbiter.get();
+  }

Review Comment:
   no need for this, if you indeed modify the existing `handleTriggerEvent` on 
line 109



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+  private KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }

Review Comment:
   tip: mark this `final` and implement via `@RequiredArgsConstructor`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -80,6 +83,8 @@ public String load(String key) throws Exception {
   protected Orchestrator orchestrator;
   protected boolean isMultiActiveSchedulerEnabled;
   protected FlowCatalog flowCatalog;
+  private DagTaskStream dagTaskStream;

Review Comment:
   this change monitor shouldn't use a task stream as such (that's exclusively 
for the `DagProcessingEngine`.)  instead let it write to the `DagManagement` API



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+  private KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  @Override
+  protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();

Review Comment:
   nit: since it's a `String`, not a dag, I'd name `dagId` or `dagIdToCancel`
   
   anyway, for such a short impl, (unless you're planning to log it), I'd just 
make a one-liner w/o an intermediate var



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+  private KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  @Override
+  protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();
+    return dagManagementStateStore.getJobs(dagToCancel);
+  }
+
+  /**
+   * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed
+   * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor
+   * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios,
+   * it will make sense to update the method signature with its appropriate type.
+   * @param dagNodesToCancel
+   * @return
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws IOException
+   */
+  @Override
+  protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws Exception {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();
+
+    log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      killDagNode(dagNodeToCancel);
+    }
+    dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+    dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request");
+    dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL);
+    return dagManagementStateStore.getDag(dagToCancel);
+
+  }
+
+  @Override
+  protected void sendNotification(Dag<JobExecutionPlan> dag, EventSubmitter eventSubmitter) throws MaybeRetryableException {
+    for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) {

Review Comment:
   please update your IDE settings to catch the missing space between `for` and `(`... this happens all over this PR



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+  private KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  @Override
+  protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();
+    return dagManagementStateStore.getJobs(dagToCancel);
+  }
+
+  /**
+   * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed
+   * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor
+   * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios,
+   * it will make sense to update the method signature with its appropriate type.
+   * @param dagNodesToCancel
+   * @return
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws IOException
+   */
+  @Override
+  protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws Exception {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();
+
+    log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      killDagNode(dagNodeToCancel);
+    }
+    dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+    dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request");
+    dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL);

Review Comment:
   nits:
   1. move `dagToCancel` init after the `for` loop (BTW, it's a dag ID, not a DAG)
   2. call `dagManagementStateStore.getDag(dagToCancel)` only once
   3. decide whether there's still a need for an intermediate `dagToCancel` var
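
   A rough sketch of what these three nits could look like together. `Dag` and `StateStore` below are hypothetical stand-ins with only the members needed for illustration, not the real Gobblin `Dag` or `DagManagementStateStore`; the string node IDs, the `killed` list, and `removeKillAction` are likewise assumptions made so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KillDagSketch {

  /** Hypothetical stand-in for a Dag; only the two setters used by act() are modeled. */
  public static class Dag {
    public String flowEvent;
    public String message;

    public void setFlowEvent(String flowEvent) { this.flowEvent = flowEvent; }
    public void setMessage(String message) { this.message = message; }
  }

  /** Hypothetical stand-in for the state store; it counts getDag() lookups. */
  public static class StateStore {
    public final Map<String, Dag> dags = new HashMap<>();
    public final List<String> removedKillActions = new ArrayList<>();
    public int getDagCalls = 0;

    public Dag getDag(String dagId) {
      getDagCalls++;
      return dags.get(dagId);
    }

    public void removeKillAction(String dagId) {
      removedKillActions.add(dagId);
    }
  }

  /** Cancels every node first, then resolves the dag exactly once and reuses the reference. */
  public static Dag act(List<String> dagNodesToCancel, String dagId, StateStore store, List<String> killed) {
    for (String dagNode : dagNodesToCancel) {
      killed.add(dagNode); // placeholder for killDagNode(dagNode)
    }
    Dag dag = store.getDag(dagId); // single lookup, placed after the loop
    dag.setFlowEvent("FLOW_CANCELLED");
    dag.setMessage("Flow killed by request");
    store.removeKillAction(dagId);
    return dag;
  }

  public static void main(String[] args) {
    StateStore store = new StateStore();
    store.dags.put("dag-1", new Dag());
    Dag dag = act(Arrays.asList("job-a", "job-b"), "dag-1", store, new ArrayList<>());
    System.out.println(dag.message + " (getDag calls: " + store.getDagCalls + ")");
  }
}
```

   Holding the looked-up dag in a local makes the ID-vs-DAG distinction explicit and leaves only one place where the ID string is needed, which is what nit 3 is asking about.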



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+  private KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  @Override
+  protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    String dagToCancel = this.killDagTask.getKillDagId().toString();
+    return dagManagementStateStore.getJobs(dagToCancel);
+  }
+
+  /**
+   * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed
+   * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor
+   * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios,
+   * it will make sense to update the method signature with its appropriate type.

Review Comment:
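   out of date: this still says the return type is kept as `Object`, but `act` now returns `Dag<JobExecutionPlan>`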



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

