pratapaditya04 commented on code in PR #4159:
URL: https://github.com/apache/gobblin/pull/4159#discussion_r2663823708
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -141,25 +143,35 @@ protected Workload<WorkUnitClaimCheck>
createWorkload(WUProcessingSpec workSpec,
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f,
Map<String, Object> searchAttributes) {
- ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
+ boolean dynamicScalingEnabled =
config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED)
+ &&
config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED);
+
+ ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setSearchAttributes(searchAttributes)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
- WorkerConfig.of(this).orElse(ConfigFactory.empty())))
- .build();
- // TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
- return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
config));
+
+ // Route NestingExecWorkflow (work execution) to execution
+ if (dynamicScalingEnabled) {
+
childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+ ?
config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+ : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE);
+ }
+
+ return Workflow.newChildWorkflowStub(NestingExecWorkflow.class,
childOpts.build());
}
protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object>
searchAttributes) {
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
// TODO: verify to instead use: Policy.PARENT_CLOSE_POLICY_TERMINATE)
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.setSearchAttributes(searchAttributes)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
- WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
config))
.build();
+ // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow
parent
Review Comment:
Are we saying that we have only set the task queue for the child
workflow/activities of the ProcessWorkUnitsWorkflow parent, so the CommitStep would
default to the default task queue?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]