khandelwal-prateek commented on code in PR #4160:
URL: https://github.com/apache/gobblin/pull/4160#discussion_r2624268164
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -999,6 +1000,88 @@ private static Config addDynamicConfig(Config config) throws IOException {
    }
  }
+  /**
+   * Configures jar caching by validating root directories and setting JAR_CACHE_DIR.
+   *
+   * <p>This method validates that the configured jar cache root directory exists on the filesystem
+   * before enabling jar caching. It follows this logic:</p>
+   * <ol>
+   * <li>Check if jar caching is enabled, if not return config as-is</li>
+   * <li>Read JAR_CACHE_ROOT_DIR and JAR_CACHE_SUFFIX from config</li>
+   * <li>Check if JAR_CACHE_ROOT_DIR exists on filesystem (e.g., /user/${user.to.proxy})</li>
+   * <li>If it exists: Set JAR_CACHE_DIR = JAR_CACHE_ROOT_DIR + JAR_CACHE_SUFFIX</li>
+   * <li>If not: Try FALLBACK_JAR_CACHE_ROOT_DIR with same logic</li>
+   * <li>If neither exists: Disable jar caching by setting JAR_CACHE_ENABLED to false</li>
+   * </ol>
+   *
+   * <p>This ensures that the base user directory exists before attempting to cache jars in nested
+   * subdirectories, preventing runtime failures from misconfigured paths.</p>
+   *
+   * @param config the application configuration
+   * @param fs the filesystem to use for validation
+   * @return updated config with JAR_CACHE_DIR set or JAR_CACHE_ENABLED disabled
+   * @throws IOException if filesystem operations fail
+   */
+  private static Config addJarCachingConfig(Config config, FileSystem fs) throws IOException {
+    // Skip validation if jar caching is not enabled
+    boolean jarCachingEnabled = ConfigUtils.getBoolean(config,
+        GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
+        GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+    if (!jarCachingEnabled) {
+      LOGGER.info("Jar caching is not enabled, skipping jar cache directory validation");
+      return config;
+    }
+
+    String suffix = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.JAR_CACHE_SUFFIX, "");
+
+    // Try primary root directory
+    if (config.hasPath(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR)) {
+      String rootDir = config.getString(GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
+      Config result = validateAndSetJarCacheDir(config, fs, rootDir, suffix, GobblinYarnConfigurationKeys.JAR_CACHE_ROOT_DIR);
+      if (result != null) {
+        return result;
+      }
+    }
+
+    // Try fallback root directory
+    if (config.hasPath(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR)) {
+      String fallbackRootDir = config.getString(GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
+      Config result = validateAndSetJarCacheDir(config, fs, fallbackRootDir, suffix, GobblinYarnConfigurationKeys.FALLBACK_JAR_CACHE_ROOT_DIR);
+      if (result != null) {
+        return result;
+      }
+    }
+
+    // Neither root directory exists, disable jar caching
+    LOGGER.warn("No valid jar cache root directory found, disabling jar caching");
+    return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
+        ConfigValueFactory.fromAnyRef(false));
+  }
+
+  /**
+   * Validates if the root directory exists and sets JAR_CACHE_DIR if it does.
+   *
+   * @param config the configuration
+   * @param fs the filesystem to check
+   * @param rootDir the root directory to validate
+   * @param suffix the suffix to append to root directory
+   * @param configName the config name for logging
+   * @return updated config if valid, null otherwise
+   */
+  private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir,
+      String suffix, String configName) throws IOException {
+    Path rootPath = new Path(rootDir);
+    if (fs.exists(rootPath)) {
+      String fullPath = new Path(rootPath, suffix).toString();
Review Comment:
It would be good to check that the suffix is always present (non-empty); otherwise, jars will be written directly to the root directory.
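The suggested guard can be sketched as a small fail-fast check. `JarCacheSuffixCheck` is a hypothetical helper (not part of Gobblin or this PR), and plain strings stand in for Hadoop `Path` and Typesafe `Config` so the example compiles on its own. Besides an empty suffix, the sketch also rejects an absolute suffix, an extra assumption made so that the result always stays below the root.

```java
/**
 * Hypothetical helper illustrating the review suggestion: refuse to build a jar cache
 * directory unless a usable suffix is configured. Strings stand in for Hadoop Path objects.
 */
final class JarCacheSuffixCheck {

  private JarCacheSuffixCheck() {
  }

  /** Joins rootDir and suffix, failing fast when the result would not be a subdirectory of rootDir. */
  static String resolveCacheDir(String rootDir, String suffix) {
    if (rootDir == null || rootDir.trim().isEmpty()) {
      throw new IllegalArgumentException("jar cache root dir must be set");
    }
    String child = suffix == null ? "" : suffix.trim();
    if (child.isEmpty()) {
      // An empty suffix would place cached jars directly in the root directory
      throw new IllegalArgumentException("jar cache suffix must be non-empty for root " + rootDir);
    }
    if (child.startsWith("/")) {
      // Assumption of this sketch: an absolute suffix is rejected so the result stays under the root
      throw new IllegalArgumentException("jar cache suffix must be relative: " + child);
    }
    // Avoid a double slash when the root already ends with '/'
    String root = rootDir.endsWith("/") ? rootDir.substring(0, rootDir.length() - 1) : rootDir;
    return root + "/" + child;
  }
}
```

Failing fast surfaces the misconfiguration at launch instead of silently caching jars in the shared root. A softer alternative would be to log a warning and disable jar caching, as the PR already does when neither root directory exists.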
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -999,6 +1000,88 @@ private static Config addDynamicConfig(Config config) throws IOException {
+  private static Config validateAndSetJarCacheDir(Config config, FileSystem fs, String rootDir,
+      String suffix, String configName) throws IOException {
+    Path rootPath = new Path(rootDir);
+    if (fs.exists(rootPath)) {
+      String fullPath = new Path(rootPath, suffix).toString();
+      LOGGER.info("{} exists: {}, setting JAR_CACHE_DIR to: {}", configName, rootDir, fullPath);
+      return config.withValue(GobblinYarnConfigurationKeys.JAR_CACHE_DIR,
+          ConfigValueFactory.fromAnyRef(fullPath));
Review Comment:
Computing `JAR_CACHE_DIR` from filesystem state at config load time makes it hard to debug. We could instead introduce a `JarCachePathResolver` that computes the resolved cache dir when it is needed.
If that's too big a change for now, let's at least check whether `JAR_CACHE_DIR` is already explicitly configured (if it is, use it as is) so that we don't override an existing config.
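One possible shape for the `JarCachePathResolver` idea, sketched under assumptions: the class name comes from the comment, but everything else here is hypothetical. A `Map` stands in for Typesafe `Config`, a `Predicate<String>` stands in for `FileSystem#exists`, and the key strings assume `GOBBLIN_YARN_PREFIX` is `gobblin.yarn.`. An explicitly configured `JAR_CACHE_DIR` always wins, and the root/fallback probing happens only when `resolve()` is called.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

/**
 * Hypothetical resolver: computes the jar cache directory on demand instead of
 * rewriting the config at load time, and never overrides an explicit JAR_CACHE_DIR.
 */
final class JarCachePathResolver {
  // Assumed key strings (GOBBLIN_YARN_PREFIX taken to be "gobblin.yarn.")
  static final String JAR_CACHE_DIR = "gobblin.yarn.jar.cache.dir";
  static final String JAR_CACHE_ROOT_DIR = "gobblin.yarn.jar.cache.root.dir";
  static final String FALLBACK_JAR_CACHE_ROOT_DIR = "gobblin.yarn.jar.cache.fallback.root.dir";
  static final String JAR_CACHE_SUFFIX = "gobblin.yarn.jar.cache.suffix";

  private final Map<String, String> config;
  private final Predicate<String> pathExists; // stand-in for FileSystem#exists

  JarCachePathResolver(Map<String, String> config, Predicate<String> pathExists) {
    this.config = Objects.requireNonNull(config);
    this.pathExists = Objects.requireNonNull(pathExists);
  }

  /** Returns the directory to cache jars in, or empty if jar caching should be skipped. */
  Optional<String> resolve() {
    // 1. An explicitly configured JAR_CACHE_DIR is used as is
    String explicit = config.get(JAR_CACHE_DIR);
    if (explicit != null && !explicit.isEmpty()) {
      return Optional.of(explicit);
    }
    // 2. Without a suffix, do not derive a path (jars would land in the root itself)
    String suffix = config.getOrDefault(JAR_CACHE_SUFFIX, "");
    if (suffix.isEmpty()) {
      return Optional.empty();
    }
    // 3. The first existing root wins: primary, then fallback
    for (String key : Arrays.asList(JAR_CACHE_ROOT_DIR, FALLBACK_JAR_CACHE_ROOT_DIR)) {
      String root = config.get(key);
      if (root != null && pathExists.test(root)) {
        return Optional.of(root.endsWith("/") ? root + suffix : root + "/" + suffix);
      }
    }
    return Optional.empty();
  }
}
```

Because nothing is written back into the config, the effective directory and the inputs that produced it can be logged, or unit-tested, at the point where jars are actually uploaded.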
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java:
##########
@@ -55,6 +55,13 @@ public class GobblinYarnConfigurationKeys {
  public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir";
+  public static final String JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.root.dir";
+
+  public static final String FALLBACK_JAR_CACHE_ROOT_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.fallback.root.dir";
+
+  public static final String JAR_CACHE_SUFFIX = GOBBLIN_YARN_PREFIX + "jar.cache.suffix";
+
+
Review Comment:
nit: remove the extra blank line
##########
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java:
##########
@@ -514,6 +514,147 @@ public String answer(InvocationOnMock invocation) {
    Assert.assertFalse(config.hasPath(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID));
  }
+  @Test
+  public void testAddJarCachingConfig_JarCachingDisabled() throws Exception {
+    FileSystem mockFs = Mockito.mock(FileSystem.class);
+
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, ConfigValueFactory.fromAnyRef(false));
+
+    Config result = GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs);
Review Comment:
The `addJarCachingConfig` method is defined as private in `GobblinYarnAppLauncher`, so this call from the test class would not compile, right?
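A `private static` method is not accessible from another class. However, `GobblinYarnAppLauncherTest` lives in the same package (`org.apache.gobblin.yarn`), so a common remedy is to drop `private` and make the method package-private, usually marked with Guava's `@VisibleForTesting`. The sketch below shows that shape with a hypothetical `LauncherSketch` class and stand-in types (`Map` for `Config`, `Predicate<String>` for `FileSystem`). The annotation is left in a comment so the example compiles without Guava. The key strings are assumed, and the enabled path is simplified to the primary root only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for GobblinYarnAppLauncher; the point is the method's visibility. */
final class LauncherSketch {
  // Assumed key strings (GOBBLIN_YARN_PREFIX taken to be "gobblin.yarn.")
  static final String JAR_CACHE_ENABLED = "gobblin.yarn.jar.cache.enabled";
  static final String JAR_CACHE_ROOT_DIR = "gobblin.yarn.jar.cache.root.dir";
  static final String JAR_CACHE_SUFFIX = "gobblin.yarn.jar.cache.suffix";
  static final String JAR_CACHE_DIR = "gobblin.yarn.jar.cache.dir";

  private LauncherSketch() {
  }

  // No 'private' modifier: package-private, so a test class in the same package can call it.
  // In Gobblin this would typically also carry Guava's @VisibleForTesting.
  static Map<String, String> addJarCachingConfig(Map<String, String> config, Predicate<String> pathExists) {
    if (!Boolean.parseBoolean(config.getOrDefault(JAR_CACHE_ENABLED, "false"))) {
      return config; // disabled: return the config as is, without probing the filesystem
    }
    Map<String, String> updated = new HashMap<>(config);
    String root = config.get(JAR_CACHE_ROOT_DIR);
    String suffix = config.getOrDefault(JAR_CACHE_SUFFIX, "");
    if (root != null && !suffix.isEmpty() && pathExists.test(root)) {
      updated.put(JAR_CACHE_DIR, root + "/" + suffix); // simplified: primary root only, no fallback
    } else {
      updated.put(JAR_CACHE_ENABLED, "false"); // no usable location: disable caching
    }
    return updated;
  }
}
```

With that change, the existing test could call `GobblinYarnAppLauncher.addJarCachingConfig(config, mockFs)` directly. For the disabled case, it could also assert that the mock was never touched, for example with `Mockito.verifyNoMoreInteractions(mockFs)`.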
--
This is an automated message from the Apache Git Service.