pratapaditya04 commented on code in PR #4159:
URL: https://github.com/apache/gobblin/pull/4159#discussion_r2663823708


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -141,25 +143,35 @@ protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec,
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> createProcessingWorkflow(FileSystemJobStateful f,
       Map<String, Object> searchAttributes) {
-    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
+    boolean dynamicScalingEnabled = config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED)
+        && config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED);
+
+    ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setSearchAttributes(searchAttributes)
-        .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
-        .build();
-    // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!?
-    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+        .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, config));
+
+    // Route NestingExecWorkflow (work execution) to execution
+    if (dynamicScalingEnabled) {
+      childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          ? config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE);
+    }
+
+    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts.build());
   }
 
   protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> searchAttributes) {
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         // TODO: verify to instead use:  Policy.PARENT_CLOSE_POLICY_TERMINATE)
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
         .setSearchAttributes(searchAttributes)
-        .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, config))
         .build();
 
+    // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow parent

Review Comment:
   Are we saying that we have only set the task queue for the child
workflow/activities of the ProcessWorkUnitsWorkflow parent, so the CommitStep
would fall back to the default task queue?
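
   To make the question concrete, here is a minimal sketch of the routing as it reads from the diff. It is not the Gobblin implementation: a plain `Map` stands in for the Typesafe `Config`, and the key strings and default queue name are placeholders rather than the real `GobblinTemporalConfigurationKeys` values. An empty `Optional` models "no `setTaskQueue` call", in which case Temporal runs the child workflow on its parent's task queue.

```java
import java.util.Map;
import java.util.Optional;

public class TaskQueueRouting {
  // Placeholder keys and default; the real values live in GobblinTemporalConfigurationKeys.
  static final String DYNAMIC_SCALING_ENABLED = "dynamic.scaling.enabled";
  static final String EXECUTION_TASK_QUEUE = "execution.task.queue";
  static final String DEFAULT_EXECUTION_TASK_QUEUE = "DefaultExecutionTaskQueue";

  /**
   * Queue override for the NestingExecWorkflow child.
   * Empty means no override is set, so the child inherits the parent's queue.
   */
  static Optional<String> nestingExecQueue(Map<String, String> config) {
    boolean enabled = Boolean.parseBoolean(config.getOrDefault(DYNAMIC_SCALING_ENABLED, "false"));
    if (!enabled) {
      return Optional.empty();
    }
    return Optional.of(config.getOrDefault(EXECUTION_TASK_QUEUE, DEFAULT_EXECUTION_TASK_QUEUE));
  }

  /** CommitStepWorkflow gets no override in this diff, regardless of config. */
  static Optional<String> commitStepQueue(Map<String, String> config) {
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of(DYNAMIC_SCALING_ENABLED, "true");
    System.out.println("NestingExecWorkflow override: " + nestingExecQueue(config));
    System.out.println("CommitStepWorkflow override: " + commitStepQueue(config));
  }
}
```

   If that reading is right, only the work-execution child moves to the execution queue when dynamic scaling is on, and CommitStepWorkflow stays on whatever queue ProcessWorkUnitsWorkflow itself runs on.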



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.