[
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946768
]
ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/24 00:18
Start Date: 05/Dec/24 00:18
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1870177782
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from
GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService is not an instance of DynamicScalingYarnService";
+ log.error(errorMsg);
Review Comment:
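Be sure to say what the *actual* instance is (e.g. include its runtime class name in the error message), so a misconfiguration can be diagnosed from the log alone.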
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from
GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService is not an instance of DynamicScalingYarnService";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
+ Optional.of("DynamicScalingExecutor")));
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ log.info("Starting the " + this.getClass().getSimpleName());
+ log.info("Scheduling the dynamic scaling task with an interval of {}
seconds", scheduleInterval);
Review Comment:
nit: combine these into one log msg:
```
"Starting the {} with re-scaling interval of {} seconds", ....
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing workforceStaffing;
Review Comment:
For clarity, I suggest naming this something along the lines of `actualWorkforceStaffing`
(as opposed to the one internal to `WorkforcePlan`, which is the planned/intended staffing).
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -803,24 +810,36 @@ public void onContainersAllocated(List<Container>
containers) {
// Find matching requests and remove the request (YARN-660). We the
scheduler are responsible
// for cleaning up requests after allocation based on the design in
the described ticket.
// YARN does not have a delta request API and the requests are not
cleaned up automatically.
- // Try finding a match first with the host as the resource name then
fall back to any resource match.
+ // Try finding a match first with requestAllocationId (which should
always be the case) then fall back to
+ // finding a match with the host as the resource name which then will
fall back to any resource match.
// Also see YARN-1902. Container count will explode without this logic
for removing container requests.
- List<? extends Collection<AMRMClient.ContainerRequest>>
matchingRequests = amrmClientAsync
- .getMatchingRequests(container.getPriority(),
container.getNodeHttpAddress(), container.getResource());
+ Collection<AMRMClient.ContainerRequest>
matchingRequestsByAllocationRequestId =
amrmClientAsync.getMatchingRequests(container.getAllocationRequestId());
+ if (!matchingRequestsByAllocationRequestId.isEmpty()) {
+ AMRMClient.ContainerRequest firstMatchingContainerRequest =
matchingRequestsByAllocationRequestId.iterator().next();
+ LOGGER.debug("Found matching requests {}, removing first matching
request {}",
Review Comment:
Especially until we are comfortably using this in prod, these might be better at `.info` log
level. There may be a lot of repetitive logging when we scale many machines,
but that's still only O(100) messages, and often the number will be much smaller. Either way,
such diagnostic info may prove invaluable :)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+ private final AtomicInteger numInvocations = new AtomicInteger(0);
+ private final Optional<ProfileDerivation> derivedFromBaseline;
+ public DummyScalingDirectiveSource() {
+ this.derivedFromBaseline = Optional.of(new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
"2048"),
+ new
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+ )
+ ));
+ }
+
+ /**
+ * @return - A fixed set of {@link ScalingDirective}s corresponding to the
invocation number.
+ */
+ @Override
+ public List<ScalingDirective> getScalingDirectives() {
+ // Note - profile should exist already or is derived from other profile
+ if (this.numInvocations.get() == 0) {
+ this.numInvocations.getAndIncrement();
Review Comment:
nit: call `incrementAndGet()` once at the top and save the returned value in a local variable to
use in these `if/else`s. Similarly, the repeated
```
long currentTime = System.currentTimeMillis();
```
could also be DRY'd up by computing it once before the branches.
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+ @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+ @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+ @Mock private GobblinTemporalApplicationMaster
mockGobblinTemporalApplicationMaster;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ // Using 1 second as polling interval so that the test runs faster and
+ // GetScalingDirectivesRunnable.run() will be called equal to amount of
sleep introduced between startUp
+ // and shutDown in seconds
+ Config config =
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL,
ConfigValueFactory.fromAnyRef(1));
+
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+ }
+
+ @Test
+ public void testWhenScalingDirectivesIsNull() throws IOException,
InterruptedException {
+
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(2000);
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+ }
+
+ @Test
+ public void testWhenScalingDirectivesIsEmpty() throws IOException,
InterruptedException {
+
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new
ArrayList<>());
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(2000);
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+ }
+
+ /** Note : this test uses {@link
org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource}*/
+ @Test
+ public void testWithDummyScalingDirectiveSource() throws
InterruptedException {
+ // DummyScalingDirectiveSource returns 2 scaling directives in first 3
invocations and after that it returns empty list
+ // so the total number of invocations after three invocations should
always be 3
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, new
DummyScalingDirectiveSource());
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(5000); // 5 seconds sleep so that
GetScalingDirectivesRunnable.run() is called for 5 times
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Review Comment:
I'm tempted to further verify what it actually did, e.g. that it called
`requestContainers`. What do you think?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.yarn.GobblinTemporalApplicationMaster;
+import
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager;
+
+/**
+ * {@link DummyScalingDirectiveSource} based implementation of {@link
AbstractDynamicScalingYarnServiceManager}.
+ * This class is meant to be used for testing purposes only.
+ */
Review Comment:
How is this meant to be used? Are you hard-coding it when making a private build, or
passing it in via config? If not the latter yet, would it be worth getting it to work
that way?
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+ private Config defaultConfigs;
+ private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+ private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+ @BeforeClass
+ public void setup() {
+ URL url = YarnServiceTest.class.getClassLoader()
+ .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+ this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+ }
+
+ @Test
+ public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws
Exception {
+ final int containerMemoryMbs = 1500;
+ final int containerCores = 5;
+ final int numContainers = 4;
+ Config config = this.defaultConfigs
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY,
ConfigValueFactory.fromAnyRef(containerCores))
+ .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
ConfigValueFactory.fromAnyRef(numContainers));
+
+ YarnService yarnService = new YarnService(
+ config,
+ "testApplicationName",
+ "testApplicationId",
+ yarnConfiguration,
+ mockFileSystem,
+ eventBus
+ );
+
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
containerMemoryMbs);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
containerCores);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
numContainers);
+ }
+
+ @Test
+ public void testBuildContainerCommand() throws Exception {
+ final double jvmMemoryXmxRatio = 0.7;
+ final int jvmMemoryOverheadMbs = 50;
+ final int resourceMemoryMB = 3072;
+ final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio)
- jvmMemoryOverheadMbs;
+
+ Config config =
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
+
+ Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+ Container mockContainer = Mockito.mock(Container.class);
+ Mockito.when(mockContainer.getResource()).thenReturn(resource);
+ Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
+ YarnService yarnService = new YarnService(
+ config,
+ "testApplicationName",
+ "testApplicationId",
+ yarnConfiguration,
+ mockFileSystem,
+ eventBus
+ );
+
+ String command = yarnService.buildContainerCommand(mockContainer,
"testHelixParticipantId", "testHelixInstanceTag");
+ Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));
Review Comment:
First off, thank you for kicking off testing for this module!
This seems to be a partial clone of the `YarnServiceTest` in `gobblin-yarn`.
I suggest noting that origin in the javadoc, for maintainers. Is any [other
validation from the
original](https://github.com/apache/gobblin/blob/ab62e72839bb4824b98a442de4dd1647284e880e/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java#L149)
worth including here too?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
Review Comment:
I really like what you've done here w/ separating concerns via the abstract
base class! :D
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +403,20 @@ private EventSubmitter buildEventSubmitter() {
.build();
}
- /**
- * Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
- * of containers then additional containers are requested.
- * <p>
- * If numTargetContainers is less than the current number of allocated
containers then release free containers.
- * Shrinking is relative to the number of currently allocated containers
since it takes time for containers
- * to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
- * work. This means that a container may not be released even though
numTargetContainers is less than the requested
- * number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
- * adjust the cluster towards the desired number of containers.
- *
- * @param inUseInstances a set of in use instances
- * @return whether successfully requested the target number of containers
- */
- public synchronized boolean requestTargetNumberOfContainers(int
numContainers, Set<String> inUseInstances) {
- int defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
- int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.
CONTAINER_CORES_KEY);
-
- LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
- numContainers, inUseInstances.size(), this.containerMap.size());
-
- requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
- LOGGER.info("Current tag-container desired count:{}, tag-container
allocated: {}", numContainers, this.allocatedContainerCountMap);
- return true;
- }
-
- // Request initial containers with default resource and helix tag
- private void requestInitialContainers(int containersRequested) {
- requestTargetNumberOfContainers(containersRequested,
Collections.EMPTY_SET);
+ /** Request Initial containers using baselineWorkerProfile */
Review Comment:
Let's generalize this impl for reuse by removing the "baseline" / "initial"
parts. Let's drop the presumption of a well-known default allocation ID and
refactor by essentially moving the impl from `DynamicScalingYarnService` into
this base class:
```
private synchronized void requestContainersForWorkerProfile(WorkerProfile workerProfile, int numContainers) {
  int containerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
  int containerCores = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
  long allocationRequestId = storeByUniqueAllocationRequestId(workerProfile);
  requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId));
}
```
Afterwards, requesting initial containers merely means reading
`numInitialContainers` from config and calling this common impl.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from
GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService is not an instance of DynamicScalingYarnService";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
+ Optional.of("DynamicScalingExecutor")));
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ log.info("Starting the " + this.getClass().getSimpleName());
+ log.info("Scheduling the dynamic scaling task with an interval of {}
seconds", scheduleInterval);
+
+ this.dynamicScalingExecutor.scheduleAtFixedRate(
+ new GetScalingDirectivesRunnable(this.dynamicScalingYarnService,
createScalingDirectiveSource()),
+ scheduleInterval, scheduleInterval, TimeUnit.SECONDS
+ );
+ }
+
+ @Override
+ protected void shutDown() {
+ log.info("Stopping the " + this.getClass().getSimpleName());
+ ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor,
Optional.of(log));
+ }
+
+ /**
+ * Create a {@link ScalingDirectiveSource} to use for getting scaling
directives.
+ */
+ protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+
+ /**
+ * A {@link Runnable} that gets the scaling directives from the {@link
ScalingDirectiveSource} and passes them to the
+ * {@link DynamicScalingYarnService} for processing.
+ */
+ @AllArgsConstructor
+ static class GetScalingDirectivesRunnable implements Runnable {
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScalingDirectiveSource scalingDirectiveSource;
+
+ @Override
+ public void run() {
+ try {
+ List<ScalingDirective> scalingDirectives =
scalingDirectiveSource.getScalingDirectives();
+ if (CollectionUtils.isNotEmpty(scalingDirectives)) {
+
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+ }
+ } catch (IOException e) {
+ log.error("Failed to get scaling directives", e);
+ } catch (Throwable t) {
+ log.error("Suppressing error from GetScalingDirectivesRunnable.run()",
t);
Review Comment:
No need to name the location, since that already shows in the stack trace. Maybe just:
"Unexpected error with dynamic scaling via directives"
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends
AbstractDynamicScalingYarnServiceManager {
+ private final static String DYNAMIC_SCALING_DIRECTIVES_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+ private final static String DYNAMIC_SCALING_ERRORS_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
Review Comment:
Make these `public`, as other code may wish to reference/reuse them.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
Review Comment:
These two classes might better belong in `src/test/java`. In any case,
they're completely unrelated to the "temporal load-gen" purpose of the
`loadgen` pkg, so they shouldn't live there.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers requested for each worker
profile */
+ private final WorkforceStaffing workforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ int initialContainers =
this.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+
+ this.workforceStaffing = WorkforceStaffing.initialize(initialContainers);
Review Comment:
[less preferred] Either add a comment that this deliberately pre-counts the
initial containers in advance of them actually being allocated (which happens
when `startUp()` runs), or
[preferred] alternatively, just initialize as empty (which is most accurate):
```
this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
```
Then have the base class `YarnService::startUp` call a method that you override
here:
```
@Override
protected void requestInitialContainers() {
StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}
```
?
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+ private Config defaultConfigs;
+ private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+ private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+ @BeforeClass
+ public void setup() {
+ URL url = YarnServiceTest.class.getClassLoader()
+ .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+ this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+ }
+
+ @Test
+ public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws
Exception {
+ final int containerMemoryMbs = 1500;
+ final int containerCores = 5;
+ final int numContainers = 4;
+ Config config = this.defaultConfigs
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY,
ConfigValueFactory.fromAnyRef(containerCores))
+ .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
ConfigValueFactory.fromAnyRef(numContainers));
+
+ YarnService yarnService = new YarnService(
+ config,
+ "testApplicationName",
+ "testApplicationId",
+ yarnConfiguration,
+ mockFileSystem,
+ eventBus
+ );
+
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
containerMemoryMbs);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
containerCores);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
numContainers);
+ }
+
+ @Test
+ public void testBuildContainerCommand() throws Exception {
+ final double jvmMemoryXmxRatio = 0.7;
+ final int jvmMemoryOverheadMbs = 50;
+ final int resourceMemoryMB = 3072;
+ final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio)
- jvmMemoryOverheadMbs;
+
+ Config config =
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
Review Comment:
nit: this would line up better if both `.withValue` calls started on a new line
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +250,18 @@ public YarnService(Config config, String applicationName,
String applicationId,
GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS),
TimeUnit.SECONDS).build();
- this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
- GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
- Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 &&
this.jvmMemoryXmxRatio <= 1,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + "
must be between 0 and 1 inclusive");
-
- this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
- Preconditions.checkArgument(this.jvmMemoryOverheadMbs <
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + "
cannot be more than "
- + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
- + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
this.appViewAcl = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+ // Initialising this baseline worker profile to use as default worker
profile in case allocation request id is not in map
+ this.baselineWorkerProfile = new
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);
Review Comment:
Sorry, I don't want to be too pedantic, but the most important
initialization for Dynamic Scaling is the `WorkforcePlan`, as:
```
new WorkforcePlan(this.config,
config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
```
which you do in the `DynamicScalingYarnService` ctor. When that's present,
the baseline profile should follow from that plan, rather than the reverse:
initializing the plan w/ a free-floating baseline profile, as you have here.
AFAICT, there's no need to maintain a `baselineWorkerProfile` member in this
class at all. Instead, introduce an overridable method to be called within
`startUp` and give it this fallback impl (note: the snippet declares
`baselineWorkerProfile` but then refers to it as `workerProfile`; use one name
consistently):
```
/** unless overridden to actually scale, "initial" containers may be the
app's *only* containers! */
protected void requestInitialContainers() {
WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config); //
NOTE: I suggest adding an overloaded single arg ctor, which internally uses
`WorkforceProfiles.BASELINE_NAME
int numContainers =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
LOGGER.info("Requesting {} initial (static) containers with baseline
(only) profile, never to be re-scaled", numContainers);
requestContainersForWorkerProfile(workerProfile, numContainers);
}
```
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+ private Config defaultConfigs;
+ private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+ private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+ private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+ @BeforeClass
+ public void setup() {
+ URL url = YarnServiceTest.class.getClassLoader()
+ .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+ this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+ }
+
+ @Test
+ public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws
Exception {
+ final int containerMemoryMbs = 1500;
+ final int containerCores = 5;
+ final int numContainers = 4;
+ Config config = this.defaultConfigs
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY,
ConfigValueFactory.fromAnyRef(containerCores))
+ .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
ConfigValueFactory.fromAnyRef(numContainers));
+
+ YarnService yarnService = new YarnService(
+ config,
+ "testApplicationName",
+ "testApplicationId",
+ yarnConfiguration,
+ mockFileSystem,
+ eventBus
+ );
+
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
containerMemoryMbs);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
containerCores);
+
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
numContainers);
Review Comment:
`baselineWorkerProfile` seems like such an internal impl detail that I'm not
sure it belongs in validation. How about instead defining the
`requestInitialContainers` method I proposed above, and having this test call
`YarnService::startUp` before validating the args passed to a method like:
```
protected void requestContainers(int numContainers, Resource resource,
Optional<Long> optAllocationRequestId)
```
?
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+ @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+ @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+ @Mock private GobblinTemporalApplicationMaster
mockGobblinTemporalApplicationMaster;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ // Using 1 second as polling interval so that the test runs faster and
+ // GetScalingDirectivesRunnable.run() will be called equal to amount of
sleep introduced between startUp
+ // and shutDown in seconds
+ Config config =
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL,
ConfigValueFactory.fromAnyRef(1));
+
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+ }
+
+ @Test
+ public void testWhenScalingDirectivesIsNull() throws IOException,
InterruptedException {
+
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(2000);
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Review Comment:
nit: prefer `Mockito.never()` over `Mockito.times(0)`
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+ @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+ @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+ @Mock private GobblinTemporalApplicationMaster
mockGobblinTemporalApplicationMaster;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ // Using 1 second as polling interval so that the test runs faster and
+ // GetScalingDirectivesRunnable.run() will be called equal to amount of
sleep introduced between startUp
+ // and shutDown in seconds
+ Config config =
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL,
ConfigValueFactory.fromAnyRef(1));
+
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+ }
+
+ @Test
+ public void testWhenScalingDirectivesIsNull() throws IOException,
InterruptedException {
+
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(2000);
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+ }
+
+ @Test
+ public void testWhenScalingDirectivesIsEmpty() throws IOException,
InterruptedException {
+
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new
ArrayList<>());
+ TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
+ mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+ testDynamicScalingYarnServiceManager.startUp();
+ Thread.sleep(2000);
+ testDynamicScalingYarnServiceManager.shutDown();
+ Mockito.verify(mockDynamicScalingYarnService,
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+ }
Review Comment:
Suggest combining this with the test above by chaining a second `.thenReturn`
invocation there.
Issue Time Tracking
-------------------
Worklog Id: (was: 946768)
Time Spent: 3h (was: 2h 50m)
> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
> Key: GOBBLIN-2174
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Now that dynamic scaling has been implemented as part of
> https://issues.apache.org/jira/browse/GOBBLIN-2170, the Temporal YARN
> Service needs to be integrated with it to provide a fully functional,
> dynamically scalable YARN service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)