phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744231904
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -172,19 +159,19 @@ private void createFlowFinishDeadlineTrigger(DagActionStore.LeaseParams leasePar
Dag.DagNode<JobExecutionPlan> dagNode = this.dagManagementStateStore.getDag(leaseParams.getDagAction().getDagId()).get().getNodes().get(0);
try {
- timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
+ timeOutForJobFinish = DagUtils.getFlowFinishDeadline(dagNode);
} catch (ConfigException e) {
log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
- DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
- timeOutForJobFinish = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);
+ timeOutForJobFinish = ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS;
Review Comment:
can these keys likewise be renamed from SLA => deadline?
##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +178,13 @@ public class ServiceConfigKeys {
public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
+ public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
- public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
- public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+ public static final String JOB_START_SLA_TIME = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME;
+ public static final String JOB_START_SLA_UNITS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT;
+ public static final long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
Review Comment:
why add the DPE (DagProcessingEngine) prefix? (also SLA => deadline)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,10 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
}
public void cleanup() {
- // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
- if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
- // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
Review Comment:
since we no longer have DMThreads, is the guard and/or the
`getMetricsFilterForDagManager` still needed?
also, should that method (L269, above) be renamed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -249,9 +250,9 @@ public static void sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStor
public static long getDefaultJobStartDeadline(Config config) {
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
- config, DagManager.JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
- return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, DagManager.JOB_START_SLA_TIME,
- ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+ config, ServiceConfigKeys.JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME_UNIT));
+ return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, ServiceConfigKeys.JOB_START_SLA_TIME,
+ ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_DEADLINE_TIME));
Review Comment:
can we rename here too: SLA => deadline?
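
For illustration, a minimal sketch of how the lookup might read after the rename. This is not Gobblin code: `JobStartDeadlineSketch`, its `Map`-based config, and both key strings are hypothetical stand-ins (the `.timeunit` suffix on the unit key is an assumption). Only the 10-minute fallback mirrors the `FALLBACK_GOBBLIN_JOB_START_SLA_*` values shown in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class JobStartDeadlineSketch {
  // Hypothetical key names following the SLA => deadline rename; not copied from the PR.
  public static final String JOB_START_DEADLINE_TIME = "gobblin.job.start.deadline.time";
  public static final String JOB_START_DEADLINE_TIME_UNIT = "gobblin.job.start.deadline.timeunit";
  // Fallbacks mirror the values visible in the diff: 10 minutes.
  public static final long FALLBACK_JOB_START_DEADLINE_TIME = 10L;
  public static final String FALLBACK_JOB_START_DEADLINE_TIME_UNIT = TimeUnit.MINUTES.name();

  /** Resolves the job-start deadline in milliseconds, using the fallbacks for missing keys. */
  public static long getDefaultJobStartDeadline(Map<String, String> config) {
    TimeUnit unit = TimeUnit.valueOf(
        config.getOrDefault(JOB_START_DEADLINE_TIME_UNIT, FALLBACK_JOB_START_DEADLINE_TIME_UNIT));
    String rawTime = config.get(JOB_START_DEADLINE_TIME);
    long time = rawTime == null ? FALLBACK_JOB_START_DEADLINE_TIME : Long.parseLong(rawTime);
    return unit.toMillis(time);
  }

  public static void main(String[] args) {
    // No overrides: falls back to 10 minutes.
    System.out.println(getDefaultJobStartDeadline(new HashMap<String, String>())); // 600000

    // Explicit override: 2 hours.
    Map<String, String> config = new HashMap<String, String>();
    config.put(JOB_START_DEADLINE_TIME, "2");
    config.put(JOB_START_DEADLINE_TIME_UNIT, "HOURS");
    System.out.println(getDefaultJobStartDeadline(config)); // 7200000
  }
}
```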
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+ @Test
+ void slaConfigCheck() throws Exception {
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("5", 123456783L, "FINISH_RUNNING", 1);
+ Assert.assertEquals(DagUtils.getFlowFinishDeadline(dag.getStartNodes().get(0)), ServiceConfigKeys.DEFAULT_FLOW_SLA_MILLIS);
Review Comment:
also here: SLA => deadline
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -42,7 +42,7 @@
/**
* An implementation for {@link DagProc} that marks the {@link Dag} as failed and cancel the job if it does not start in
- * {@link org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} time.
+ * {@link org.apache.gobblin.service.ServiceConfigKeys#JOB_START_SLA_TIME} time.
Review Comment:
SLA => deadline
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
- public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ public static final String DAG_STATESTORE_CLASS_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "dagStateStoreClass";
Review Comment:
is it possible to avoid qualifying this config key w/ the DPE's name?
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
* Configuration properties related to Flows
*/
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
- public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
- public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
- public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = TimeUnit.MINUTES.name();
- public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time";
- public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit";
- public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
- public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = TimeUnit.MINUTES.name();
+ public static final String GOBBLIN_FLOW_DEADLINE_TIME = "gobblin.flow.deadline.time";
+ public static final String GOBBLIN_FLOW_DEADLINE_TIME_UNIT = "gobblin.flow.deadline.timeunit";
Review Comment:
suggest `gobblin.flow.completion.deadline.time` or
`gobblin.flow.finish.deadline.time` to parallel
`gobblin.job.start.deadline.time` (rather than dropping one segment entirely)
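
To make the parallel concrete, a rough sketch under stated assumptions: the class name and the `..._FINISH_...` constants are hypothetical (they pick the `finish` variant of the two suggestions above, and the `.timeunit` key is assumed by analogy), and `segmentCount` is just an illustrative helper, not an existing Gobblin utility.

```java
public final class DeadlineKeyNamingSketch {
  private DeadlineKeyNamingSketch() {}

  // Key as it appears in this PR's diff: one segment shorter than the job-start key.
  public static final String CURRENT_FLOW_DEADLINE_TIME = "gobblin.flow.deadline.time";

  // Hypothetical renamed keys that keep the "what is being bounded" segment.
  public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME = "gobblin.flow.finish.deadline.time";
  public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT = "gobblin.flow.finish.deadline.timeunit";

  // Job-start key named in the review comment, for comparison.
  public static final String GOBBLIN_JOB_START_DEADLINE_TIME = "gobblin.job.start.deadline.time";

  /** Counts the dot-separated segments of a config key. */
  public static int segmentCount(String key) {
    return key.split("\\.").length;
  }

  public static void main(String[] args) {
    System.out.println(segmentCount(CURRENT_FLOW_DEADLINE_TIME));        // 4
    System.out.println(segmentCount(GOBBLIN_FLOW_FINISH_DEADLINE_TIME)); // 5
    System.out.println(segmentCount(GOBBLIN_JOB_START_DEADLINE_TIME));   // 5
  }
}
```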
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagUtilsTest.java:
##########
@@ -60,10 +62,29 @@ public class DagManagerUtilsTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+ @Test
+ void slaConfigCheck() throws Exception {
Review Comment:
shall we rename SLA => deadline?