[
https://issues.apache.org/jira/browse/GOBBLIN-1915?focusedWorklogId=881337&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-881337
]
ASF GitHub Bot logged work on GOBBLIN-1915:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Sep/23 07:55
Start Date: 22/Sep/23 07:55
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3785:
URL: https://github.com/apache/gobblin/pull/3785#discussion_r1333987840
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.util.List;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnLogSource;
+import org.apache.gobblin.yarn.YarnContainerSecurityManager;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+
+/**
+ * The Yarn ApplicationMaster class for Gobblin using Temporal.
+ *
+ * <p>
+ * This class runs the {@link YarnService} for all Yarn-related stuffs like
ApplicationMaster registration
+ * and un-registration and Yarn container provisioning.
+ * </p>
+ *
+ * @author Yinan Li
Review Comment:
not accurate: this `@author` attribution does not reflect who wrote this new class
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ * If for some reason, the container exits or gets killed, the {@link
GobblinClusterManager} will
+ * be notified for the completion of the container and will start a new
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li
+ */
+@Alpha
+public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
+ // Working directory key for applications. This config is set dynamically.
+ public static final String CLUSTER_APP_WORK_DIR =
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
+ private static final Logger logger =
LoggerFactory.getLogger(GobblinTemporalTaskRunner.class);
+
+ static final java.nio.file.Path CLUSTER_CONF_PATH =
Paths.get("generated-gobblin-cluster.conf");
+
+ private final Optional<ContainerMetrics> containerMetrics;
+ private final Path appWorkPath;
+ private boolean isTaskDriver;
+ @Getter
+ private volatile boolean started = false;
+ private volatile boolean stopInProgress = false;
+ private volatile boolean isStopped = false;
+ @Getter
+ @Setter
+ private volatile boolean healthCheckFailed = false;
+
+ protected final String taskRunnerId;
+ protected final EventBus eventBus = new
EventBus(GobblinTemporalTaskRunner.class.getSimpleName());
+ protected final Config clusterConfig;
+ @Getter
+ protected final FileSystem fs;
+ protected final String applicationName;
+ protected final String applicationId;
+ protected final int numTemporalWorkers;
+ protected final String temporalQueueName;
+ private final boolean isMetricReportingFailureFatal;
+ private final boolean isEventReportingFailureFatal;
+
+ public GobblinTemporalTaskRunner(String applicationName,
+ String applicationId,
+ String taskRunnerId,
+ Config config,
+ Optional<Path> appWorkDirOptional) throws Exception {
+ GobblinClusterUtils.setSystemProperties(config);
+
+ //Add dynamic config
+ config = GobblinClusterUtils.addDynamicConfig(config);
+
+ this.isTaskDriver = ConfigUtils.getBoolean(config,
GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
+ this.taskRunnerId = taskRunnerId;
+ this.applicationName = applicationName;
+ this.applicationId = applicationId;
+ Configuration conf = HadoopUtils.newConfiguration();
+ this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
+ this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
+ this.clusterConfig = saveConfigToFile(config);
+
+ logger.info("Configured GobblinTaskRunner work dir to: {}",
this.appWorkPath.toString());
+
+ this.containerMetrics = buildContainerMetrics();
+ this.numTemporalWorkers = ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
+ this.temporalQueueName = ConfigUtils.getString(config,
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
+ GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+
+ this.isMetricReportingFailureFatal =
ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+ this.isEventReportingFailureFatal =
ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
+ logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {},
taskRunnerId {}, config {}, appWorkDir {}",
+ this.isTaskDriver ? "taskDriver" : "worker",
+ applicationName,
+ applicationId,
+ taskRunnerId,
+ config,
+ appWorkDirOptional);
+ }
+
+ public TaskRunnerSuiteBase.Builder getTaskRunnerSuiteBuilder() throws
ReflectiveOperationException {
+ String builderStr = ConfigUtils.getString(this.clusterConfig,
+ GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
+ TaskRunnerSuiteBase.Builder.class.getName());
+
+ String hostName = "";
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Cannot find host name for Helix instance: {}");
+ }
+
+ TaskRunnerSuiteBase.Builder builder =
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
+ new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+ .resolveClass(builderStr), this.clusterConfig);
+
+ return builder.setAppWorkPath(this.appWorkPath)
+ .setContainerMetrics(this.containerMetrics)
+ .setFileSystem(this.fs)
+ .setApplicationId(applicationId)
+ .setApplicationName(applicationName)
+ .setContainerId(taskRunnerId)
+ .setHostName(hostName);
+ }
+
+ private Path initAppWorkDir(Config config, Optional<Path>
appWorkDirOptional) {
+ return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() :
GobblinClusterUtils
+ .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName,
this.applicationId);
+ }
+
+ private Config saveConfigToFile(Config config)
+ throws IOException {
+ Config newConf = config
+ .withValue(CLUSTER_APP_WORK_DIR,
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+ ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+ configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
+ return newConf;
+ }
+
+ /**
+ * Start this {@link GobblinTemporalTaskRunner} instance.
+ */
+ public void start()
+ throws ContainerHealthCheckException {
+ logger.info("Calling start method in GobblinTemporalTaskRunner");
+ logger.info(String.format("Starting in container %s", this.taskRunnerId));
+
+ // Start metric reporting
+ initMetricReporter();
+
+ // Add a shutdown hook so the task scheduler gets properly shutdown
+ addShutdownHook();
+
+ try {
+ for (int i = 0; i < this.numTemporalWorkers; i++) {
+ initiateWorker();
+ }
+ }catch (Exception e) {
+ logger.info(e + " for initiate workers");
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initiateWorker() throws Exception{
+ logger.info("Starting Temporal Worker");
+
+ String connectionUri =
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+ WorkflowServiceStubs service =
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
+
+ String namespace = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+ WorkflowClient client =
TemporalWorkflowClientFactory.createClientInstance(service, namespace);
+
+ String workerClassName = ConfigUtils.getString(clusterConfig,
+ GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+ AbstractTemporalWorker worker =
GobblinConstructorUtils.invokeLongestConstructor(
+ (Class<AbstractTemporalWorker>) Class.forName(workerClassName),
clusterConfig, client);
+ worker.start();
Review Comment:
Not sure how important this may be for proper resource management, but these
workers do have a corresponding `shutdown()` method, which I don't see called
anywhere.
If it's challenging to stitch in now, at least leave a TODO.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ * If for some reason, the container exits or gets killed, the {@link
GobblinClusterManager} will
+ * be notified for the completion of the container and will start a new
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li
Review Comment:
also: the `@author` attribution here is not accurate either
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -0,0 +1,948 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is responsible for all Yarn-related stuffs including
ApplicationMaster registration,
+ * ApplicationMaster un-registration, Yarn container management, etc.
+ *
+ * NOTE: This is a stripped down version of {@link
org.apache.gobblin.yarn.YarnService} that is used for temporal testing
+ * without any dependency on Helix. There are some references to helix
concepts, but they are left in for the sake of
+ * keeping some features in-tact. They don't have an actual dependency on
helix anymore.
+ *
+ * @author Yinan Li
Review Comment:
here too: the `@author` attribution is not accurate
Issue Time Tracking
-------------------
Worklog Id: (was: 881337)
Time Spent: 0.5h (was: 20m)
> Create a module that depends on Temporal instead of Helix Task Framework
> ------------------------------------------------------------------------
>
> Key: GOBBLIN-1915
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1915
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The Helix task framework is slowly fading away as a technology. The industry has
> adopted Temporal as a reliable task framework with good observability, low
> boilerplate, and theoretically higher throughput. In order to explore this
> technology further, we should create a separate module that is able to spin
> up a YARN application master that leverages Temporal instead of the Helix TF.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)