[
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946211&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946211
]
ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Nov/24 14:05
Start Date: 30/Nov/24 14:05
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1862400342
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+ private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+ private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir";
+ private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir";
+ private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay";
+ private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ private final Config config;
+ DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+ private final FileSystem fs;
+
+ public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
+ this.config = appMaster.getConfig();
+ this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService();
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+ com.google.common.base.Optional.of("DynamicScalingExecutor")));
+ this.fs = appMaster.getFs();
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY,
+ DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+ ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource(
+ this.fs,
+ this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+ Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+ );
+
+ // TODO: remove this line later
+ // Using for testing purposes only
+ ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource();
Review Comment:
I am using `DummyScalingDirectiveSource` so that containers are launched at
runtime whenever I run a job, which lets me test the complete flow end to end.
>... which reminds me.... how is this DSYSM created and initialized at present?
Here, after starting the YarnService:
https://github.com/apache/gobblin/blob/master/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java#L102
we initialize the other service classes, whose names are passed through config:
https://github.com/apache/gobblin/blob/e5d897edaee391d05a55e6ac8a420e3416fef6d9/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java#L78
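The linked lines are not reproduced in this message. As a rough illustration of the pattern described above (instantiating service classes whose names arrive through config and handing each one the application master), here is a minimal sketch in plain Java. Every name in it (`AppContext`, `SimpleService`, `PollingService`, `ConfiguredServiceLoader`) is hypothetical, and the one-argument constructor convention is an assumption; this is not Gobblin's actual code.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the application master handed to each service. */
class AppContext {
  final String name;
  AppContext(String name) { this.name = name; }
}

/** Hypothetical minimal service contract (assumption, not a Gobblin type). */
interface SimpleService {
  String describe();
}

/** Hypothetical service, e.g. one that would poll for scaling directives. */
class PollingService implements SimpleService {
  private final AppContext ctx;
  public PollingService(AppContext ctx) { this.ctx = ctx; }
  @Override public String describe() { return "PollingService@" + ctx.name; }
}

class ConfiguredServiceLoader {
  /**
   * Instantiates every class named in a comma-separated config value,
   * assuming each class has a one-argument (AppContext) constructor.
   */
  static List<SimpleService> load(String commaSeparatedClassNames, AppContext ctx) {
    List<SimpleService> services = new ArrayList<>();
    for (String raw : commaSeparatedClassNames.split(",")) {
      String className = raw.trim();
      if (className.isEmpty()) {
        continue; // tolerate trailing commas and blank entries
      }
      try {
        Class<? extends SimpleService> clazz =
            Class.forName(className).asSubclass(SimpleService.class);
        Constructor<? extends SimpleService> ctor =
            clazz.getDeclaredConstructor(AppContext.class);
        services.add(ctor.newInstance(ctx));
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot instantiate service " + className, e);
      }
    }
    return services;
  }

  public static void main(String[] args) {
    // In a real deployment the class names would come from a config key.
    String configured = PollingService.class.getName();
    for (SimpleService s : load(configured, new AppContext("am"))) {
      System.out.println(s.describe());
    }
  }
}
```

With this kind of wiring, a manager class only needs to be listed in the config value to be constructed alongside the other services; no change to the application master itself is required.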
Issue Time Tracking
-------------------
Worklog Id: (was: 946211)
Time Spent: 2h 40m (was: 2.5h)
> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
> Key: GOBBLIN-2174
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-core
> Reporter: Vivek Rai
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> After dynamic scaling was implemented as part of
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn
> Service needs to be integrated with dynamic scaling to provide a fully
> functional, dynamically scalable Yarn service.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)