[
https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=933302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933302
]
ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Sep/24 06:24
Start Date: 05/Sep/24 06:24
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1735415804
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
for (FileStatus status : fileStatusList) {
+ Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
- int retryCount = 0;
- boolean shouldFileBeAddedIntoDC = true;
- Path destJarFile = calculateDestJarFile(status, jarFileDir);
- // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
- while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- try {
- if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
- throw new IOException("Waiting for file to complete on uploading ... ");
- }
- // Set the first parameter as false for not deleting sourceFile
- // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
- // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
- this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
- } catch (IOException | InterruptedException e) {
- LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
- retryCount += 1;
- if (retryCount >= this.jarFileMaximumRetry) {
- LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
- // If retry reaches upper limit, skip copying this file.
- shouldFileBeAddedIntoDC = false;
- break;
- }
- }
- }
- if (shouldFileBeAddedIntoDC) {
+ if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+ } else {
+ LOG.error("Failed to upload jar file: " + status.getPath());
Review Comment:
I don't find the prior code throwing an error either. Nonetheless, should
everything continue on with just some error logs? Shouldn't we instead fail
the overall job, since presumably necessary jars won't be there?
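For illustration only, the fail-fast alternative could look roughly like the sketch below. `FailFastJarUpload`, `JarUploader`, and `uploadAllOrFail` are hypothetical names, not Gobblin APIs; the interface merely stands in for a call like `HdfsJarUploadUtils.uploadJarToHdfs`, which returns `false` once its retries are exhausted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FailFastJarUpload {

  /** Hypothetical stand-in for an upload call that returns false when retries run out. */
  public interface JarUploader {
    boolean upload(String localJar) throws IOException;
  }

  /**
   * Uploads every jar in order and returns the ones that were uploaded.
   * Throws on the first failed upload instead of only logging it, so the
   * caller cannot proceed with jars missing from the classpath.
   */
  public static List<String> uploadAllOrFail(List<String> localJars, JarUploader uploader)
      throws IOException {
    List<String> uploaded = new ArrayList<>();
    for (String jar : localJars) {
      if (!uploader.upload(jar)) {
        throw new IOException("Failed to upload jar file: " + jar);
      }
      uploaded.add(jar);
    }
    return uploaded;
  }
}
```

Whether a hard failure is right depends on whether any of the jars are truly optional, which is the open question in this comment.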
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
+ Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Review Comment:
Though you explained this in the PR description, I suggest a comment here
noting that the "unshared" dir is for "-SNAPSHOT" versions.
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+ private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+ /**
+ * Calculate the target filePath of the jar file to be copied on HDFS,
+ * given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
+ * Snapshot dirs should not be shared, as different jobs may be using different versions of it.
+ * @param fs
+ * @param localJar
+ * @param unsharedJarsDir
+ * @param jarCacheDir
+ * @return
+ * @throws IOException
+ */
+ public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
Review Comment:
Suggest naming this `calculateDestJarFilePath`.
And, since the only use of `localJar` is `localJar.getPath().getName()`,
suggest making the param `String jarName`.
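A minimal sketch of the suggested signature, under stated assumptions: `java.nio.file.Path` stands in for Hadoop's `Path` so the example runs without Hadoop on the classpath, the `fs.makeQualified(...)` step from the PR is left out, and the class name `DestJarPath` is hypothetical.

```java
import java.nio.file.Path;

public class DestJarPath {

  /**
   * Picks the upload directory from the jar name alone: SNAPSHOT jars go to the
   * unshared dir, everything else goes to the shared cache dir.
   */
  public static Path calculateDestJarFilePath(String jarName, Path unsharedJarsDir, Path jarCacheDir) {
    Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
    return uploadDir.resolve(jarName);
  }
}
```

Taking a `String` keeps the method free of any filesystem lookup, which also makes it easy to unit test.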
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -300,10 +304,10 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.detachOnExitEnabled = ConfigUtils
- .getBoolean(config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
+ .getBoolean(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_DETACH_ON_EXIT_ENABLED,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT);
- this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
-
+ this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
+ this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
Review Comment:
NBD, but you just updated the two above to be `this.config`, but only use
`config` here :)
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+ private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
Review Comment:
are these seconds? suggest a suffix to clarify
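Since the constant is passed to `Thread.sleep(long)`, which takes milliseconds, 3000 here is 3 seconds. Two ways the unit could be made explicit, as a sketch (the class name and the renamed constants are hypothetical, not what the PR uses):

```java
import java.time.Duration;

public class UploadWaitConstants {

  // Option 1: put the unit in the name. Thread.sleep(long) expects milliseconds.
  public static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000L;

  // Option 2: carry the unit in the type and convert at the call site,
  // e.g. Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD.toMillis()).
  public static final Duration WAITING_TIME_ON_INCOMPLETE_UPLOAD = Duration.ofSeconds(3);
}
```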
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java:
##########
@@ -199,6 +203,18 @@ public static void setYarnClassPath(Config config, Configuration yarnConfigurati
}
}
+ public static Path getJarPathCacheAndCleanIfNeeded(Config config, FileSystem fs) throws IOException {
Review Comment:
this feels like two separate operations:
```
Path calcJarCacheCurrentPath(Config, FileSystem);
// true iff any deletion
boolean retainKLatestCachePaths(Path parentCachePath, int k);
```
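To make the split concrete, here is a rough sketch of the two operations. It is an illustration under assumptions, not the PR's implementation: `java.nio.file` on the local filesystem stands in for Hadoop's `FileSystem`, the signatures differ from the ones above (a cache root and a `YearMonth` replace `Config` and `FileSystem`), and the `yyyy-MM` directory naming is assumed from the "monthly cache" wording in the issue description.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.YearMonth;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JarCachePaths {

  /** Pure calculation, no filesystem access: <cacheRoot>/<yyyy-MM>. */
  public static Path calcJarCacheCurrentPath(Path cacheRoot, YearMonth month) {
    return cacheRoot.resolve(month.toString());
  }

  /** Deletes all but the k latest child dirs (by name); returns true iff anything was deleted. */
  public static boolean retainKLatestCachePaths(Path parentCachePath, int k) throws IOException {
    List<Path> newestFirst;
    try (Stream<Path> listing = Files.list(parentCachePath)) {
      newestFirst = listing.filter(Files::isDirectory)
          .sorted(Comparator.comparing((Path p) -> p.getFileName().toString()).reversed())
          .collect(Collectors.toList());
    }
    int keep = Math.max(0, Math.min(k, newestFirst.size()));
    boolean deletedAny = false;
    for (Path stale : newestFirst.subList(keep, newestFirst.size())) {
      deleteRecursively(stale);
      deletedAny = true;
    }
    return deletedAny;
  }

  private static void deleteRecursively(Path dir) throws IOException {
    try (Stream<Path> walk = Files.walk(dir)) {
      // Reverse order so that children are deleted before their parent directory.
      for (Path p : walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList())) {
        Files.delete(p);
      }
    }
  }
}
```

With the two pieces separated, the path calculation can be called freely while the cleanup runs only where deletion is intended.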
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+ private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+ /**
+ * Calculate the target filePath of the jar file to be copied on HDFS,
+ * given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
+ * Snapshot dirs should not be shared, as different jobs may be using different versions of it.
+ * @param fs
+ * @param localJar
+ * @param unsharedJarsDir
+ * @param jarCacheDir
+ * @return
+ * @throws IOException
+ */
+ public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+ Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
+ Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
+ return destJarFile;
+ }
+ /**
+ * Upload a jar file to HDFS with retries to handle already existing jars
+ * @param fs
+ * @param localJar
+ * @param destJarFile
+ * @param jarFileMaximumRetry
+ * @return
+ * @throws IOException
+ */
+ public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
Review Comment:
`jarFileMaximumRetry` => simply `maxAttempts`?
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+ private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+ /**
+ * Calculate the target filePath of the jar file to be copied on HDFS,
+ * given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
+ * Snapshot dirs should not be shared, as different jobs may be using different versions of it.
+ * @param fs
+ * @param localJar
+ * @param unsharedJarsDir
+ * @param jarCacheDir
+ * @return
+ * @throws IOException
+ */
+ public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+ Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
+ Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
+ return destJarFile;
+ }
+ /**
+ * Upload a jar file to HDFS with retries to handle already existing jars
+ * @param fs
+ * @param localJar
+ * @param destJarFile
+ * @param jarFileMaximumRetry
+ * @return
+ * @throws IOException
+ */
+ public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
+ int retryCount = 0;
+ while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+ try {
+ if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+ Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
Review Comment:
What are we waiting on here? Is `copyFromLocalFile` asynchronous, or are we
thinking a different process may be doing the upload?
With `overwriteAnyExistingDestFile == false`, we don't seem to undertake any
"repair", do we? Instead we just wait `(waitTime * jarFileMaximumRetry)`.
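The behavior in question can be reproduced in a small stand-alone sketch. It is a simplification under assumptions rather than the PR's code: `java.nio.file.Files` stands in for Hadoop's `FileSystem`, `copyWithRetries` and its parameters are hypothetical names, and a size mismatch is interpreted as "another process is mid-upload".

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BoundedJarCopy {

  /**
   * Returns true once destJar exists with the same length as localJar.
   * A destination with a different length is never overwritten or repaired;
   * the method only sleeps and re-checks, so a permanently truncated file
   * costs roughly waitMillis * maxAttempts before giving up.
   */
  public static boolean copyWithRetries(Path localJar, Path destJar, int maxAttempts, long waitMillis)
      throws IOException, InterruptedException {
    long expectedLen = Files.size(localJar);
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (Files.exists(destJar)) {
        if (Files.size(destJar) == expectedLen) {
          return true; // already uploaded, possibly by another job
        }
        Thread.sleep(waitMillis); // assume a concurrent upload is still in progress
        continue;
      }
      try {
        Files.copy(localJar, destJar); // no REPLACE_EXISTING: fails if the target appeared meanwhile
      } catch (FileAlreadyExistsException e) {
        // Lost a race with another uploader; the next iteration re-checks the size.
      }
    }
    return Files.exists(destJar) && Files.size(destJar) == expectedLen;
  }
}
```

If a stale partial file from a crashed uploader is a realistic case, this loop can never succeed for that path, which is the "repair" gap raised above.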
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -640,31 +643,36 @@ private Resource prepareContainerResource(GetNewApplicationResponse newApplicati
private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId applicationId) throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString());
+ Path jarsRootDir = this.jarCacheEnabled ? YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir;
Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
- LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir.toString());
+ Path appMasterJarsCacheDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+ LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", appMasterWorkDir);
+ LOGGER.info("Configured GobblinApplicationMaster jars directory to: {}", appMasterJarsCacheDir);
Map<String, LocalResource> appMasterResources = Maps.newHashMap();
FileSystem localFs = FileSystem.getLocal(new Configuration());
- // NOTE: log after each step below for insight into what takes bulk of time
if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
- Path libJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+ // Lib jars are shared between all containers, store at the root level
+ Path libJarsDestDir = new Path(jarsRootDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
+ Path unsharedJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME);
addLibJars(new Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)),
- Optional.of(appMasterResources), libJarsDestDir, localFs);
- LOGGER.info("Added lib jars to directory: {}", libJarsDestDir.toString());
+ Optional.of(appMasterResources), libJarsDestDir, unsharedJarsDestDir, localFs);
+ LOGGER.info("Added lib jars to directory: {}", libJarsDestDir);
Review Comment:
It seems `addLibJars` could also add some jars to `unsharedJarsDestDir`, so it
would be good to mention that directory in the log message too.
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java:
##########
@@ -0,0 +1,86 @@
+
+package org.apache.gobblin.util.filesystem;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility class for uploading jar files to HDFS with retries to handle concurrency
+ */
+@Slf4j
+public class HdfsJarUploadUtils {
+
+ private static final long WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
+ /**
+ * Calculate the target filePath of the jar file to be copied on HDFS,
+ * given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
+ * Snapshot dirs should not be shared, as different jobs may be using different versions of it.
+ * @param fs
+ * @param localJar
+ * @param unsharedJarsDir
+ * @param jarCacheDir
+ * @return
+ * @throws IOException
+ */
+ public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
+ Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
+ Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
+ return destJarFile;
+ }
+ /**
+ * Upload a jar file to HDFS with retries to handle already existing jars
+ * @param fs
+ * @param localJar
+ * @param destJarFile
+ * @param jarFileMaximumRetry
+ * @return
+ * @throws IOException
+ */
+ public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
+ int retryCount = 0;
+ while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+ try {
+ if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
+ Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
+ throw new IOException("Waiting for file to complete on uploading ... ");
+ }
+ // Set the first parameter as false for not deleting sourceFile
+ // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
+ // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
+ fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile);
Review Comment:
```
boolean deleteSourceFile = false;
// IOException will be thrown if the dest file already exists
boolean overwriteAnyExistingDestFile = false;
fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile,
    localJar.getPath(), destJarFile);
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
+ Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir;
+ Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+ LOGGER.info("Container uncached jars root dir: " + containerJarsUnsharedDir);
Review Comment:
I'd find something like "execution-private" or "unshared" more explicit than
uncached.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -484,12 +487,29 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
+ Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.getJarPathCacheAndCleanIfNeeded(this.config, this.fs) : appWorkDir;
+ Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
+ LOGGER.info("Container uncached jars root dir: " + containerJarsUnsharedDir);
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
- Map<String, LocalResource> resourceMap = Maps.newHashMap();
+ Map<String, LocalResource> resourceMap = Maps.newHashMap();
+ // Always fetch any jars from the appWorkDir for any potential snapshot jars
addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
- addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
+ if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
+ addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
+ resourceMap);
+ }
+ if (this.jarCacheEnabled) {
+ addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
+ if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
Review Comment:
I don't quite understand this key. You're checking it here in two different
conditionals, but in neither one do you actually use (or even check to see)
what value it holds.
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir, Optional<Map<String, LocalResource>>
}
for (FileStatus libJarFile : libJarFiles) {
- Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
- this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
- if (resourceMap.isPresent()) {
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, libJarFile, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent()) {
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
+ } else {
+ LOGGER.warn("Failed to upload jar file {} to HDFS", libJarFile.getPath());
}
}
}
-
- private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap,
- Path destDir, FileSystem localFs) throws IOException {
+ private void addAppJars(String jarFilePathList, Optional<Map<String, LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+ FileSystem localFs) throws IOException {
for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
Path srcFilePath = new Path(jarFilePath);
- Path destFilePath = new Path(destDir, srcFilePath.getName());
- if (localFs.exists(srcFilePath)) {
- this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ FileStatus localJar = localFs.getFileStatus(srcFilePath);
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs, localJar, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar, MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+ if (resourceMap.isPresent()) {
+ YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
+ }
} else {
- LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
- }
- if (resourceMap.isPresent()) {
- YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
+ LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
Review Comment:
again, should this be an actual failure, not merely logging?
...or do we believe there are times when it's actually OK to continue?
Issue Time Tracking
-------------------
Worklog Id: (was: 933302)
Time Spent: 20m (was: 10m)
> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
> Key: GOBBLIN-2135
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of
> jar caching, where MRJobLauncher creates a monthly cache that is
> automatically cleaned up by subsequent executions performed 2 months in
> advance.
> YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15
> mins for a sizeable job to get all the jars), and given that many jobs do
> share the same jars, it makes sense to cache them together and only provide
> YARN the shared path.
> We also want to ensure that SNAPSHOT jars or other files are not uploaded to
> a cache, since they are not immutable, unlike jar versions on Artifactory.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)