agam-99 commented on code in PR #4159: URL: https://github.com/apache/gobblin/pull/4159#discussion_r2670896738
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.util.concurrent.TimeUnit; + +import com.typesafe.config.Config; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Specialized worker for Work Execution stage. + * This worker only registers activities for: + * - ProcessWorkUnit (Work Execution) + * + * Runs on containers with stage-specific memory for work execution operations. + * Polls the execution task queue to ensure activities run on appropriately-sized containers. 
+ */ +public class ExecutionWorker extends AbstractTemporalWorker { + public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; + public int maxExecutionConcurrency; Review Comment: updated ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java: ########## @@ -263,6 +264,38 @@ private TemporalWorker initiateWorker() throws Exception { return worker; } + private void initializeExecutionWorkers() throws Exception { + boolean dynamicScalingEnabled = ConfigUtils.getBoolean(clusterConfig, + GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false); + + if (!dynamicScalingEnabled) { + return; + } + + String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, + GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); + boolean isExecutionWorkerContainer = GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS.equals(workerClassName); + + // only the initial container (WorkFulfillment worker) should start an additional ExecutionWorker worker + if (isExecutionWorkerContainer) { + return; + } + + logger.info("Starting additional ExecutionWorker in initial container"); + + String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); + WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( + managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); + + TemporalWorker executionWorker = GobblinConstructorUtils.invokeLongestConstructor( + (Class<TemporalWorker>)Class.forName(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS), + clusterConfig, client); + executionWorker.start(); + workers.add(executionWorker); + logger.info("Worker started for class: {}", GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS); + } Review Comment: yes ########## 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -141,25 +143,35 @@ protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec, protected NestingExecWorkflow<WorkUnitClaimCheck> createProcessingWorkflow(FileSystemJobStateful f, Map<String, Object> searchAttributes) { - ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty()); + boolean dynamicScalingEnabled = config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED) + && config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED); + + ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder() .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE) .setSearchAttributes(searchAttributes) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, - WorkerConfig.of(this).orElse(ConfigFactory.empty()))) - .build(); - // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!? - return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); + .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, config)); + + // Route NestingExecWorkflow (work execution) to execution + if (dynamicScalingEnabled) { + childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE) + ? 
config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE) + : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE); + } + + return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts.build()); } protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> searchAttributes) { + Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty()); ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() // TODO: verify to instead use: Policy.PARENT_CLOSE_POLICY_TERMINATE) .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) .setSearchAttributes(searchAttributes) - .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, - WorkerConfig.of(this).orElse(ConfigFactory.empty()))) + .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, config)) .build(); + // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow parent Review Comment: yes correct ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.util.concurrent.TimeUnit; + +import com.typesafe.config.Config; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Specialized worker for Work Execution stage. + * This worker only registers activities for: + * - ProcessWorkUnit (Work Execution) + * + * Runs on containers with stage-specific memory for work execution operations. + * Polls the execution task queue to ensure activities run on appropriately-sized containers. + */ +public class ExecutionWorker extends AbstractTemporalWorker { + public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; + public int maxExecutionConcurrency; + + public ExecutionWorker(Config config, WorkflowClient workflowClient) { + super(config, workflowClient); + this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, Review Comment: Yes, we can probably handle that in another PR. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.worker; + +import java.util.concurrent.TimeUnit; + +import com.typesafe.config.Config; + +import io.temporal.client.WorkflowClient; +import io.temporal.worker.WorkerOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Specialized worker for Work Execution stage. + * This worker only registers activities for: + * - ProcessWorkUnit (Work Execution) + * + * Runs on containers with stage-specific memory for work execution operations. + * Polls the execution task queue to ensure activities run on appropriately-sized containers. 
+ */ +public class ExecutionWorker extends AbstractTemporalWorker { + public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; + public int maxExecutionConcurrency; + + public ExecutionWorker(Config config, WorkflowClient workflowClient) { + super(config, workflowClient); + this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER); + } + + @Override + protected Class<?>[] getWorkflowImplClasses() { + return new Class[] { + ProcessWorkUnitsWorkflowImpl.class, + NestingExecOfProcessWorkUnitWorkflowImpl.class + }; + } + + @Override + protected Object[] getActivityImplInstances() { + return new Object[] { + new ProcessWorkUnitImpl() + }; + } + + @Override + protected WorkerOptions createWorkerOptions() { + return WorkerOptions.newBuilder() + .setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS)) + .setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency) Review Comment: Good point. I'm not sure about this one; maybe it can be discussed and handled in another PR. This PR only adds independent memory configuration; all other configs remain the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
