[
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945669
]
ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Nov/24 09:48
Start Date: 26/Nov/24 09:48
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858055680
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+ private int count = 0;
Review Comment:
suggest `invocationsCount` or `numInvocations`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+ private int count = 0;
+ private final Optional<ProfileDerivation> derivedFromBaseline;
+ public DummyScalingDirectiveSource() {
+ this.derivedFromBaseline = Optional.of(new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
"2048"),
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+ )
+ ));
+ }
+
+ /**
+ * @return {@link ScalingDirective}s - an impl. may choose to return all
known directives or to give only newer
+ * directives than previously returned
+ */
+ @Override
+ public List<ScalingDirective> getScalingDirectives() {
+ // Note - profile should exist already pr is derived from other profile
+ if (this.count == 0) {
+ this.count++;
+ return Arrays.asList(
+ new ScalingDirective("firstProfile", 3, System.currentTimeMillis(),
this.derivedFromBaseline),
+ new ScalingDirective("secondProfile", 2, System.currentTimeMillis(),
this.derivedFromBaseline)
Review Comment:
for all of these, we must ensure the second of the pair has a later time
than the first. Java does evaluate args left to right, but two back-to-back
`System.currentTimeMillis()` calls can return the same value, so it would be
best to assign `currTime` once and then use it as `currTime + N` the second time
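A minimal sketch of the "assign once, then offset" idea. `Directive` is a hypothetical stand-in for `ScalingDirective`, assumed here to take only a profile name, a set point, and a timestamp in millis; `DirectiveTimestamps` and `firstRound` are likewise illustrative names. Whatever the evaluation order, two consecutive clock reads may land in the same millisecond, so deriving both timestamps from one captured value is what guarantees the second is strictly later.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ScalingDirective; holds only the fields needed here.
final class Directive {
  final String profileName;
  final int setPoint;
  final long timestampEpochMillis;

  Directive(String profileName, int setPoint, long timestampEpochMillis) {
    this.profileName = profileName;
    this.setPoint = setPoint;
    this.timestampEpochMillis = timestampEpochMillis;
  }
}

final class DirectiveTimestamps {
  /** Builds a pair whose second timestamp is strictly later than the first. */
  static List<Directive> firstRound(long currTime) {
    return Arrays.asList(
        new Directive("firstProfile", 3, currTime),
        new Directive("secondProfile", 2, currTime + 1));
  }
}
```

A caller would read the clock once, e.g. `DirectiveTimestamps.firstRound(System.currentTimeMillis())`, so the ordering no longer depends on clock resolution.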
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -440,7 +430,7 @@ public synchronized boolean
requestTargetNumberOfContainers(int numContainers, S
LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
numContainers, inUseInstances.size(), this.containerMap.size());
- requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
+ requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores),
Optional.absent());
Review Comment:
(WRT the enclosing method...)
does it need to be `public`? also, if it's only used by
`requestInitialContainers` we might name it thus. on the other hand, why does
it special-case the reading from `config`, rather than doing
`WorkerProfile.getConfig`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName,
String applicationId,
GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS),
TimeUnit.SECONDS).build();
- this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
- GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
- Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 &&
this.jvmMemoryXmxRatio <= 1,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + "
must be between 0 and 1 inclusive");
-
- this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
- Preconditions.checkArgument(this.jvmMemoryOverheadMbs <
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + "
cannot be more than "
- + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
- + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
this.appViewAcl = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+ // Initialising this baseline worker profile to use as default worker
profile in case allocation request id is not in map
+ this.baselineWorkerProfile = new
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);
+
+ // Putting baseline profile in the map for default allocation request id
(0)
+
this.allocationRequestIdToWorkerProfile.put(allocationRequestIdGenerator.getAndIncrement(),
this.baselineWorkerProfile);
Review Comment:
doesn't the `generateAllocationRequestId` method do this very thing? let's
drop the special handling and call that. ignore the return value if it's not
needed... but, about that - shouldn't `requestInitialContainers` be passing
along this allocation req ID (for the baseline profile)?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing workforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.workforceStaffing =
WorkforceStaffing.initialize(getInitialContainers());
+ this.workforcePlan = new WorkforcePlan(getConfig(),
getInitialContainers());
Review Comment:
would be great if this were more clearly/explicitly based on the baseline
profile's config
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing workforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.workforceStaffing =
WorkforceStaffing.initialize(getInitialContainers());
+ this.workforcePlan = new WorkforcePlan(getConfig(),
getInitialContainers());
+ }
+
+ /**
+ * Revises the workforce plan and requests new containers based on the given
scaling directives.
+ *
+ * @param scalingDirectives the list of scaling directives
+ */
+ public synchronized void
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective>
scalingDirectives) {
+ this.workforcePlan.reviseWhenNewer(scalingDirectives);
+ StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ private synchronized void
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+ deltas.getPerProfileDeltas().forEach(profileDelta -> {
+ if (profileDelta.getDelta() > 0) {
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int curNumContainers =
this.workforceStaffing.getStaffing(profileName).orElse(0);
Review Comment:
nit: `curNumContainers` => `currNumContainers`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -485,8 +475,11 @@ private void requestContainer(Optional<String>
preferredNode, Resource resource)
priority.setPriority(priorityNum);
String[] preferredNodes = preferredNode.isPresent() ? new String[]
{preferredNode.get()} : null;
+
+ long allocationRequestID = allocationRequestId.or(0L);
Review Comment:
please don't use case to differentiate names. how about:
```
long allocationRequestId = optAllocationRequestId.or(0L);
```
?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+ private int count = 0;
+ private final Optional<ProfileDerivation> derivedFromBaseline;
+ public DummyScalingDirectiveSource() {
+ this.derivedFromBaseline = Optional.of(new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
"2048"),
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+ )
+ ));
+ }
+
+ /**
+ * @return {@link ScalingDirective}s - an impl. may choose to return all
known directives or to give only newer
+ * directives than previously returned
+ */
+ @Override
+ public List<ScalingDirective> getScalingDirectives() {
+ // Note - profile should exist already pr is derived from other profile
+ if (this.count == 0) {
Review Comment:
I realize it's only test code, but best to make thread-safe w/ either
`synchronized` or an `AtomicInteger`
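One way the `AtomicInteger` option might look, as a sketch only. `InvocationCounter` and `nextInvocation` are hypothetical names; `numInvocations` follows the rename suggested earlier in this review. The point is that the read and the increment happen as one atomic step, so two concurrent callers can never both observe `0`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing a thread-safe replacement for `private int count`.
final class InvocationCounter {
  private final AtomicInteger numInvocations = new AtomicInteger(0);

  /** Returns the zero-based index of this call; each caller gets a distinct value. */
  int nextInvocation() {
    return numInvocations.getAndIncrement();
  }
}
```

`getScalingDirectives()` could then branch on the returned index (0, 1, 2, ...) instead of comparing and incrementing a plain field in separate steps.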
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -590,15 +583,43 @@ protected ByteBuffer getSecurityTokens() throws
IOException {
@VisibleForTesting
protected String buildContainerCommand(Container container, String
helixParticipantId, String helixInstanceTag) {
+ long allocationRequestID = container.getAllocationRequestId();
Review Comment:
nit: `allocationRequestID` => `allocationRequestId`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -755,6 +776,18 @@ private ImmutableMap.Builder<String, String>
buildContainerStatusEventMetadata(C
return eventMetadataBuilder;
}
+ /**
+ * Generates a unique allocation request ID for the given worker profile and
store the id to profile mapping.
+ *
+ * @param workerProfile the worker profile for which the allocation request
ID is generated
+ * @return the generated allocation request ID
+ */
+ protected long generateAllocationRequestId(WorkerProfile workerProfile) {
Review Comment:
naming-wise, the side-effect of putting into the map seems critical for
maintainers to notice. maybe `generateAndStoreByUniqueAllocationRequestId` or
simply `storeByUniqueAllocationRequestId`?
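To illustrate why the name matters, here is a sketch of a method that both generates the ID and records the mapping. `AllocationRequestRegistry` is a hypothetical wrapper, generic over the profile type so it compiles without `WorkerProfile`; the two field names echo the PR and the rename proposed in this review, but the real method body is not shown in the diff and is assumed here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry; P stands in for WorkerProfile.
final class AllocationRequestRegistry<P> {
  private final AtomicLong allocationRequestIdGenerator = new AtomicLong(0L);
  private final ConcurrentMap<Long, P> workerProfileByAllocationRequestId = new ConcurrentHashMap<>();

  /** Generates a fresh ID AND stores the ID-to-profile mapping (the side effect the name calls out). */
  long storeByUniqueAllocationRequestId(P workerProfile) {
    long allocationRequestId = allocationRequestIdGenerator.getAndIncrement();
    workerProfileByAllocationRequestId.put(allocationRequestId, workerProfile);
    return allocationRequestId;
  }

  /** Returns the profile stored under the given ID, or null if none was stored. */
  P getWorkerProfile(long allocationRequestId) {
    return workerProfileByAllocationRequestId.get(allocationRequestId);
  }
}
```

If the baseline profile is registered first through this same method, it receives ID 0 without any special handling in the constructor.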
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+ private int count = 0;
+ private final Optional<ProfileDerivation> derivedFromBaseline;
+ public DummyScalingDirectiveSource() {
+ this.derivedFromBaseline = Optional.of(new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
"2048"),
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+ )
+ ));
+ }
+
+ /**
+ * @return {@link ScalingDirective}s - an impl. may choose to return all
known directives or to give only newer
+ * directives than previously returned
+ */
+ @Override
+ public List<ScalingDirective> getScalingDirectives() {
+ // Note - profile should exist already pr is derived from other profile
+ if (this.count == 0) {
+ this.count++;
+ return Arrays.asList(
+ new ScalingDirective("firstProfile", 3, System.currentTimeMillis(),
this.derivedFromBaseline),
+ new ScalingDirective("secondProfile", 2, System.currentTimeMillis(),
this.derivedFromBaseline)
+ );
+ } else if (this.count == 1) {
+ this.count++;
+ return Arrays.asList(
+ new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
+ new ScalingDirective("secondProfile", 3, System.currentTimeMillis())
+ );
+ } else if (this.count == 2) {
+ this.count++;
+ return Arrays.asList(
+ new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
+ new ScalingDirective("secondProfile", 3, System.currentTimeMillis())
Review Comment:
just noting this is the same as when `this.count == 1`. is that intended?
maybe worth an overall comment on the "dummy" testing plan
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -200,6 +201,9 @@ class YarnService extends AbstractIdleService {
private volatile boolean shutdownInProgress = false;
private final boolean jarCacheEnabled;
+ private final WorkerProfile baselineWorkerProfile;
+ private final AtomicLong allocationRequestIdGenerator = new AtomicLong(0L);
+ private final ConcurrentMap<Long, WorkerProfile>
allocationRequestIdToWorkerProfile = new ConcurrentHashMap<>();
Review Comment:
this name works and is ok... but, considering two alternatives:
```
WorkerProfile workerProfile =
this.allocationRequestIdToWorkerProfile.get(allocationRequestId);
WorkerProfile workerProfile =
this.workerProfileByAllocationRequestId.get(allocationRequestId);
```
I prefer the second for its focus on the fact that the worker profile (that
we're here to retrieve) is the more important half of the mapping.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName,
String applicationId,
GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS),
TimeUnit.SECONDS).build();
- this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
- GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
- Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 &&
this.jvmMemoryXmxRatio <= 1,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + "
must be between 0 and 1 inclusive");
-
- this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
- Preconditions.checkArgument(this.jvmMemoryOverheadMbs <
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + "
cannot be more than "
- + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
- + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
this.appViewAcl = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+ // Initialising this baseline worker profile to use as default worker
profile in case allocation request id is not in map
+ this.baselineWorkerProfile = new
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);
Review Comment:
given you're initializing that here and putting it in the map on the next line of the same ctor,
when would it NOT be in the map? I don't really believe you need the
`getOrDefault` below...
if I'm missing something, and it's needed for b/w-compat when not using
`DynamicScalingYarnService`, that's OK... just leave a comment.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX =
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX
+ "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX +
"errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX
+ "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
Review Comment:
not sure there's a need to take a separate value. maybe just use
`DYNAMIC_SCALING_POLLING_INTERVAL` for both
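A sketch of using one value for both purposes, assuming the manager schedules its polling with `ScheduledExecutorService.scheduleAtFixedRate` (the scheduling call itself is not visible in this hunk). `PollingSchedule` and `schedule` are hypothetical names.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a single polling interval serves as initial delay and period.
final class PollingSchedule {
  static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable poll,
      long pollingInterval, TimeUnit unit) {
    // scheduleAtFixedRate(command, initialDelay, period, unit)
    return executor.scheduleAtFixedRate(poll, pollingInterval, pollingInterval, unit);
  }
}
```

With this shape, the separate `initial.delay` key and its default would no longer be needed.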
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX =
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX
+ "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX +
"errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX
+ "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String DYNAMIC_SCALING_POLLING_INTERVAL =
DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ private final Config config;
+ DynamicScalingYarnService dynamicScalingYarnService;
Review Comment:
why package-private - could it be `private`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX =
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX
+ "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX +
"errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX
+ "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String DYNAMIC_SCALING_POLLING_INTERVAL =
DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ private final Config config;
+ DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+ private final FileSystem fs;
+
+ public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
Review Comment:
To make a failure here clearer, let's test `instanceof` first and, when the
check fails, throw a `RuntimeException` with more context for the user.
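A minimal sketch of what that check could look like. It uses empty stand-in classes rather than the real Gobblin types, and `requireDynamicScaling` is a hypothetical helper name, not something in the PR:

```java
class YarnServiceCastSketch {
  // Empty stand-ins for the real Gobblin classes (assumption: only the
  // subclass relationship matters for this illustration).
  static class YarnService {}
  static class DynamicScalingYarnService extends YarnService {}

  /** Hypothetical helper: checks the type before casting so a misconfiguration fails with context. */
  static DynamicScalingYarnService requireDynamicScaling(YarnService yarnService) {
    if (!(yarnService instanceof DynamicScalingYarnService)) {
      // instanceof is false for null, so report that case explicitly too
      String actual = (yarnService == null) ? "null" : yarnService.getClass().getName();
      throw new RuntimeException("DynamicScalingYarnServiceManager requires a "
          + DynamicScalingYarnService.class.getName()
          + ", but the application master provided: " + actual);
    }
    return (DynamicScalingYarnService) yarnService;
  }
}
```

In the constructor this would replace the bare cast, so a wrongly configured `YarnService` class surfaces as a descriptive error instead of a `ClassCastException`.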
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX =
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX
+ "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX +
"errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX
+ "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String DYNAMIC_SCALING_POLLING_INTERVAL =
DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ private final Config config;
+ DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+ private final FileSystem fs;
+
+ public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+ com.google.common.base.Optional.of("DynamicScalingExecutor")));
+ this.fs = appMaster.getFs();
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ int initialDelay = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_INITIAL_DELAY,
+ DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+ ScalingDirectiveSource fsScalingDirectiveSource = new
FsScalingDirectiveSource(
+ this.fs,
+ this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+ Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+ );
+
+ // TODO: remove this line later
+ // Using for testing purposes only
+ ScalingDirectiveSource scalingDirectiveSource = new
DummyScalingDirectiveSource();
Review Comment:
Would it help unit testing if, rather than hard-coding the source, this class
took the `ScalingDirectiveSource` FQ class name? I see that could be harder
given the ctor params.
As a simpler alternative, make `DynamicScalingYarnServiceManager` abstract with
a method
```
abstract protected ScalingDirectiveSource createScalingDirectiveSource();
```
and then a concrete `FsSourceDynamicScalingYarnServiceManager` would hard-code
the `ScalingDirectiveSource` class. You could have a different concrete
`DSYSM` that uses `DummyScalingDirectiveSource`. The FQ class name of one of
those concrete managers would then be the param.
Which reminds me: how is this `DSYSM` created and initialized at present?
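A rough sketch of that abstract-factory-method layout, under simplifying assumptions: `DirectiveSource` is a stand-in for `ScalingDirectiveSource` with directives reduced to plain strings, and all class names below are hypothetical:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ScalingSourceFactorySketch {
  // Simplified stand-in for ScalingDirectiveSource (assumption: strings instead of ScalingDirective).
  interface DirectiveSource {
    List<String> getScalingDirectives();
  }

  // The base manager owns the polling logic and defers the choice of source to subclasses.
  static abstract class AbstractManager {
    protected abstract DirectiveSource createScalingDirectiveSource();

    List<String> pollOnce() {
      return createScalingDirectiveSource().getScalingDirectives();
    }
  }

  // Concrete manager for tests or load generation: hard-codes a canned source.
  static class DummySourceManager extends AbstractManager {
    @Override
    protected DirectiveSource createScalingDirectiveSource() {
      return () -> Arrays.asList("profileA=3", "profileB=5");
    }
  }

  // Placeholder for a production variant; a real one would build a filesystem-backed source.
  static class EmptySourceManager extends AbstractManager {
    @Override
    protected DirectiveSource createScalingDirectiveSource() {
      return Collections::emptyList;
    }
  }
}
```

With this shape, a unit test can subclass the manager and return a canned source, and no test-only line has to live in the production `startUp()`.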
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX =
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX
+ "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX +
"errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX
+ "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String DYNAMIC_SCALING_POLLING_INTERVAL =
DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ private final Config config;
+ DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+ private final FileSystem fs;
+
+ public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+ com.google.common.base.Optional.of("DynamicScalingExecutor")));
+ this.fs = appMaster.getFs();
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ int initialDelay = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_INITIAL_DELAY,
+ DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+ ScalingDirectiveSource fsScalingDirectiveSource = new
FsScalingDirectiveSource(
+ this.fs,
+ this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+ Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+ );
+
+ // TODO: remove this line later
+ // Using for testing purposes only
+ ScalingDirectiveSource scalingDirectiveSource = new
DummyScalingDirectiveSource();
+
+ log.info("Starting the " + this.getClass().getSimpleName());
+ log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval);
+
+ this.dynamicScalingExecutor.scheduleAtFixedRate(
+ new GetScalingDirectivesRunnable(this.dynamicScalingYarnService,
scalingDirectiveSource),
+ initialDelay, scheduleInterval, TimeUnit.SECONDS
+ );
+ }
+
+ @Override
+ protected void shutDown() {
+ log.info("Stopping the " + this.getClass().getSimpleName());
+ ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor,
com.google.common.base.Optional.of(log));
+ }
+
+ /**
+ * A {@link Runnable} that gets the scaling directives from the {@link
ScalingDirectiveSource} and passes them to the
+ * {@link DynamicScalingYarnService} for processing.
+ */
+ @AllArgsConstructor
+ static class GetScalingDirectivesRunnable implements Runnable {
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScalingDirectiveSource scalingDirectiveSource;
+
+ @Override
+ public void run() {
+ try {
+ List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
+ if (!scalingDirectives.isEmpty()) {
+ dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+ }
+ } catch (IOException e) {
+ log.error("Failed to get scaling directives", e);
+ }
Review Comment:
I suggest catching `Throwable`: this runs in an executor and nobody ever
checks whether the `Runnable` threw, so an exception could pass silently
with nothing ever logged.
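For context, `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once a run throws, so an uncaught exception would also stop the polling for good. A minimal sketch of a guard follows; `guarded` and the `onFailure` callback are hypothetical names, and the real code would presumably call `log.error` there:

```java
import java.util.function.Consumer;

class GuardedRunnableSketch {
  /**
   * Wraps a task so that nothing it throws escapes to the executor.
   * Every failure is handed to onFailure (e.g. a logger call) instead.
   */
  static Runnable guarded(Runnable task, Consumer<Throwable> onFailure) {
    return () -> {
      try {
        task.run();
      } catch (Throwable t) {
        // Catching Throwable is deliberate: an unchecked exception or Error
        // would otherwise be swallowed by the executor and cancel the schedule.
        onFailure.accept(t);
      }
    };
  }
}
```

Equivalently, the existing `catch (IOException e)` in `run()` could be widened to `catch (Throwable t)`; the wrapper only makes the intent reusable.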
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing workforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.workforceStaffing =
WorkforceStaffing.initialize(getInitialContainers());
+ this.workforcePlan = new WorkforcePlan(getConfig(),
getInitialContainers());
+ }
+
+ /**
+ * Revises the workforce plan and requests new containers based on the given
scaling directives.
+ *
+ * @param scalingDirectives the list of scaling directives
+ */
+ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
+ this.workforcePlan.reviseWhenNewer(scalingDirectives);
+ StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+ deltas.getPerProfileDeltas().forEach(profileDelta -> {
+ if (profileDelta.getDelta() > 0) {
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0);
+ int delta = profileDelta.getDelta();
+ log.info("Requesting {} new containers for profile {} having currently {} containers", delta, profileName, curNumContainers);
+ requestContainersForWorkerProfile(workerProfile, delta);
+ // update our staffing after requesting new containers
+ this.workforceStaffing.reviseStaffing(profileName, curNumContainers + delta, System.currentTimeMillis());
+ }
+ // TODO: Decide how to handle negative deltas
Review Comment:
Will you add this before merge? If not, let's at least log whenever a negative
delta is observed!
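One possible shape for that interim logging, sketched with plain collections instead of `StaffingDeltas` (the map of profile name to delta and the method name are assumptions made for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class NegativeDeltaLoggingSketch {
  /**
   * Returns one warning message per profile whose delta is negative.
   * Positive and zero deltas are left to the existing request path.
   */
  static List<String> describeIgnoredScaleDowns(Map<String, Integer> deltaByProfile) {
    List<String> warnings = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : deltaByProfile.entrySet()) {
      if (entry.getValue() < 0) {
        warnings.add("Ignoring negative delta " + entry.getValue() + " for profile "
            + entry.getKey() + " (scale-down is not handled yet)");
      }
    }
    return warnings;
  }
}
```

In the PR itself this would more likely be an `else if (profileDelta.getDelta() < 0)` branch that calls `log.warn` directly.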
Issue Time Tracking
-------------------
Worklog Id: (was: 945669)
Remaining Estimate: 0h
Time Spent: 10m
> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
> Key: GOBBLIN-2174
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> After dynamic scaling was implemented as part of
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn
> Service needs to be integrated with dynamic scaling to provide a fully
> functional, dynamically scalable Yarn service.