pratapaditya04 commented on code in PR #4159:
URL: https://github.com/apache/gobblin/pull/4159#discussion_r2663794057


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -141,25 +143,35 @@ protected Workload<WorkUnitClaimCheck> 
createWorkload(WUProcessingSpec workSpec,
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f,
       Map<String, Object> searchAttributes) {
-    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
+    boolean dynamicScalingEnabled = 
config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED)
+        && 
config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED);
+
+    ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setSearchAttributes(searchAttributes)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
-        .build();
-    // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
-    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, 
config));
+
+    // Route NestingExecWorkflow (work execution) to execution
+    if (dynamicScalingEnabled) {
+      
childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          ? 
config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE);
+    }
+
+    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, 
childOpts.build());
   }
 
   protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> 
searchAttributes) {
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         // TODO: verify to instead use:  Policy.PARENT_CLOSE_POLICY_TERMINATE)
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
         .setSearchAttributes(searchAttributes)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
 config))
         .build();
 
+    // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow 
parent

Review Comment:
   Where are we setting the default queue for CommitActivity? We don't want the
commit activity in the process queue, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to