phet commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858055680
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; Review Comment: suggest `invocationsCount` or `numInvocations` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), this.derivedFromBaseline), + new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), this.derivedFromBaseline) Review Comment: for all of these, we must ensure the second of the pair has a later time than the first. since I don't believe the order of arg execution is defined in java, would be best to assign currTime and then use it as `currTime + N` the second time ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -440,7 +430,7 @@ public synchronized boolean requestTargetNumberOfContainers(int numContainers, S LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}", numContainers, inUseInstances.size(), this.containerMap.size()); - requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores)); + requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores), Optional.absent()); Review Comment: (WRT the enclosing method...) does it need to be `public`? also, if it's only used by `requestInitialContainers` we might name it thus. on the other hand, why does special case the reading from `config`, rather than doing `WorkerProfile.getConfig`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + // Initialising this baseline worker profile to use as default worker profile in case allocation request id is not in map + this.baselineWorkerProfile = new WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config); + + // Putting baseline profile in the map for default allocation request id (0) + this.allocationRequestIdToWorkerProfile.put(allocationRequestIdGenerator.getAndIncrement(), this.baselineWorkerProfile); Review Comment: doesn't the `generateAllocationRequestId` method do this very thing? let's drop the special handling and call that. ignore the return value if it's not needed... but, about that - shouldn't `requestInitialContainers` be passing along this allocation req ID (for the baseline profile)? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); Review Comment: would be great if this were more clearly/explicitly based on the baseline profile's config ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. + * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) { + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0); Review Comment: nit: `curNumContainers` => `currNumContainers` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -485,8 +475,11 @@ private void requestContainer(Optional<String> preferredNode, Resource resource) priority.setPriority(priorityNum); String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null; + + long allocationRequestID = allocationRequestId.or(0L); Review Comment: please don't use case to differentiate names. how about: ``` long allocationRequestId = optAllocationRequestId.or(0L); ``` ? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { Review Comment: I realize it's only test code, but best to make thread-safe w/ either `synchronized` an `AtomicInteger` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -590,15 +583,43 @@ protected ByteBuffer getSecurityTokens() throws IOException { @VisibleForTesting protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { + long allocationRequestID = container.getAllocationRequestId(); Review Comment: nit: `allocationRequestID` => `allocationRequestId` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -755,6 +776,18 @@ private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(C return eventMetadataBuilder; } + /** + * Generates a unique allocation request ID for the given worker profile and store the id to profile mapping. + * + * @param workerProfile the worker profile for which the allocation request ID is generated + * @return the generated allocation request ID + */ + protected long generateAllocationRequestId(WorkerProfile workerProfile) { Review Comment: naming-wise, the side-effect of putting into the map seems critical for maintainers to notice. maybe `generateAndStoreByUniqueAllocationRequestId` or simply `storeByUniqueAllocationRequestId`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), this.derivedFromBaseline), + new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), this.derivedFromBaseline) + ); + } else if (this.count == 1) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 5, System.currentTimeMillis()), + new ScalingDirective("secondProfile", 3, System.currentTimeMillis()) + ); + } else if (this.count == 2) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 5, System.currentTimeMillis()), + new ScalingDirective("secondProfile", 3, System.currentTimeMillis()) Review Comment: just noting this is the same as when `this.count == 1`. is that intended? maybe worth an overall comment on the "dummy" testing plan ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -200,6 +201,9 @@ class YarnService extends AbstractIdleService { private volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; + private final WorkerProfile baselineWorkerProfile; + private final AtomicLong allocationRequestIdGenerator = new AtomicLong(0L); + private final ConcurrentMap<Long, WorkerProfile> allocationRequestIdToWorkerProfile = new ConcurrentHashMap<>(); Review Comment: this name works and is ok... but, considering two alternatives: ``` WorkerProfile workerProfile = this.allocationRequestIdToWorkerProfile.get(allocationRequestId); WorkerProfile workerProfile = this.workerProfileByAllocationRequestId.get(allocationRequestId); ``` I prefer the second for its focus on the fact that the worker profile (that we're here to retrieve) is the more important half of the mapping. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + // Initialising this baseline worker profile to use as default worker profile in case allocation request id is not in map + this.baselineWorkerProfile = new WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config); Review Comment: given you're initializing that here in on the next line of the same ctor, when would it NOT be in the map? I don't really believe you need the `getOrDefault` below... if I'm missing something, and it's needed for b/w-compat when not using `DynamicScalingYarnService`, that's OK... just leave a comment. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; Review Comment: not sure there's a need to take a separate value. maybe just use `DYNAMIC_SCALING_POLLING_INTERVAL` for both ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; Review Comment: why package protected - could it be `private`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); Review Comment: just to be a bit clearer on failure, let's test `instanceof` and throw a `RuntimeException` w/ a bit more context for the user, when that's violated ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); Review Comment: would it be helpful for unit testing if, rather than hard-coding, this class took the `ScalingDirectiveSource` FQ class name? I see that could be harder based on the ctor params. As a simpler alternative, make `DynamicScalingYarnServiceManger` abstract w/ a method ``` abstract protected ScalingDirectiveSource createScalingDirectiveSource(); ``` and then the concrete `FsSourceDynamicScalingYarnServiceManager` would hard code the `ScalingDirectiveSource` class. you could have a different concrete `DSYSM` using `DummyScalingDirectiveSource`. one of those such FQ class names would be a param. ... which reminds me.... how is this `DSYSM` created and initialized at present? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); + + log.info("Starting the " + this.getClass().getSimpleName()); + log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval); + + this.dynamicScalingExecutor.scheduleAtFixedRate( + new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, scalingDirectiveSource), + initialDelay, scheduleInterval, TimeUnit.SECONDS + ); + } + + @Override + protected void shutDown() { + log.info("Stopping the " + this.getClass().getSimpleName()); + ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, com.google.common.base.Optional.of(log)); + } + + /** + * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the + * {@link DynamicScalingYarnService} for processing. + */ + @AllArgsConstructor + static class GetScalingDirectivesRunnable implements Runnable { + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScalingDirectiveSource scalingDirectiveSource; + + @Override + public void run() { + try { + List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives(); + if (!scalingDirectives.isEmpty()) { + dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } + } catch (IOException e) { + log.error("Failed to get scaling directives", e); + } Review Comment: I suggest a catch `Throwable`, since this runs in an executor and no body ever checks whether the `Runnable` threw, which means the exception might silently transpire w/ no notice ever logged ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. + * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) { + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0); + int delta = profileDelta.getDelta(); + log.info("Requesting {} new containers for profile {} having currently {} containers", delta, + profileName, curNumContainers); + requestContainersForWorkerProfile(workerProfile, delta); + // update our staffing after requesting new containers + this.workforceStaffing.reviseStaffing(profileName, curNumContainers + delta, System.currentTimeMillis()); + } + // TODO: Decide how to handle negative deltas Review Comment: will you add this before merge? if not, let's at least log when it's observed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
