phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1737127941
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -168,21 +160,6 @@ protected void shutDown() throws Exception {
this.listeners.close();
}
- /***************************************************
- /* Catalog listeners *
- /**************************************************/
-
- protected void notifyAllListeners() {
- try {
- Iterator<URI> uriIterator = getSpecURIs();
- while (uriIterator.hasNext()) {
- this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
- }
- } catch (IOException e) {
- log.error("Cannot retrieve specs from catalog:", e);
- }
- }
-
Review Comment:
wondering on this...
there's still an `addListener` method. is there no longer any need to
`notifyListeners`?
##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -205,10 +163,10 @@ public class ServiceConfigKeys {
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX +
"issueRepo.class";
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
- public static final String DAG_PROCESSING_ENGINE_ENABLED =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
- public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED =
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+ public static final String JOB_START_SLA_TIME =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+ public static final String JOB_START_SLA_UNITS =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+ public static final long DEFAULT_FLOW_SLA_MILLIS =
TimeUnit.HOURS.toMillis(24);
Review Comment:
three thoughts here:
1. previously the property included the `.dagManager` segment:
```
// Default job start SLA time if configured, measured in minutes. Default
is 10 minutes
// todo - rename "sla" -> "deadline", and move them to DagProcUtils
public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
```
should we continue to accept that legacy prop name so existing flow defs
remain b/w compat?
2. as we introduce a new prop name, let's take the opportunity to align
naming and semantics by calling it `start.deadline`, rather than SLA
3. given that `.dagProcessingEngine` is as much impl. detail as
`.dagManager`, it may not belong in the customer-facing name.
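
to make point 1 concrete, here is a minimal sketch of the fallback I have in mind. everything in it is hypothetical: `DeadlineConfig`, `jobStartDeadline`, and both key strings are placeholders rather than existing Gobblin names, and it reads from plain `java.util.Properties` only to stay self-contained:

```java
import java.util.Properties;

// Hypothetical helper, not part of Gobblin. Both key strings are placeholders.
final class DeadlineConfig {
  // Assumed new, implementation-neutral key.
  static final String NEW_KEY = "gobblin.service.job.start.deadline.time";
  // Assumed legacy key that existing flow definitions may still set.
  static final String LEGACY_KEY = "gobblin.service.dagManager.job.start.sla.time";

  private DeadlineConfig() {}

  /** Prefers the new key, falls back to the legacy key, then to the default. */
  static long jobStartDeadline(Properties props, long defaultValue) {
    String value = props.getProperty(NEW_KEY);
    if (value == null) {
      value = props.getProperty(LEGACY_KEY);
    }
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

the new key wins when both are set, so a flow def can migrate at its own pace without changing behavior in the meantime.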
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -74,7 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
// to these data structures.
@Getter
@Setter
- protected final Map<URI, TopologySpec> topologySpecMap;
+ protected Map<URI, TopologySpec> topologySpecMap;
Review Comment:
I didn't notice why this wouldn't still be `final`. is a derived class
assigning to it?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -55,7 +55,7 @@
@Slf4j
public class DagManagerMetrics {
Review Comment:
rename to `DagMetrics` or leave as-is?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -299,4 +303,43 @@ public int hashCode() {
public String toString() {
return this.getNodes().stream().map(node ->
node.getValue().toString()).collect(Collectors.toList()).toString();
}
+
+ public enum FlowState {
+ FAILED(-1),
+ RUNNING(0),
+ SUCCESSFUL(1);
+
+ public final int value;
+
+ FlowState(int value) {
+ this.value = value;
+ }
+ }
+
+ @Getter
+ @EqualsAndHashCode
+ public static class DagId {
+ String flowGroup;
+ String flowName;
+ long flowExecutionId;
+
+ public DagId(String flowGroup, String flowName, long flowExecutionId) {
+ this.flowGroup = flowGroup;
+ this.flowName = flowName;
+ this.flowExecutionId = flowExecutionId;
+ }
Review Comment:
`@AllArgsConstructor`? actually if you can make the three fields `final`, `@Data`
should take care of many of these things for you
(not clear if that's possible, since the fields aren't `private`... even if
not `@Data` and/or `final`, can we make them `private`?)
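
for reference, a hand-written sketch of roughly what that would give us, assuming all three fields can be `private final`. `DagIdSketch` is a stand-in name so it doesn't clash with the real `DagId`, and it's plain Java so it runs without Lombok on the classpath:

```java
import java.util.Objects;

// Stand-in for Dag.DagId: private final fields, all-args constructor,
// getters, and value-based equals/hashCode written out by hand.
final class DagIdSketch {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  DagIdSketch(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  String getFlowGroup() { return flowGroup; }
  String getFlowName() { return flowName; }
  long getFlowExecutionId() { return flowExecutionId; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagIdSketch)) return false;
    DagIdSketch other = (DagIdSketch) o;
    return flowExecutionId == other.flowExecutionId
        && Objects.equals(flowGroup, other.flowGroup)
        && Objects.equals(flowName, other.flowName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }
}
```

with `@Data` on all-`final` fields, Lombok would generate the constructor, getters, `equals`, and `hashCode` (plus a `toString`, omitted here) and there would be no setters.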
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -37,9 +38,9 @@
*/
@Slf4j
abstract public class AbstractUserQuotaManager implements UserQuotaManager {
- public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX +
"perUserQuota";
- public static final String PER_FLOWGROUP_QUOTA =
DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
- public static final String USER_JOB_QUOTA_KEY =
DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String PER_USER_QUOTA =
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA =
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
"perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY =
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
"defaultJobQuota";
Review Comment:
again, should we honor the old prop names for b/w compat?
also, when we devise a new name to deprecate the old one, let's avoid
embedding impl-specific identifiers in what should otherwise be a user-level
config option w/ clear and steady semantics
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -115,6 +109,31 @@ protected void shutDown()
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT,
TimeUnit.SECONDS);
}
+ /**
+ * Action to be performed on a {@link Dag}, in case of a job failure.
Currently, we allow 2 modes:
+ * <ul>
+ * <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+ * <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to
finish, as long as all the dependencies
+ * of the job are successful.</li>
+ * </ul>
+ */
+ public enum FailureOption {
+ FINISH_RUNNING("FINISH_RUNNING"),
+ CANCEL("CANCEL"),
+ FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+ private final String failureOption;
+
+ FailureOption(final String failureOption) {
+ this.failureOption = failureOption;
+ }
+
+ @Override
+ public String toString() {
+ return this.failureOption;
+ }
+ }
Review Comment:
somehow I thought that with just:
```
public enum FailureOption {
FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;
}
```
the `.toString()` would automatically print the enum's name, no?
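
a quick stand-alone check of that, using a copy of the enum under a different name (`FailureOptionSketch` and its helper are illustrative only). `Enum.toString()` returns `name()` unless a subclass overrides it:

```java
// Copy of the enum without the String field or the toString() override.
enum FailureOptionSketch {
  FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;

  /** True when toString() and name() agree for every constant. */
  static boolean toStringMatchesName() {
    for (FailureOptionSketch option : values()) {
      if (!option.toString().equals(option.name())) {
        return false;
      }
    }
    return true;
  }
}
```

so the constructor, the field, and the override could all go, and `valueOf(option.toString())` still round-trips.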
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
- public static final String DAG_STATESTORE_CLASS_KEY =
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ public static final String DAG_STATESTORE_CLASS_KEY =
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX +
"dagStateStoreClass";
Review Comment:
will this cause all existing configs to be silently ignored?
(presuming there's a default... if not, I expect it would instead likely
fail fast)
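
one way to make that visible rather than silent, as a rough sketch: scan the loaded config for keys that still carry the old prefix and log (or fail) on any hits. `LegacyKeyAudit` and `keysWithLegacyPrefix` are hypothetical names, and the prefix is passed in because I'm not assuming its exact value here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical audit helper, not part of Gobblin.
final class LegacyKeyAudit {
  private LegacyKeyAudit() {}

  /** Returns, in sorted order, the property names that start with the legacy prefix. */
  static List<String> keysWithLegacyPrefix(Properties props, String legacyPrefix) {
    List<String> hits = new ArrayList<>();
    // TreeSet gives a deterministic order for logging.
    for (String name : new TreeSet<>(props.stringPropertyNames())) {
      if (name.startsWith(legacyPrefix)) {
        hits.add(name);
      }
    }
    return hits;
  }
}
```

a non-empty result at startup would be the cue to warn, or to refuse to start if we'd rather fail fast.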
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -57,31 +57,25 @@
@Singleton
public class DagProcessingEngine extends AbstractIdleService {
- @Getter private final Optional<DagTaskStream> dagTaskStream;
- @Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ @Getter private final DagTaskStream dagTaskStream;
+ @Getter DagManagementStateStore dagManagementStateStore;
private final Config config;
- private final Optional<DagProcFactory> dagProcFactory;
+ private final DagProcFactory dagProcFactory;
private ScheduledExecutorService scheduledExecutorPool;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS =
"defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartSlaTimeMillis;
+ public static final String DEFAULT_FLOW_FAILURE_OPTION =
DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name();
Review Comment:
we don't need to use `DagProcessingEngine.` qualifier inside that same
class, do we?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,8 +267,7 @@ protected static MetricNameRegexFilter
getMetricsFilterForDagManager() {
}
public void cleanup() {
- // Add null check so that unit test will not affect each other when we
de-active non-instrumented DagManager
- if(this.metricContext != null &&
this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName()))
{
+ if (this.metricContext != null &&
this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName()))
{
// The DMThread's metrics mappings follow the lifecycle of the DMThread
itself and so are lost by DM deactivation-reactivation but the
RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a
metric already known, remove all metrics that a new DMThread thread would
attempt to add (in DagManagerThread::initialize) whenever running
post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
Review Comment:
this code may need reworking (since aspects recorded in the comments seem to
be changing), but at the least, update the remaining comment and also consider
whether to rename `getMetricsFilterForDagManager()`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -27,17 +27,15 @@
/**
- * An interface for storing and retrieving currently running {@link
Dag<JobExecutionPlan>}s. In case of a leadership
- * change in the {@link
org.apache.gobblin.service.modules.core.GobblinServiceManager}, the
corresponding {@link DagManager}
- * loads the running {@link Dag}s from the {@link DagStateStore} to resume
their execution.
+ * An interface for storing and retrieving currently running {@link
Dag<JobExecutionPlan>}s.
*/
@Alpha
Review Comment:
shall we remove?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -84,8 +81,4 @@ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId)
throws IOException
*/
@Deprecated
Set<String> getDagIds() throws IOException;
-
- default boolean existsDag(DagManager.DagId dagId) throws IOException {
- throw new UnsupportedOperationException("containsDag not implemented");
- }
Review Comment:
interesting to see this go away... is the thinking that one would merely
`getDag` rather than check for existence?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java:
##########
@@ -136,11 +135,6 @@ public void cleanUp(String dagId)
this.totalDagCount.dec();
Review Comment:
is this in-memory metric still valid in a multi-leader ensemble?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -26,9 +26,9 @@
/**
* Manages the statically configured user quotas for the proxy user in
user.to.proxy configuration, the API requester(s)
- * and the flow group.
- * It is used by the {@link DagManager} to ensure that the number of currently
running jobs do not exceed the quota, if
- * the quota is exceeded, the execution will fail without running on the
underlying executor.
+ * and the flow group. It is used by the {@link
org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure
Review Comment:
specifically, is it the `LaunchDagProc`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -63,7 +63,7 @@ protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore,
}
Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeToCheckDeadline.getLeft().get();
- long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode,
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ long timeOutForJobStart = DagUtils.getJobStartSla(dagNode,
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
Review Comment:
let's use the class-rename as an opportunity to update the method too -
`getJobStartDeadline`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
Review Comment:
I do believe they should be separate... and since they're such closely named
classes now, consider augmenting the javadoc to say that these utils contain
functionality for *processing* a DAG, whereas `DagUtils` is for *interrogating*
a DAG. the javadoc for each might reference the other (circularity should be
ok for javadoc)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
/**
- * Helper class with functionality meant to be re-used between the DagManager
and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the
LaunchDagProc and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's
responsibility is to
+ * process out dag action requests. However, with launch executions now being
stored in the DagActionStateStore, on
+ * restart, the LaunchDagProc has to perform validations before executing any
launch actions the previous LaunchDagProc
Review Comment:
the LaunchDP was unable to complete? more like, "that are not yet fully
processed".
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -48,8 +48,8 @@ public
EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforce
protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
- long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
- long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+ long flowFinishDeadline = DagUtils.getFlowSLA(dagNode);
Review Comment:
let's use the class-rename as an opportunity to update the method too -
`getFlowFinishDeadline`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -67,64 +61,51 @@
*/
@Slf4j
public class DagActionStoreChangeMonitor extends HighLevelConsumer<String,
DagActionStoreChangeEvent> {
Review Comment:
I realize we no longer have a need for both `DagActionStoreCM` and also
`DagManagementDagActionStoreCM`, but it's much clearer to read AND less error
prone to keep the latter class as-is, rather than porting over what's different
about it here into what had been its base class.
I'm not against eventually consolidating those into something like this, but
doing so in a PR w/ >110 files carries unnecessary risk that we may have
introduced a small error in the porting.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -204,7 +204,9 @@ protected void createMetrics() {
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
this.duplicateMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
this.heartbeatMessagesMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
- this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
- this.getMetricContext().register(this.produceToConsumeDelayMillis);
+ // Reports delay from all partitions in one gauge
+ ContextAwareGauge<Long> produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(
Review Comment:
all the other metrics have an instance member. why use a local for this one?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
/**
- * Helper class with functionality meant to be re-used between the DagManager
and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the
LaunchDagProc and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's
responsibility is to
+ * process out dag action requests. However, with launch executions now being
stored in the DagActionStateStore, on
Review Comment:
extra "out" in "process out dag"... but this could simply be:
> creates {@link DagAction}s, which the {@link DPE} is responsible for
processing.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -45,8 +48,8 @@ public void setUp() {
// Tests that if exceeding the quota on startup, do not throw an exception
and do not decrement the counter
@Test
public void testExceedsQuotaOnStartup() throws Exception {
- List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user",
ConfigFactory.empty());
- // Ensure that the current attempt is 1, normally done by DagManager
+ List<Dag<JobExecutionPlan>> dags = DagTestUtils.buildDagList(2, "user",
ConfigFactory.empty());
+ // Ensure that the current attempt is 1, normally done by DagProcs
Review Comment:
which one, launch? reeval?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -79,50 +78,47 @@ public class GitConfigMonitorTest {
private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2);
private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3);
- private RefSpec masterRefSpec = new RefSpec("master");
+ private final RefSpec masterRefSpec = new RefSpec("master");
private FlowCatalog flowCatalog;
- private SpecCatalogListener mockListener;
- private Config config;
private GitConfigMonitor gitConfigMonitor;
@BeforeClass
public void setup() throws Exception {
- cleanUpDir(TEST_DIR);
+ cleanUpDir();
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir,
FS.DETECTED);
- this.remoteRepo = fileKey.open(false);
- this.remoteRepo.create(true);
+ Repository remoteRepo = fileKey.open(false);
+ remoteRepo.create(true);
- this.gitForPush =
Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+ this.gitForPush =
Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
// push an empty commit as a base for detecting changes
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
- this.config = ConfigBuilder.create()
+ Config config = ConfigBuilder.create()
.addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." +
ConfigurationKeys.GIT_MONITOR_REPO_URI,
- this.remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." +
ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
- .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR +
"flowCatalog")
- .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
- .build();
+ remoteRepo.getDirectory().getAbsolutePath())
+ .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." +
ConfigurationKeys.GIT_MONITOR_REPO_DIR,
+ TEST_DIR +
"/jobConfig").addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR +
"flowCatalog")
+ .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL,
5).build();
this.flowCatalog = new FlowCatalog(config);
- this.mockListener = mock(SpecCatalogListener.class);
-
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
- when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse<>(""));
this.flowCatalog.addListener(mockListener);
this.flowCatalog.startAsync().awaitRunning();
- this.gitConfigMonitor = new GitConfigMonitor(this.config,
this.flowCatalog);
+ this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
this.gitConfigMonitor.setActive(true);
}
- private void cleanUpDir(String dir) {
- File specStoreDir = new File(dir);
+ private void cleanUpDir() {
+ File specStoreDir = new File(GitConfigMonitorTest.TEST_DIR);
Review Comment:
NBD, but I personally favor hard-coding params, rather than dropping them to
hard-code the impl. the former is not only clearer to read, but more
future-proof
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -78,32 +76,19 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
private final Config config;
@Getter private final EventSubmitter eventSubmitter;
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
- protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
- private final boolean isMultiActiveExecutionEnabled;
+ protected DagActionReminderScheduler dagActionReminderScheduler;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue =
new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
@Inject
- public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore>
dagActionStore,
- @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
Review Comment:
don't we need this name for init/config-time discernment between two
different versions of the MALA?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java:
##########
@@ -81,7 +83,7 @@ public static Dag<JobExecutionPlan> buildDag(String id, Long
flowExecutionId) th
addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
if (i > 0) {
- jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
Review Comment:
semantics have changed here. if we're confident we always want "job0",
should we remove the `if (i > 0)` (or change it to `if (i == 1)`)?
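
to spell out the change in shape, a small sketch that computes each job's single dependency under the old and new expressions. it assumes jobs are named `job0`, `job1`, ... by index (the suffix logic isn't visible in this hunk), and `DependencyShapes` is an illustrative name:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the dependency each job index would be given.
// An empty string means "no dependency" (the i == 0 case).
final class DependencyShapes {
  private DependencyShapes() {}

  /** Old expression: job i depends on job i-1, giving a linear chain. */
  static List<String> chain(int numJobs) {
    List<String> deps = new ArrayList<>();
    for (int i = 0; i < numJobs; i++) {
      deps.add(i > 0 ? "job" + (i - 1) : "");
    }
    return deps;
  }

  /** New expression: every job after the first depends on job0, giving a fan-out. */
  static List<String> fanOut(int numJobs) {
    List<String> deps = new ArrayList<>();
    for (int i = 0; i < numJobs; i++) {
      deps.add(i > 0 ? "job0" : "");
    }
    return deps;
  }
}
```

the two agree for up to two jobs and diverge from the third job onward, so any caller building three or more nodes now gets a different DAG.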
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -241,14 +237,9 @@ public void testForcedPushConfig() throws IOException,
GitAPIException, URISynta
Collection<Spec> specs = this.flowCatalog.getSpecs();
- Assert.assertTrue(specs.size() == 2);
+ Assert.assertEquals(specs.size(), 2);
List<Spec> specList = Lists.newArrayList(specs);
- specList.sort(new Comparator<Spec>() {
- @Override
- public int compare(Spec o1, Spec o2) {
- return o1.getUri().compareTo(o2.getUri());
- }
- });
+ specList.sort(Comparator.comparing(Spec::getUri));
Review Comment:
great improvement!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -329,22 +207,15 @@ public void remove(Spec spec, Properties headers) throws
IOException {
if (spec instanceof FlowSpec) {
String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
String flowName = FlowSpec.Utils.getFlowName(uri);
- if (this.flowLaunchHandler.isPresent()) {
- List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
- _log.info("Found {} flows to cancel.", flowExecutionIds.size());
-
- for (long flowExecutionId : flowExecutionIds) {
- DagActionStore.DagAction killDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
- DagActionStore.DagActionType.KILL);
- DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(killDagAction, false,
- System.currentTimeMillis());
- flowLaunchHandler.get().handleFlowKillTriggerEvent(new Properties(),
leaseParams);
- }
- } else {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on
SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
- this.dagManager.stopDag(uri);
+ List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
Review Comment:
indentation
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
Review Comment:
is this the right class? isn't this more of a "JobSpecTest"?