phet commented on code in PR #3942:
URL: https://github.com/apache/gobblin/pull/3942#discussion_r1589482296
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -383,6 +386,7 @@ private void registerServicesInLauncher(){
}
this.serviceLauncher.addService(dagManager);
+ this.serviceLauncher.addService(dagProcessingEngine);
Review Comment:
Is this always added as a service? While we're doing this (pre-)rollout, and maybe
even during the initial ramp, I'd expect it to be conditional based on config.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -50,31 +52,51 @@
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+@AllArgsConstructor
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
+ log.info("DagProcessingEngine instantiated.");
+ }
+
+ @Override
+ protected void startUp() {
Integer numThreads = ConfigUtils.getInt
(config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY,
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
- ScheduledExecutorService scheduledExecutorPool =
+ this.scheduledExecutorPool =
Executors.newScheduledThreadPool(numThreads,
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DagProcessingEngineThread")));
- this.dagTaskStream = dagTaskStream;
- this.dagManagementStateStore = dagManagementStateStore;
-
for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
dagManagementStateStore.get());
- scheduledExecutorPool.submit(dagProcEngineThread);
+ this.scheduledExecutorPool.submit(dagProcEngineThread);
+ log.info("DagProcEngineThread " + i + " started.");
Review Comment:
I suggest putting the logging in the first line of `DPET.run()`. (Give the DPETs a
name or numeric ID so they can self-identify.)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -50,31 +52,51 @@
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+@AllArgsConstructor
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
+ log.info("DagProcessingEngine instantiated.");
+ }
+
+ @Override
+ protected void startUp() {
Integer numThreads = ConfigUtils.getInt
(config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY,
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
- ScheduledExecutorService scheduledExecutorPool =
+ this.scheduledExecutorPool =
Executors.newScheduledThreadPool(numThreads,
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DagProcessingEngineThread")));
- this.dagTaskStream = dagTaskStream;
- this.dagManagementStateStore = dagManagementStateStore;
-
for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
dagManagementStateStore.get());
- scheduledExecutorPool.submit(dagProcEngineThread);
+ this.scheduledExecutorPool.submit(dagProcEngineThread);
+ log.info("DagProcEngineThread " + i + " started.");
}
}
+ @Override
+ protected void shutDown()
+ throws Exception {
+ log.info("DagProcessingEngine shutting down.");
Review Comment:
suggest logging `awaitTermination` timeout too
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java:
##########
@@ -109,7 +109,7 @@ public GobblinServiceConfiguration(String serviceName,
String serviceId, Config
this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
- this.isMultiActiveExecutionEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED, false);
+ this.isMultiActiveExecutionEnabled = true;
Review Comment:
makes sense... don't forget about this :)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -50,31 +52,51 @@
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+@AllArgsConstructor
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
+ log.info("DagProcessingEngine instantiated.");
Review Comment:
still keep this log?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -63,7 +63,11 @@ protected Optional<Dag<JobExecutionPlan>>
initialize(DagManagementStateStore dag
try {
FlowSpec flowSpec =
dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
getDagId().getFlowExecutionId());
- return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ Optional<Dag<JobExecutionPlan>> dag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ if (dag.isPresent()) {
+ dagManagementStateStore.checkpointDag(dag.get());
+ }
+ return dag;
Review Comment:
This is a slight abuse of `Optional.filter`, but I'd probably simplify as:
```
return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil().filter(theDag
-> {
dagManagementStateStore.checkpointDag(theDag);
return true;
});
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -50,31 +52,51 @@
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+@AllArgsConstructor
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
Review Comment:
Given that `startUp` immediately calls `.get` on these three `Optional`s, let's
use the ctor to verify none are absent, e.g. throw if any is absent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]