[ 
https://issues.apache.org/jira/browse/GOBBLIN-1915?focusedWorklogId=881337&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-881337
 ]

ASF GitHub Bot logged work on GOBBLIN-1915:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Sep/23 07:55
            Start Date: 22/Sep/23 07:55
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3785:
URL: https://github.com/apache/gobblin/pull/3785#discussion_r1333987840


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.util.List;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnLogSource;
+import org.apache.gobblin.yarn.YarnContainerSecurityManager;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+
+/**
+ * The Yarn ApplicationMaster class for Gobblin using Temporal.
+ *
+ * <p>
+ *   This class runs the {@link YarnService} for all Yarn-related stuffs like 
ApplicationMaster registration
+ *   and un-registration and Yarn container provisioning.
+ * </p>
+ *
+ * @author Yinan Li

Review Comment:
   not accurate



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running 
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ *   If for some reason, the container exits or gets killed, the {@link 
GobblinClusterManager} will
+ *   be notified for the completion of the container and will start a new 
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li
+ */
+@Alpha
+public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
+  // Working directory key for applications. This config is set dynamically.
+  public static final String CLUSTER_APP_WORK_DIR = 
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
+  private static final Logger logger = 
LoggerFactory.getLogger(GobblinTemporalTaskRunner.class);
+
+  static final java.nio.file.Path CLUSTER_CONF_PATH = 
Paths.get("generated-gobblin-cluster.conf");
+
+  private final Optional<ContainerMetrics> containerMetrics;
+  private final Path appWorkPath;
+  private boolean isTaskDriver;
+  @Getter
+  private volatile boolean started = false;
+  private volatile boolean stopInProgress = false;
+  private volatile boolean isStopped = false;
+  @Getter
+  @Setter
+  private volatile boolean healthCheckFailed = false;
+
+  protected final String taskRunnerId;
+  protected final EventBus eventBus = new 
EventBus(GobblinTemporalTaskRunner.class.getSimpleName());
+  protected final Config clusterConfig;
+  @Getter
+  protected final FileSystem fs;
+  protected final String applicationName;
+  protected final String applicationId;
+  protected final int numTemporalWorkers;
+  protected final String temporalQueueName;
+  private final boolean isMetricReportingFailureFatal;
+  private final boolean isEventReportingFailureFatal;
+
+  public GobblinTemporalTaskRunner(String applicationName,
+      String applicationId,
+      String taskRunnerId,
+      Config config,
+      Optional<Path> appWorkDirOptional) throws Exception {
+    GobblinClusterUtils.setSystemProperties(config);
+
+    //Add dynamic config
+    config = GobblinClusterUtils.addDynamicConfig(config);
+
+    this.isTaskDriver = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
+    this.taskRunnerId = taskRunnerId;
+    this.applicationName = applicationName;
+    this.applicationId = applicationId;
+    Configuration conf = HadoopUtils.newConfiguration();
+    this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
+    this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
+    this.clusterConfig = saveConfigToFile(config);
+
+    logger.info("Configured GobblinTaskRunner work dir to: {}", 
this.appWorkPath.toString());
+
+    this.containerMetrics = buildContainerMetrics();
+    this.numTemporalWorkers = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
+    this.temporalQueueName = ConfigUtils.getString(config, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
+        GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+
+    this.isMetricReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+    this.isEventReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
+    logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, 
taskRunnerId {}, config {}, appWorkDir {}",
+        this.isTaskDriver ? "taskDriver" : "worker",
+        applicationName,
+        applicationId,
+        taskRunnerId,
+        config,
+        appWorkDirOptional);
+  }
+
+  public TaskRunnerSuiteBase.Builder getTaskRunnerSuiteBuilder() throws 
ReflectiveOperationException {
+    String builderStr = ConfigUtils.getString(this.clusterConfig,
+        GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
+        TaskRunnerSuiteBase.Builder.class.getName());
+
+    String hostName = "";
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      logger.warn("Cannot find host name for Helix instance: {}");
+    }
+
+    TaskRunnerSuiteBase.Builder builder = 
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
+        new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+            .resolveClass(builderStr), this.clusterConfig);
+
+    return builder.setAppWorkPath(this.appWorkPath)
+        .setContainerMetrics(this.containerMetrics)
+        .setFileSystem(this.fs)
+        .setApplicationId(applicationId)
+        .setApplicationName(applicationName)
+        .setContainerId(taskRunnerId)
+        .setHostName(hostName);
+  }
+
+  private Path initAppWorkDir(Config config, Optional<Path> 
appWorkDirOptional) {
+    return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : 
GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, 
this.applicationId);
+  }
+
+  private Config saveConfigToFile(Config config)
+      throws IOException {
+    Config newConf = config
+        .withValue(CLUSTER_APP_WORK_DIR, 
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+    ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+    configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
+    return newConf;
+  }
+
+  /**
+   * Start this {@link GobblinTemporalTaskRunner} instance.
+   */
+  public void start()
+      throws ContainerHealthCheckException {
+    logger.info("Calling start method in GobblinTemporalTaskRunner");
+    logger.info(String.format("Starting in container %s", this.taskRunnerId));
+
+    // Start metric reporting
+    initMetricReporter();
+
+    // Add a shutdown hook so the task scheduler gets properly shutdown
+    addShutdownHook();
+
+    try {
+      for (int i = 0; i < this.numTemporalWorkers; i++) {
+        initiateWorker();
+      }
+    }catch (Exception e) {
+      logger.info(e + " for initiate workers");
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void initiateWorker() throws Exception{
+    logger.info("Starting Temporal Worker");
+
+    String connectionUri = 
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+    WorkflowServiceStubs service = 
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
+
+    String namespace = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
+            
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+    WorkflowClient client = 
TemporalWorkflowClientFactory.createClientInstance(service, namespace);
+
+    String workerClassName = ConfigUtils.getString(clusterConfig,
+        GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+    AbstractTemporalWorker worker = 
GobblinConstructorUtils.invokeLongestConstructor(
+        (Class<AbstractTemporalWorker>) Class.forName(workerClassName), 
clusterConfig, client);
+    worker.start();

Review Comment:
   not sure how important this may be for proper resource management, but these 
workers do have a corresponding `shutdown()` method that I don't see called 
anywhere.
   
   if it's challenging to stitch in now, at least leave a TODO



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running 
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ *   If for some reason, the container exits or gets killed, the {@link 
GobblinClusterManager} will
+ *   be notified for the completion of the container and will start a new 
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li

Review Comment:
   also not accurate



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -0,0 +1,948 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is responsible for all Yarn-related stuffs including 
ApplicationMaster registration,
+ * ApplicationMaster un-registration, Yarn container management, etc.
+ *
+ * NOTE: This is a stripped down version of {@link 
org.apache.gobblin.yarn.YarnService} that is used for temporal testing
+ * without any dependency on Helix. There are some references to helix 
concepts, but they are left in for the sake of
+ * keeping some features in-tact. They don't have an actual dependency on 
helix anymore.
+ *
+ * @author Yinan Li

Review Comment:
   not accurate here too





Issue Time Tracking
-------------------

    Worklog Id:     (was: 881337)
    Time Spent: 0.5h  (was: 20m)

> Create a module that depends on Temporal instead of Helix Task Framework
> ------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1915
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1915
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Helix task framework is slowly fading away as a technology. The industry has 
> adopted Temporal as a reliable task framework with good observability, low 
> boilerplate, and theoretically higher throughput. In order to explore this 
> technology further, we should create a separate module that is able to spin 
> up a Yarn app master that leverages Temporal instead of Helix TF.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to