[
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946925
]
ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/24 19:44
Start Date: 05/Dec/24 19:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1871870894
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing actualWorkforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+ this.workforcePlan = new WorkforcePlan(this.config,
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+ }
+
+ @Override
+ protected void requestInitialContainers() {
+ StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
Review Comment:
I believe we're safe given `WorkforcePlan::calcStaffingDeltas` is
thread-safe and `reqNewCon...` is `synchronized`, but that's subtle. for the
sake of maintainers' sanity, please also make this method `synchronized`
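A minimal sketch of why this is safe, using a hypothetical stand-in class (`ScalingSketch` and its integer fields are illustrative, not Gobblin code). Java's intrinsic locks are reentrant, so a `synchronized` method may call another `synchronized` method on the same instance without deadlocking. Marking the outer method as well makes the calculate-then-request sequence explicitly atomic:
```java
// Hypothetical stand-in for the service; all names here are illustrative only.
public class ScalingSketch {
  private int planned = 3;    // stands in for the workforce plan
  private int requested = 0;  // stands in for the actual staffing

  // Outer method is synchronized so calculating the delta and requesting run as one step.
  public synchronized int requestInitialContainers() {
    int delta = planned - requested;
    return requestNewContainers(delta);
  }

  // Also synchronized; re-acquiring the monitor already held by this thread is allowed.
  private synchronized int requestNewContainers(int delta) {
    if (delta > 0) {
      requested += delta;
    }
    return requested;
  }

  public static void main(String[] args) {
    ScalingSketch sketch = new ScalingSketch();
    System.out.println(sketch.requestInitialContainers());
  }
}
```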
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService {" + appMaster.get_yarnService().getClass().getSimpleName() + "} is not an instance of DynamicScalingYarnService";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
+ Optional.of("DynamicScalingExecutor")));
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval);
Review Comment:
simple name seems fine here, as this is informational, not for debugging
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing actualWorkforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+ this.workforcePlan = new WorkforcePlan(this.config,
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+ }
+
+ @Override
+ protected void requestInitialContainers() {
+ StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ /**
+ * Revises the workforce plan and requests new containers based on the given
scaling directives.
+ *
+ * @param scalingDirectives the list of scaling directives
+ */
+ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
+ this.workforcePlan.reviseWhenNewer(scalingDirectives);
+ StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+ deltas.getPerProfileDeltas().forEach(profileDelta -> {
+ if (profileDelta.getDelta() > 0) {
Review Comment:
nit: would comment:
```
if (...) { // scale up!
...
} else if (profileDelta.getDelta() < 0) { // scale down!
...
} // else, already at staffing plan (or at least have requested, so in-progress)
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +398,25 @@ private EventSubmitter buildEventSubmitter() {
.build();
}
- /**
- * Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
- * of containers then additional containers are requested.
- * <p>
- * If numTargetContainers is less than the current number of allocated
containers then release free containers.
- * Shrinking is relative to the number of currently allocated containers
since it takes time for containers
- * to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
- * work. This means that a container may not be released even though
numTargetContainers is less than the requested
- * number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
- * adjust the cluster towards the desired number of containers.
- *
- * @param inUseInstances a set of in use instances
- * @return whether successfully requested the target number of containers
- */
- public synchronized boolean requestTargetNumberOfContainers(int
numContainers, Set<String> inUseInstances) {
- int defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
- int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.
CONTAINER_CORES_KEY);
-
- LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
- numContainers, inUseInstances.size(), this.containerMap.size());
-
- requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
- LOGGER.info("Current tag-container desired count:{}, tag-container
allocated: {}", numContainers, this.allocatedContainerCountMap);
- return true;
+ /** unless overridden to actually scale, "initial" containers may be the app's *only* containers! */
+ protected void requestInitialContainers() {
+ WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);
+ int numContainers = baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
Review Comment:
use it directly:
```
this.config.getInt(...)
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker profile */
Review Comment:
*already* requested
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java:
##########
@@ -18,12 +18,19 @@
package org.apache.gobblin.temporal.dynamic;
import com.typesafe.config.Config;
+import lombok.AllArgsConstructor;
import lombok.Data;
/** A named worker {@link Config} */
@Data
+@AllArgsConstructor
public class WorkerProfile {
private final String name;
private final Config config;
+
+ public WorkerProfile(Config config) {
+ this.name = WorkforceProfiles.BASELINE_NAME;
+ this.config = config;
Review Comment:
nit:
```
this(WorkforceProfiles.BASELINE_NAME, config)
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -485,8 +448,11 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
priority.setPriority(priorityNum);
String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null;
+
+ long allocationRequestId = optAllocationRequestId.or(0L);
Review Comment:
rather than this magic number, name the constant
`DEFAULT_ALLOCATION_REQUEST_ID` and then use that to also initialize the
`allocationRequestIdGenerator`
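A rough sketch of that suggestion, assuming `allocationRequestIdGenerator` is an `AtomicLong` (its actual type is not shown in this hunk). The class `AllocationIdSketch` and its methods are hypothetical, and `java.util.Optional` stands in for the Guava `Optional` used in the PR so the example stays self-contained:
```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; only DEFAULT_ALLOCATION_REQUEST_ID and
// allocationRequestIdGenerator are names taken from the review comment.
public class AllocationIdSketch {
  static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;

  // Assumed to be an AtomicLong; seeded from the same constant rather than a bare 0L.
  private final AtomicLong allocationRequestIdGenerator =
      new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);

  // Falls back to the named default when no allocation request ID was supplied.
  long resolve(Optional<Long> optAllocationRequestId) {
    return optAllocationRequestId.orElse(DEFAULT_ALLOCATION_REQUEST_ID);
  }

  // Generated IDs start just above the default, so they never collide with it.
  long nextId() {
    return allocationRequestIdGenerator.incrementAndGet();
  }
}
```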
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends
AbstractDynamicScalingYarnServiceManager {
+ public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+ public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
Review Comment:
these are fine for now, but I suspect we'll move to basing these directories
on `JobStateUtils.getWorkDirRoot`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +398,25 @@ private EventSubmitter buildEventSubmitter() {
.build();
}
- /**
- * Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
- * of containers then additional containers are requested.
- * <p>
- * If numTargetContainers is less than the current number of allocated
containers then release free containers.
- * Shrinking is relative to the number of currently allocated containers
since it takes time for containers
- * to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
- * work. This means that a container may not be released even though
numTargetContainers is less than the requested
- * number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
- * adjust the cluster towards the desired number of containers.
- *
- * @param inUseInstances a set of in use instances
- * @return whether successfully requested the target number of containers
- */
- public synchronized boolean requestTargetNumberOfContainers(int
numContainers, Set<String> inUseInstances) {
- int defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
- int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.
CONTAINER_CORES_KEY);
-
- LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
- numContainers, inUseInstances.size(), this.containerMap.size());
-
- requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
- LOGGER.info("Current tag-container desired count:{}, tag-container
allocated: {}", numContainers, this.allocatedContainerCountMap);
- return true;
+ /** unless overridden to actually scale, "initial" containers may be the app's *only* containers! */
Review Comment:
great comment! consider: "may be" => "will be"
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -590,15 +556,44 @@ protected ByteBuffer getSecurityTokens() throws IOException {
@VisibleForTesting
protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) {
+ long allocationRequestId = container.getAllocationRequestId();
+ // Using getOrDefault for backward-compatibility with containers that don't have allocationRequestId set
+ WorkerProfile workerProfile = this.workerProfileByAllocationRequestId.getOrDefault(allocationRequestId,
+ this.defaultWorkerProfile);
Review Comment:
I see value in recovering, so the app continues onward. but not having an
associated worker profile really is a bug! let's not silently resolve, but
instead:
```
WorkerProfile workerProfile = Optional.ofNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId))
.orElseGet(() -> {
log.warn("no WP found for {} ... falling back... ", allocationRequestId);
// NOTE: instance member unnecessary!
return this.workerProfileByAllocationRequestId.get(DEFAULT_ALLOCATION_REQUEST_ID);
});
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService {" + appMaster.get_yarnService().getClass().getSimpleName() + "} is not an instance of DynamicScalingYarnService";
Review Comment:
always use FQ name, since it's maddeningly frustrating in the 1:100 case of
finding out a name was overloaded in a package we might not even know exists!
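To illustrate the difference (a small sketch; `NameSketch` and `describe` are hypothetical names, not part of the PR): two unrelated JDK classes share the simple name `Proxy`, and only `Class.getName()` tells them apart.
```java
// Hypothetical helper showing why getSimpleName() can be ambiguous in error messages.
public class NameSketch {
  static String describe(Class<?> type) {
    return "simple=" + type.getSimpleName() + ", fq=" + type.getName();
  }

  public static void main(String[] args) {
    // Both print the same simple name but different fully qualified names.
    System.out.println(describe(java.net.Proxy.class));
    System.out.println(describe(java.lang.reflect.Proxy.class));
  }
}
```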
Issue Time Tracking
-------------------
Worklog Id: (was: 946925)
Time Spent: 4h 10m (was: 4h)
> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
> Key: GOBBLIN-2174
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> After dynamic scaling was implemented as part of
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn
> Service needs to be integrated with dynamic scaling to provide a fully
> functional, dynamically scalable Yarn service.