[ https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=932527&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-932527 ]

ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Aug/24 23:04
            Start Date: 29/Aug/24 23:04
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1737127941


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -168,21 +160,6 @@ protected void shutDown() throws Exception {
     this.listeners.close();
   }
 
-  /***************************************************
-   /* Catalog listeners                              *
-   /**************************************************/
-
-  protected void notifyAllListeners() {
-    try {
-      Iterator<URI> uriIterator = getSpecURIs();
-      while (uriIterator.hasNext()) {
-        this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
-      }
-    } catch (IOException e) {
-      log.error("Cannot retrieve specs from catalog:", e);
-    }
-  }
-

Review Comment:
   wondering about this...
   there's still an `addListener` method. is there no longer any need to `notifyListeners`?



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -205,10 +163,10 @@ public class ServiceConfigKeys {
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
 
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
-  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
-  public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+  public static final String JOB_START_SLA_TIME = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  public static final String JOB_START_SLA_UNITS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  public static final long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);

Review Comment:
   three thoughts here:
   
   1. previously the property included the `.dagManager` segment:
   ```
     // Default job start SLA time if configured, measured in minutes. Default is 10 minutes
     // todo - rename "sla" -> "deadline", and move them to DagProcUtils
     public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   ```
   should we continue to accept that legacy prop name so existing flow defs remain b/w compat?
   
   2. as we introduce a new prop name, let's take the opportunity to align naming and semantics by calling it `start.deadline`, rather than SLA
   
   3. given that `.dagProcessingEngine` is as much impl. detail as `.dagManager`, it may not belong in the customer-facing name.

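A minimal sketch of honoring the legacy name, assuming plain `java.util.Properties` in place of the Typesafe `Config` the service actually reads: look up the new key first, fall back to the legacy `.dagManager` key with a deprecation warning, then use the default. `CompatConfig` and the key strings are hypothetical.

```java
import java.util.Properties;
import java.util.logging.Logger;

/** Hypothetical helper: reads a value under its new key while still honoring a deprecated legacy key. */
final class CompatConfig {
  private static final Logger LOG = Logger.getLogger(CompatConfig.class.getName());

  private CompatConfig() {}

  /** Resolution order: newKey, then legacyKey (logging a deprecation warning), then defaultValue. */
  static long getLong(Properties props, String newKey, String legacyKey, long defaultValue) {
    String value = props.getProperty(newKey);
    if (value == null) {
      value = props.getProperty(legacyKey);
      if (value != null) {
        // Keep existing flow defs working, but nudge operators to migrate
        LOG.warning("Config key '" + legacyKey + "' is deprecated; use '" + newKey + "' instead");
      }
    }
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

Centralizing the fallback in one helper means the legacy name can later be dropped in a single place.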


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -74,7 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   // to these data structures.
   @Getter
   @Setter
-  protected final Map<URI, TopologySpec> topologySpecMap;
+  protected Map<URI, TopologySpec> topologySpecMap;

Review Comment:
   I didn't see why this wouldn't still be `final`. is a derived class assigning to it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -55,7 +55,7 @@
 
 @Slf4j
 public class DagManagerMetrics {

Review Comment:
   rename to `DagMetrics` or leave as-is?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -299,4 +303,43 @@ public int hashCode() {
   public String toString() {
     return this.getNodes().stream().map(node -> node.getValue().toString()).collect(Collectors.toList()).toString();
   }
+
+  public enum FlowState {
+    FAILED(-1),
+    RUNNING(0),
+    SUCCESSFUL(1);
+
+    public final int value;
+
+    FlowState(int value) {
+      this.value = value;
+    }
+  }
+
+  @Getter
+  @EqualsAndHashCode
+  public static class DagId {
+    String flowGroup;
+    String flowName;
+    long flowExecutionId;
+
+    public DagId(String flowGroup, String flowName, long flowExecutionId) {
+      this.flowGroup = flowGroup;
+      this.flowName = flowName;
+      this.flowExecutionId = flowExecutionId;
+    }

Review Comment:
   `@AllArgsConstructor`?  actually if you can make the three fields `final`, `@Data` should take care of many of these things for you
   
   (not clear if that's possible, since the fields aren't `private`... even if not `@Data` and/or `final`, can we make them `private`?)

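For reference, roughly what `private final` fields plus Lombok's `@Data` would give you, hand-written in plain Java: a constructor over the final fields, getters, value-based `equals`/`hashCode`, and a `toString`. This approximates Lombok's output rather than reproducing it exactly.

```java
import java.util.Objects;

/** Plain-Java sketch of an immutable DagId: private final fields, full constructor, getters, equals/hashCode. */
final class DagId {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  DagId(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  String getFlowGroup() { return flowGroup; }
  String getFlowName() { return flowName; }
  long getFlowExecutionId() { return flowExecutionId; }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof DagId)) {
      return false;
    }
    DagId other = (DagId) o;
    return flowExecutionId == other.flowExecutionId
        && Objects.equals(flowGroup, other.flowGroup)
        && Objects.equals(flowName, other.flowName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }

  @Override
  public String toString() {
    // Mirrors the "ClassName(field=value, ...)" style Lombok uses
    return "DagId(flowGroup=" + flowGroup + ", flowName=" + flowName + ", flowExecutionId=" + flowExecutionId + ")";
  }
}
```

With Lombok, `@Value` additionally makes the fields `private final` by default, which would also settle the visibility question.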


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -37,9 +38,9 @@
  */
 @Slf4j
 abstract public class AbstractUserQuotaManager implements UserQuotaManager {
-  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
-  public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
-  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String PER_USER_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "defaultJobQuota";

Review Comment:
   again, should we honor the old prop names for b/w compat?
   
   also, when we devise a new name to deprecate the old one, let's avoid embedding impl-specific identifiers in what should otherwise be a user-level config option w/ clear and steady semantics



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -115,6 +109,31 @@ protected void shutDown()
     this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
   }
 
+  /**
+   * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
+   * <ul>
+   *   <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+   *   <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
+   *   of the job are successful.</li>
+   * </ul>
+   */
+  public enum FailureOption {
+    FINISH_RUNNING("FINISH_RUNNING"),
+    CANCEL("CANCEL"),
+    FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+    private final String failureOption;
+
+    FailureOption(final String failureOption) {
+      this.failureOption = failureOption;
+    }
+
+    @Override
+    public String toString() {
+      return this.failureOption;
+    }
+  }

Review Comment:
   somehow I thought that, with:
   ```
   public enum FailureOption {
     FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;
   }
   ```
   the `.toString()` would automatically print the enum's name, no?

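That matches Java's behavior: `Enum.toString()` returns `name()` unless overridden, and `valueOf(String)` does the reverse lookup, so the backing field and the override add nothing here. A sketch of the simplified enum:

```java
/** Simplified enum: the inherited toString() returns each constant's declared name. */
enum FailureOption {
  FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE
}
```

The main reason to keep a custom field and override would be a display string that differs from the constant name, which isn't the case here.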


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
   Map<URI, TopologySpec> topologySpecMap;
   private final Config config;
   public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
-  public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  public static final String DAG_STATESTORE_CLASS_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "dagStateStoreClass";

Review Comment:
   will this cause all existing configs to be silently ignored?
   
   (presuming there's a default... if not, I expect it would instead likely fail fast)

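If the rename ships without a fallback and a default exists, one way to avoid silent misconfiguration is to scan at startup for keys still under the retired prefix and fail fast. A sketch with `Properties` standing in for `Config` and a hypothetical `LegacyKeyCheck` helper; the prefix (e.g. whatever `DagManager.DAG_MANAGER_PREFIX` resolved to) is supplied by the caller.

```java
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Hypothetical startup guard against config keys that are no longer read. */
final class LegacyKeyCheck {
  private LegacyKeyCheck() {}

  /** Returns, sorted, every configured key that still uses the retired prefix. */
  static List<String> findLegacyKeys(Properties props, String legacyPrefix) {
    return props.stringPropertyNames().stream()
        .filter(key -> key.startsWith(legacyPrefix))
        .sorted()
        .collect(Collectors.toList());
  }

  /** Throws instead of letting settings under the retired prefix be silently ignored. */
  static void requireNoLegacyKeys(Properties props, String legacyPrefix) {
    List<String> stale = findLegacyKeys(props, legacyPrefix);
    if (!stale.isEmpty()) {
      throw new IllegalStateException("Config keys no longer honored; please rename: " + stale);
    }
  }
}
```

Logging a warning instead of throwing is the gentler variant if a hard failure on deploy is too disruptive.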


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -57,31 +57,25 @@
 @Singleton
 public class DagProcessingEngine extends AbstractIdleService {
 
-  @Getter private final Optional<DagTaskStream> dagTaskStream;
-  @Getter Optional<DagManagementStateStore> dagManagementStateStore;
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
   private final Config config;
-  private final Optional<DagProcFactory> dagProcFactory;
+  private final DagProcFactory dagProcFactory;
   private ScheduledExecutorService scheduledExecutorPool;
   private final DagProcessingEngineMetrics dagProcEngineMetrics;
   private static final Integer TERMINATION_TIMEOUT = 30;
   public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
   @Getter static long defaultJobStartSlaTimeMillis;
+  public static final String DEFAULT_FLOW_FAILURE_OPTION = DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name();

Review Comment:
   we don't need to use `DagProcessingEngine.` qualifier inside that same class, do we?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,8 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
   }
 
   public void cleanup() {
-    // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
-    if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
+    if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) {
       // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
       // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
       RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());

Review Comment:
   this code may need reworking (since aspects recorded in the comments seem to be changing), but at the least, update the remaining comment and also consider whether to rename `getMetricsFilterForDagManager()`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -27,17 +27,15 @@
 
 
 /**
- * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. In case of a leadership
- * change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager}
- * loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution.
+ * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s.
  */
 @Alpha

Review Comment:
   shall we remove?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -84,8 +81,4 @@ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException
    */
   @Deprecated
   Set<String> getDagIds() throws IOException;
-
-  default boolean existsDag(DagManager.DagId dagId) throws IOException {
-    throw new UnsupportedOperationException("containsDag not implemented");
-  }

Review Comment:
   interesting to see this go away...  is the thinking that one would merely `getDag` rather than check for existence?

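If an existence check is still wanted, it could be a default method derived from `getDag` instead of a per-implementation method that throws `UnsupportedOperationException`. A sketch with simplified, hypothetical types (`String` ids and `Object` dags stand in for `DagId` and `Dag<JobExecutionPlan>`), assuming `getDag` returns `null` when nothing is stored:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for DagStateStore. */
interface SimpleDagStore {
  /** Returns the stored dag, or null if none exists under that id (an assumption of this sketch). */
  Object getDag(String dagId) throws IOException;

  /** Existence check derived from getDag, so implementations need not override it. */
  default boolean existsDag(String dagId) throws IOException {
    return getDag(dagId) != null;
  }
}

/** Trivial in-memory implementation, for illustration only. */
final class InMemoryDagStore implements SimpleDagStore {
  private final Map<String, Object> dags = new HashMap<>();

  void put(String dagId, Object dag) {
    dags.put(dagId, dag);
  }

  @Override
  public Object getDag(String dagId) {
    return dags.get(dagId);
  }
}
```

The trade-off: this loads the whole DAG just to test presence, so a MySQL-backed store may prefer a dedicated existence query (e.g. `SELECT 1 ... LIMIT 1`).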


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java:
##########
@@ -136,11 +135,6 @@ public void cleanUp(String dagId)
     this.totalDagCount.dec();

Review Comment:
   is this in-memory metric still valid in a multi-leader ensemble?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -26,9 +26,9 @@
 
 /**
  * Manages the statically configured user quotas for the proxy user in user.to.proxy configuration, the API requester(s)
- * and the flow group.
- * It is used by the {@link DagManager} to ensure that the number of currently running jobs do not exceed the quota, if
- * the quota is exceeded, the execution will fail without running on the underlying executor.
+ * and the flow group. It is used by the {@link org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure

Review Comment:
   specifically, is it the `LaunchDagProc`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -63,7 +63,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
     }
 
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeToCheckDeadline.getLeft().get();
-    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    long timeOutForJobStart = DagUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());

Review Comment:
   let's use the class-rename as an opportunity to update the method too - `getJobStartDeadline`

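If anything outside this repo calls the old name, one low-risk way to rename is to add the new method and keep the old one as a `@Deprecated` delegate for a release. A sketch using a hypothetical `DeadlineUtils` with a simplified signature (a nullable configured value in millis plus a default) rather than the real `DagNode`-based one:

```java
/** Hypothetical stand-in for DagUtils, showing a rename with a deprecated delegating alias. */
final class DeadlineUtils {
  private DeadlineUtils() {}

  /** Returns the configured job-start deadline in millis, or the default when none is configured. */
  static long getJobStartDeadline(Long configuredMillis, long defaultMillis) {
    return configuredMillis != null ? configuredMillis : defaultMillis;
  }

  /** @deprecated renamed to {@link #getJobStartDeadline(Long, long)}; kept only for existing callers. */
  @Deprecated
  static long getJobStartSla(Long configuredMillis, long defaultMillis) {
    return getJobStartDeadline(configuredMillis, defaultMillis);
  }
}
```

If there are no external callers, a straight rename is simpler; the same pattern would apply to `getFlowSLA` -> `getFlowFinishDeadline`.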


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########


Review Comment:
   I do believe they should be separate... and since they're such closely named classes now, consider augmenting the javadoc to say that these utils contain functionality for *processing* a DAG, whereas `DagUtils` is for *interrogating* a DAG.  the javadoc for each might reference the other (circularity should be ok for javadoc)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart, the LaunchDagProc has to perform validations before executing any launch actions the previous LaunchDagProc

Review Comment:
   the LaunchDP was unable to complete?  more like, "that are not yet fully processed".



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -48,8 +48,8 @@ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforce
   protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
       DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
-    long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
-    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+    long flowFinishDeadline = DagUtils.getFlowSLA(dagNode);

Review Comment:
   let's use the class-rename as an opportunity to update the method too - `getFlowFinishDeadline`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -67,64 +61,51 @@
  */
 @Slf4j
 public class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> {

Review Comment:
   I realize we no longer have a need for both `DagActionStoreCM` and also `DagManagementDagActionStoreCM`, but it's much clearer to read AND less error prone to keep the latter class as-is, rather than porting over what's different about it here into what had been its base class.
   
   I'm not against eventually consolidating those into something like this, but doing so in a PR w/ >110 files carries unnecessary risk that we may have introduced a small error in the porting.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -204,7 +204,9 @@ protected void createMetrics() {
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
     this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
     this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
-    this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
-    this.getMetricContext().register(this.produceToConsumeDelayMillis);
+   // Reports delay from all partitions in one gauge
+   ContextAwareGauge<Long> produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(

Review Comment:
   all the other metrics have an instance member.  why use a local for this one?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on

Review Comment:
   extra "out" in "process out dag"... but this could simply be:
   > creates {@link DagAction}s, which the {@link DPE} is responsible for processing.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -45,8 +48,8 @@ public void setUp() {
   // Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter
   @Test
   public void testExceedsQuotaOnStartup() throws Exception {
-    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user", ConfigFactory.empty());
-    // Ensure that the current attempt is 1, normally done by DagManager
+    List<Dag<JobExecutionPlan>> dags = DagTestUtils.buildDagList(2, "user", ConfigFactory.empty());
+    // Ensure that the current attempt is 1, normally done by DagProcs
+    // Ensure that the current attempt is 1, normally done by DagProcs

Review Comment:
   which one, launch?  reeval?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -79,50 +78,47 @@ public class GitConfigMonitorTest {
   private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2);
   private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3);
 
-  private RefSpec masterRefSpec = new RefSpec("master");
+  private final RefSpec masterRefSpec = new RefSpec("master");
   private FlowCatalog flowCatalog;
-  private SpecCatalogListener mockListener;
-  private Config config;
   private GitConfigMonitor gitConfigMonitor;
 
 
   @BeforeClass
   public void setup() throws Exception {
-    cleanUpDir(TEST_DIR);
+    cleanUpDir();
 
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
-    this.remoteRepo = fileKey.open(false);
-    this.remoteRepo.create(true);
+    Repository remoteRepo = fileKey.open(false);
+    remoteRepo.create(true);
 
-    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+    this.gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
 
     // push an empty commit as a base for detecting changes
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
-    this.config = ConfigBuilder.create()
+    Config config = ConfigBuilder.create()
         .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
-            this.remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
-        .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
-        .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
-        .build();
+            remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR,
+            TEST_DIR + "/jobConfig").addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+        .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5).build();
 
     this.flowCatalog = new FlowCatalog(config);
 
-    this.mockListener = mock(SpecCatalogListener.class);
-    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
-    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse<>(""));
 
     this.flowCatalog.addListener(mockListener);
     this.flowCatalog.startAsync().awaitRunning();
-    this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
+    this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
     this.gitConfigMonitor.setActive(true);
   }
 
-  private void cleanUpDir(String dir) {
-    File specStoreDir = new File(dir);
+  private void cleanUpDir() {
+    File specStoreDir = new File(GitConfigMonitorTest.TEST_DIR);

Review Comment:
   NBD, but I personally favor hard-coding the argument at the call site, rather than dropping the param to hard-code the value in the impl.  the former is not only clearer to read, but more future-proof

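A sketch of keeping the parameter so the call site reads `cleanUpDir(TEST_DIR)`. The helper's body is truncated in the diff, so this `java.nio.file`-based recursive delete in a hypothetical `TestDirs` class is an assumption, not the test's actual implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical test utility; the directory to clean is always passed in by the caller. */
final class TestDirs {
  private TestDirs() {}

  /** Recursively deletes {@code dir} if it exists; a missing directory is a no-op. */
  static void cleanUpDir(String dir) {
    Path root = Paths.get(dir);
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order visits children before their parent directories
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```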


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -78,32 +76,19 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
   private final Config config;
   @Getter private final EventSubmitter eventSubmitter;
   protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
-  protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
-  private final boolean isMultiActiveExecutionEnabled;
+  protected DagActionReminderScheduler dagActionReminderScheduler;
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = new LinkedBlockingQueue<>();
   private final DagManagementStateStore dagManagementStateStore;
   private final DagProcessingEngineMetrics dagProcEngineMetrics;
 
   @Inject
-  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore,
-      @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,

Review Comment:
   don't we need this name for init/config-time discernment between two different versions of the MALA?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java:
##########
@@ -81,7 +83,7 @@ public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) th
           addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
           addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
       if (i > 0) {
-        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0"));

Review Comment:
   semantics have changed here.  if we're confident we always want "job0", should we remove the `if (i > 0)` (or change it to `if (i == 1)`)?

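To make the semantic change concrete: the old line builds a linear chain (`job1` depends on `job0`, `job2` on `job1`, and so on), while the new one fans out from `job0`. The two agree only when at most two jobs are built. A sketch reducing each job's config to its `JOB_DEPENDENCIES` value (`Optional.empty()` for `job0`); `DepShapes` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper contrasting the two dependency shapes the test DAG builder can produce. */
final class DepShapes {
  private DepShapes() {}

  /** Old behavior: job i depends on job i-1, a linear chain. */
  static List<Optional<String>> chain(int numJobs) {
    List<Optional<String>> deps = new ArrayList<>();
    for (int i = 0; i < numJobs; i++) {
      Optional<String> dep = Optional.empty();
      if (i > 0) {
        dep = Optional.of("job" + (i - 1));
      }
      deps.add(dep);
    }
    return deps;
  }

  /** New behavior: every job after job0 depends directly on job0, a fan-out. */
  static List<Optional<String>> fanOut(int numJobs) {
    List<Optional<String>> deps = new ArrayList<>();
    for (int i = 0; i < numJobs; i++) {
      Optional<String> dep = Optional.empty();
      if (i > 0) {
        dep = Optional.of("job0");
      }
      deps.add(dep);
    }
    return deps;
  }
}
```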


##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -241,14 +237,9 @@ public void testForcedPushConfig() throws IOException, GitAPIException, URISynta
 
     Collection<Spec> specs = this.flowCatalog.getSpecs();
 
-    Assert.assertTrue(specs.size() == 2);
+    Assert.assertEquals(specs.size(), 2);
     List<Spec> specList = Lists.newArrayList(specs);
-    specList.sort(new Comparator<Spec>() {
-      @Override
-      public int compare(Spec o1, Spec o2) {
-        return o1.getUri().compareTo(o2.getUri());
-      }
-    });
+    specList.sort(Comparator.comparing(Spec::getUri));

Review Comment:
   great improvement!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -329,22 +207,15 @@ public void remove(Spec spec, Properties headers) throws IOException {
     if (spec instanceof FlowSpec) {
       String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
       String flowName = FlowSpec.Utils.getFlowName(uri);
-      if (this.flowLaunchHandler.isPresent()) {
-        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
-        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
-
-        for (long flowExecutionId : flowExecutionIds) {
-        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
-            DagActionStore.DagActionType.KILL);
-        DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(killDagAction, false,
-            System.currentTimeMillis());
-        flowLaunchHandler.get().handleFlowKillTriggerEvent(new Properties(), leaseParams);
-        }
-      } else {
-        //Send the dag to the DagManager to stop it.
-        //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-        _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-        this.dagManager.stopDag(uri);
+      List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+      _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+      for (long flowExecutionId : flowExecutionIds) {
+      DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,

Review Comment:
   indentation



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########


Review Comment:
   is this the right class?  isn't this more of a "JobSpecTest"?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 932527)
    Remaining Estimate: 0h
            Time Spent: 10m

> remove obsolete code related to DagManager
> ------------------------------------------
>
>                 Key: GOBBLIN-2136
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
