phet commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1542171456


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -265,7 +266,9 @@ public static GobblinServiceManager 
create(GobblinServiceConfiguration serviceCo
    */
   public static <T> T getClass(Class<T> classToGet) {
     if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
-      throw new RuntimeException("getClass called without calling create 
method to initialize GobblinServiceGuiceModule");
+      throw new RuntimeException(String.format("getClass called to obtain %s 
without calling create method to "
+          + "initialize GobblinServiceGuiceModule. Stacktrace of current 
thread %s", classToGet,
+          Arrays.toString(Thread.currentThread().getStackTrace()).replace(", 
", "\n  at ")));

Review Comment:
   sorry if my comment was confusing, but the `RuntimeException` should 
generate a stacktrace already



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -167,7 +158,7 @@ private DagTask createDagTask(DagActionStore.DagAction 
dagAction, MultiActiveLea
 
     switch (dagActionType) {
       case LAUNCH:
-        return new LaunchDagTask(dagAction, leaseObtainedStatus);
+        return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());

Review Comment:
   why `Optional` - when would the `DMTSImpl` operate w/o a `DagActionStore`?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiterFactory.java:
##########
@@ -27,28 +27,32 @@
 
 
 /**
- * An abstract base class for creating {@link MultiActiveLeaseArbiter} 
factories that use a specific configuration key.
+ * An abstract base class for {@link MultiActiveLeaseArbiter} factories that 
use a specific configuration key.
  * Subclasses must provide a key to use in the constructor.
  */
 @Slf4j
 public abstract class MultiActiveLeaseArbiterFactory implements 
Provider<MultiActiveLeaseArbiter> {
     private final Config config;
-    private final String key;
+    private final String configPrefix;
 
-    public MultiActiveLeaseArbiterFactory(Config config, String key) {
+    public MultiActiveLeaseArbiterFactory(Config config, String configPrefix) {
       this.config = Objects.requireNonNull(config);
-      this.key = Objects.requireNonNull(key);
+      this.configPrefix = Objects.requireNonNull(configPrefix);
+      if (!this.config.hasPath(configPrefix)) {
+        throw new RuntimeException(String.format("Unable to initialize 
multiActiveLeaseArbiter due to missing "
+            + "configurations that should be prefixed by %s.", configPrefix));
+      }
     }
 
     @Override
     public MultiActiveLeaseArbiter get() {
       try {
-        Config leaseArbiterConfig = this.config.getConfig(key);
+        Config leaseArbiterConfig = this.config.getConfig(configPrefix);

Review Comment:
   I don't see you using `this.config` anywhere, so perhaps 
`leaseArbiterConfig` is what you should store in the ctor.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -80,6 +80,14 @@ public DagAction updateFlowExecutionId(long eventTimeMillis) 
{
    */
   boolean exists(String flowGroup, String flowName, String flowExecutionId, 
DagActionType dagActionType) throws IOException, SQLException;
 
+  /**
+   * check if the dag action exists in {@link DagActionStore} using {@link 
DagAction} to identify dag and specific
+   * action value
+   * @throws IOException
+   * @return true if we successfully delete one record, return false if the 
record does not exist
+   */
+  boolean exists(DagAction dagAction) throws IOException, SQLException;
+

Review Comment:
   I don't believe there's a legit need for this method



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -41,25 +42,33 @@
 public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
   private final MultiActiveLeaseArbiter.LeaseObtainedStatus 
leaseObtainedStatus;
+  private final DagActionStore dagActionStore;
   @Getter protected final DagManager.DagId dagId;
 
-  public DagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+  public DagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagActionStore = dagActionStore;
     this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
   }
 
   public abstract <T> T host(DagTaskVisitor<T> visitor);
 
   /**
-   * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be 
done in this method.
+   * Any cleanup work, including removing the dagAction from the 
dagActionStore and completing the lease acquired to
+   * work on this task, is done in this method.
    * Returns true if concluding dag task finished successfully otherwise false.
    */
   // todo call it from the right place
   public boolean conclude() {
     try {
+      // If dagAction is not present in store then it has been deleted by 
another participant, so we can skip deletion
+      if (this.dagActionStore.exists(this.dagAction)) {

Review Comment:
   agreed: there's still a check-then-act race condition, so why not just make 
`deleteDagAction` idempotent, and avoid the pre-check?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -146,18 +141,14 @@ public DagTask next() {
   private MultiActiveLeaseArbiter.LeaseAttemptStatus 
retrieveLeaseStatus(DagActionStore.DagAction dagAction)
       throws IOException, SchedulerException {
     MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
-    if (!this.isMultiActiveExecutionEnabled) {
-      leaseAttemptStatus = new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, 
System.currentTimeMillis(), Long.MAX_VALUE, null);
-    } else {
-      // TODO: need to handle reminder events and flag them
-      leaseAttemptStatus = this.dagActionExecutionLeaseArbiter
-          .tryAcquireLease(dagAction, System.currentTimeMillis(), false, 
false);
-          /* Schedule a reminder for the event unless the lease has been 
completed to safeguard against the case where even
-          we, when we might become the lease owner still fail to complete 
processing
-          */
-      if (!(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus)) {
-        scheduleReminderForEvent(leaseAttemptStatus);
-      }
+    // TODO: need to handle reminder events and flag them
+    leaseAttemptStatus = this.dagActionExecutionLeaseArbiter
+        .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);

Review Comment:
   `DMTSImpl` could itself take in a `Supplier<Long> currentTimeMillisClock`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to