phet commented on code in PR #3942:
URL: https://github.com/apache/gobblin/pull/3942#discussion_r1589506419
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -50,31 +52,51 @@
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+@AllArgsConstructor
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
+ log.info("DagProcessingEngine instantiated.");
+ }
+
+ @Override
+ protected void startUp() {
Integer numThreads = ConfigUtils.getInt
(config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY,
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
- ScheduledExecutorService scheduledExecutorPool =
+ this.scheduledExecutorPool =
Executors.newScheduledThreadPool(numThreads,
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DagProcessingEngineThread")));
- this.dagTaskStream = dagTaskStream;
- this.dagManagementStateStore = dagManagementStateStore;
-
for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
dagManagementStateStore.get());
- scheduledExecutorPool.submit(dagProcEngineThread);
+ this.scheduledExecutorPool.submit(dagProcEngineThread);
+ log.info("DagProcEngineThread " + i + " started.");
Review Comment:
suggest to put logging in the first line of `DPET.run()`. (give the DPETs a
name or numeric ID to enable self-ID)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]