Copilot commented on code in PR #4159:
URL: https://github.com/apache/gobblin/pull/4159#discussion_r2622835471


##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/WorkflowStageTest.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/**
+ * Tests for {@link WorkflowStage} to verify task queue configuration
+ * for different workflow stages.
+ */
+public class WorkflowStageTest {
+
+  /**
+   * Tests that WORK_EXECUTION stage uses execution task queue from config.
+   */
+  @Test
+  public void testWorkExecutionStageUsesExecutionQueue() {
+    // Setup
+    String customExecutionQueue = "CustomExecutionQueue";
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE,
+            ConfigValueFactory.fromAnyRef(customExecutionQueue));
+
+    // Execute
+    String taskQueue = WorkflowStage.WORK_EXECUTION.getTaskQueue(config);
+
+    // Verify
+    Assert.assertEquals(taskQueue, customExecutionQueue,
+        "WORK_EXECUTION should use configured execution queue");
+  }
+
+  /**
+   * Tests that WORK_EXECUTION stage falls back to default execution queue.
+   */
+  @Test
+  public void testWorkExecutionStageUsesDefaultQueue() {
+    // Setup - empty config
+    Config config = ConfigFactory.empty();
+
+    // Execute
+    String taskQueue = WorkflowStage.WORK_EXECUTION.getTaskQueue(config);
+
+    // Verify
+    Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE,
+        "WORK_EXECUTION should use default execution queue when not configured");
+  }
+
+  /**
+   * Tests that WORK_DISCOVERY stage uses default task queue.
+   */
+  @Test
+  public void testWorkDiscoveryStageUsesDefaultQueue() {
+    // Setup
+    Config config = ConfigFactory.empty();
+
+    // Execute
+    String taskQueue = WorkflowStage.WORK_DISCOVERY.getTaskQueue(config);
+
+    // Verify
+    Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE,
+        "WORK_DISCOVERY should use default task queue");
+  }
+
+  /**
+   * Tests that WORK_COMMIT stage uses default task queue.
+   */
+  @Test
+  public void testWorkCommitStageUsesDefaultQueue() {
+    // Setup
+    Config config = ConfigFactory.empty();
+
+    // Execute
+    String taskQueue = WorkflowStage.COMMIT.getTaskQueue(config);
+
+    // Verify
+    Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE,
+        "WORK_COMMIT should use default task queue");

Review Comment:
   The assertion message says "WORK_COMMIT", but the enum constant under test is
"COMMIT". Update the message to match the enum value; the Javadoc and the test
method name use the same "WorkCommit" wording and should be aligned as well.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Specialized worker for Work Execution stage.
+ * This worker only registers activities for:
+ * - ProcessWorkUnit (Work Execution)
+ *
+ * Runs on containers with stage-specific memory for work execution operations.
+ * Polls the execution task queue to ensure activities run on appropriately-sized containers.
+ */
+public class ExecutionWorker extends AbstractTemporalWorker {
+    public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
+    public int maxExecutionConcurrency;

Review Comment:
   The field maxExecutionConcurrency is declared public and non-final, so any
caller can read and reassign it. Make it private if external access is not
required. If it must stay visible (for example, for testing), document the
reason for the wider visibility.
   ```suggestion
       private int maxExecutionConcurrency;
   ```
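
One way to keep the value readable without exposing a mutable field is a private final field with a getter. The following is only a minimal sketch: `ExecutionWorkerSketch` is a hypothetical stand-in that does not extend `AbstractTemporalWorker`, and it assumes the concurrency value can be fixed at construction time, which may not hold for the real class.

```java
// Hypothetical stand-in for ExecutionWorker, for illustration only.
// It does not extend AbstractTemporalWorker or read any Gobblin config.
class ExecutionWorkerSketch {
  static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;

  // Private and final: assigned once in the constructor, never reassigned.
  private final int maxExecutionConcurrency;

  // Assumption: the value is known when the worker is constructed.
  ExecutionWorkerSketch(int maxExecutionConcurrency) {
    this.maxExecutionConcurrency = maxExecutionConcurrency;
  }

  // Read-only access for callers and tests.
  int getMaxExecutionConcurrency() {
    return maxExecutionConcurrency;
  }

  public static void main(String[] args) {
    ExecutionWorkerSketch worker = new ExecutionWorkerSketch(8);
    System.out.println(worker.getMaxExecutionConcurrency()); // prints 8
  }
}
```

A package-private getter like this keeps the field usable from tests in the same package while preventing accidental reassignment elsewhere.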



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java:
##########
@@ -72,6 +76,28 @@ protected String calcProfileDerivationName(JobState jobState) {
   }
 
   protected String calcBasisProfileName(JobState jobState) {
-    return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
+    // Always derive from the global baseline
+    return WorkforceProfiles.BASELINE_NAME;
   }
+
+  private ProfileOverlay createExecutionWorkerOverlay(JobState jobState) {
+    List<ProfileOverlay.KVPair> overlayPairs = new java.util.ArrayList<>();
+
+    // Add execution-specific memory if configured (overrides baseline memory)
+    if (jobState.contains(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)) {
+      overlayPairs.add(new ProfileOverlay.KVPair(
+          GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
+          jobState.getProp(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)
+      ));
+    }
+
+    // Add ExecutionWorker class to ensure correct task queue routing
+    overlayPairs.add(new ProfileOverlay.KVPair(
+        GobblinTemporalConfigurationKeys.WORKER_CLASS,
+        GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
+    ));
+
+    return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);

Review Comment:
   The overlayPairs.isEmpty() check on line 100 is always false because the
ExecutionWorker class pair is added to overlayPairs unconditionally on line 95,
so ProfileOverlay.unchanged() is never returned. Remove the check, or
restructure the logic if an unchanged overlay is ever meant to be returned.
   ```suggestion
       return new ProfileOverlay.Adding(overlayPairs);
   ```
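
The always-false check can be seen in a small stand-alone sketch that mirrors the shape of the method: one optional pair, then one unconditional pair. Everything here is a hypothetical stand-in: `OverlaySketch`, the `example.*` key names, and the use of `Map.Entry` in place of `ProfileOverlay.KVPair` are assumptions for illustration, not Gobblin APIs.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the overlay-building logic under review.
class OverlaySketch {
  // Placeholder key names, not the real Gobblin configuration keys.
  static final String MEMORY_PROP = "example.work.execution.memory.mb";
  static final String CONTAINER_MEMORY_KEY = "example.container.memory.mbs";
  static final String WORKER_CLASS_KEY = "example.worker.class";

  static List<Map.Entry<String, String>> buildOverlayPairs(Map<String, String> jobProps) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    // Optional pair: only added when the memory property is configured.
    if (jobProps.containsKey(MEMORY_PROP)) {
      pairs.add(new SimpleImmutableEntry<>(CONTAINER_MEMORY_KEY, jobProps.get(MEMORY_PROP)));
    }
    // Unconditional pair: guarantees the list has at least one element.
    pairs.add(new SimpleImmutableEntry<>(WORKER_CLASS_KEY, "ExecutionWorker"));
    return pairs;
  }

  public static void main(String[] args) {
    // Even with no properties configured, the list is never empty.
    System.out.println(buildOverlayPairs(Collections.emptyMap()).size()); // prints 1
    System.out.println(buildOverlayPairs(Collections.singletonMap(MEMORY_PROP, "4096")).size()); // prints 2
  }
}
```

Because the smallest possible result already has one element, an `isEmpty()` branch after this construction is dead code.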



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -134,4 +141,5 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
   int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
 

Review Comment:
   Add a documentation comment for the WORK_EXECUTION_MEMORY_MB configuration 
key to explain its purpose and usage, similar to other configuration keys in 
this interface. The comment should clarify that this is the memory allocation 
in megabytes for execution worker containers when dynamic scaling is enabled.
   ```suggestion
   
     /**
      * Memory allocation (in megabytes) for execution worker containers when dynamic scaling is enabled.
      * This value determines the amount of memory assigned to each worker container during execution.
      */
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -72,6 +72,19 @@ public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbe
         .build();
   }
 
+  public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
+    ActivityOptions.Builder builder = ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .setTaskQueue(taskQueue);
+
+    if (setHeartbeatTimeout) {
+      builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
+    }
+
+    return builder.build();
+  }
+

Review Comment:
   The newly added buildActivityOptions method with taskQueue parameter is not 
used anywhere in the codebase. If this method is intended for future use, 
consider documenting this in a code comment. Otherwise, consider removing it to 
avoid maintaining unused code. Alternatively, if this should be used for 
routing activities to the execution queue when dynamic scaling is enabled, the 
implementation may be incomplete.
   ```suggestion
   
   ```
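
If the overload is meant for routing, the task queue passed to it would presumably come from the workflow stage. The sketch below shows only that selection step, under loud assumptions: `RoutingSketch` and `StageSketch` are hypothetical stand-ins, the key name and default queue names are placeholders, and the rule that everything stays on the default queue when dynamic scaling is off is a guess at the intended behavior, not something the PR confirms.

```java
import java.util.Properties;

// Hypothetical sketch of choosing a task queue per stage; not Gobblin code.
class RoutingSketch {
  // Assumption: with dynamic scaling disabled, every stage uses the default queue.
  static String queueFor(StageSketch stage, Properties props, boolean dynamicScalingEnabled) {
    return dynamicScalingEnabled ? stage.getTaskQueue(props) : StageSketch.DEFAULT_QUEUE;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(StageSketch.EXECUTION_QUEUE_KEY, "CustomExecutionQueue");
    System.out.println(queueFor(StageSketch.WORK_EXECUTION, props, true));  // prints CustomExecutionQueue
    System.out.println(queueFor(StageSketch.WORK_EXECUTION, props, false)); // prints DefaultTaskQueue
    System.out.println(queueFor(StageSketch.COMMIT, props, true));          // prints DefaultTaskQueue
  }
}

// Hypothetical stand-in for WorkflowStage; key and queue names are placeholders.
enum StageSketch {
  WORK_DISCOVERY, WORK_EXECUTION, COMMIT;

  static final String EXECUTION_QUEUE_KEY = "example.execution.task.queue";
  static final String DEFAULT_QUEUE = "DefaultTaskQueue";
  static final String DEFAULT_EXECUTION_QUEUE = "DefaultExecutionQueue";

  // Only the execution stage consults the config; other stages use the default queue.
  String getTaskQueue(Properties props) {
    if (this == WORK_EXECUTION) {
      return props.getProperty(EXECUTION_QUEUE_KEY, DEFAULT_EXECUTION_QUEUE);
    }
    return DEFAULT_QUEUE;
  }
}
```

The resulting string is what a caller would hand to the three-argument `buildActivityOptions` shown in the diff; if no call site does this, the overload remains unused as the comment notes.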



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

