This is an automated email from the ASF dual-hosted git repository. dionusos pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push: new 4f4e9ee43 OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos) 4f4e9ee43 is described below commit 4f4e9ee43daf2c8e86b56cabf674a3a4e606007f Author: Denes Bodo <dionu...@apache.org> AuthorDate: Thu Dec 1 15:39:26 2022 +0100 OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos) --- core/src/main/java/org/apache/oozie/ErrorCode.java | 1 + .../apache/oozie/action/hadoop/FsELFunctions.java | 53 +++++++++++ core/src/main/resources/oozie-default.xml | 56 +++++++++++ .../oozie/action/hadoop/TestFsELFunctions.java | 102 +++++++++++++++++---- docs/src/site/markdown/WorkflowFunctionalSpec.md | 23 +++++ release-log.txt | 1 + 6 files changed, 220 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 8f4e21d03..a8053004e 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -158,6 +158,7 @@ public enum ErrorCode { E0756(XLog.STD, "Exception parsing Kill node message [{0}]"), E0757(XLog.STD, "Fork node [{0}] has multiple joins: [{1}]"), E0758(XLog.STD, "Join node [{0}] has multiple forks: [{1}]"), + E0759(XLog.STD, "Could not read the workflow configuration"), E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"), E0801(XLog.STD, "Workflow already running, workflow [{0}]"), diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java index 0f81d7633..d877e785b 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsELFunctions.java @@ -19,6 +19,7 @@ package org.apache.oozie.action.hadoop; import java.io.IOException; +import java.io.StringReader; 
import java.net.URI; import java.net.URISyntaxException; @@ -28,26 +29,78 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.oozie.DagELFunctions; +import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.Services; import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.util.XConfiguration; /** * EL function for fs action executor. */ public class FsELFunctions { + static final String FS_EL_FUNCTIONS_CONF = "FsELFunctions.conf.fs."; private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException { WorkflowJob workflow = DagELFunctions.getWorkflow(); String user = workflow.getUser(); HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); Configuration conf = has.createConfiguration(uri.getAuthority()); + + extractExtraFsConfiguration(workflow, conf, uri); + return has.createFileSystem(user, uri, conf); } + static void extractExtraFsConfiguration(WorkflowJob workflow, Configuration conf, URI uri) + throws HadoopAccessorException { + if (workflow.getConf() != null) { + try { + readFsConfigFromOozieSite(conf, uri); + readFsConfigFromWorkflow(workflow, conf, uri); + } catch (Exception e) { + throw new HadoopAccessorException(ErrorCode.E0759, e); + } + } + } + + private static void readFsConfigFromOozieSite(Configuration conf, URI uri) { + final String fsElFunctionsConfWithScheme = FS_EL_FUNCTIONS_CONF + uri.getScheme(); + final String customELFsProperties = ConfigurationService.get(fsElFunctionsConfWithScheme); + + for (final String entry : customELFsProperties.split(",")) { + final String[] nameAndValue = entry.trim().split("=", 2); + if (nameAndValue.length < 2) { + continue; + } + 
putKeyToConfIfAllowed(conf, nameAndValue[0], nameAndValue[1]); + } + } + + private static void readFsConfigFromWorkflow(WorkflowJob workflow, Configuration conf, URI uri) throws Exception { + if (workflow.getConf() == null) { + return; + } + final String FS_EL_FUNCTIONS_CONF_WITH_SCHEME = FS_EL_FUNCTIONS_CONF + uri.getScheme() + "."; + final XConfiguration workflowConf = new XConfiguration(new StringReader(workflow.getConf())); + for (Object _key : workflowConf.toProperties().keySet()) { + String key = (String) _key; + if (!key.startsWith(FS_EL_FUNCTIONS_CONF_WITH_SCHEME)) { + continue; + } + putKeyToConfIfAllowed(conf, key.substring(FS_EL_FUNCTIONS_CONF_WITH_SCHEME.length()), workflowConf.get(key)); + } + } + + private static void putKeyToConfIfAllowed(Configuration conf, String key, String value) { + if (!JavaActionExecutor.DISALLOWED_PROPERTIES.contains(key)) { + conf.set(key, value); + } + } + /** * Get file status. * diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index a11c2e753..01c1095fe 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -826,6 +826,62 @@ will be the requeue interval for the actions which are waiting for a long time w </description> </property> + <!-- Have default empty value for every file system enumerated in oozie.service.HadoopAccessorService.supported.filesystems --> + + <property> + <name>FsELFunctions.conf.fs.hdfs</name> + <value></value> + <description> + You can configure custom hdfs file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. + </description> + </property> + + <property> + <name>FsELFunctions.conf.fs.hftp</name> + <value></value> + <description> + You can configure custom hftp file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. 
+ </description> + </property> + + <property> + <name>FsELFunctions.conf.fs.abfs</name> + <value></value> + <description> + You can configure custom abfs file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. + </description> + </property> + + <property> + <name>FsELFunctions.conf.fs.abfss</name> + <value></value> + <description> + You can configure custom abfss file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. + </description> + </property> + + <property> + <name>FsELFunctions.conf.fs.s3</name> + <value></value> + <description> + You can configure custom s3 file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. + </description> + </property> + + <property> + <name>FsELFunctions.conf.fs.webhdfs</name> + <value></value> + <description> + You can configure custom webhdfs file system properties for EL functions globally. + Value shall be a comma separated list of key=value pairs. 
+ </description> + </property> + <property> <name>oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength</name> <value>100000</value> diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java index 7b8187e0f..bab5243e6 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java @@ -30,6 +30,7 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.DagELFunctions; import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.workflow.lite.EndNodeDef; import org.apache.oozie.workflow.lite.LiteWorkflowApp; @@ -43,28 +44,33 @@ import org.apache.oozie.util.XConfiguration; public class TestFsELFunctions extends XFsTestCase { + public static final String PASSWORD_FILE_KEY = "hadoop.security.credstore.java-keystore-provider.password-file"; + public static final String CREDENTIAL_PATH_KEY = "hadoop.security.credential.provider.path"; + public static final String HDFS_FILE = "hdfs://path/to/file"; + public static final String JCEKS_FILE = "jceks://path/to/file"; + public static final String PASSWORD_FILE = "password.file"; + public static final String KEY_WE_DONOT_WANT = "key.shall.not.used"; + public static final String HDFS_ELF_FS_CONF_PREFIX = FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfs"; + private Configuration jobConf; + private Configuration protoConf; + private Configuration conf; + private LiteWorkflowInstance job; + private WorkflowJobBean wf; + private FileSystem fs; + @Override protected void setUp() throws Exception { super.setUp(); new Services().init(); - } - @Override - protected void tearDown() throws Exception { - Services.get().destroy(); - super.tearDown(); - } - - public 
void testFunctions() throws Exception { String file1 = new Path(getFsTestCaseDir(), "file1").toString(); String file2 = new Path(getFsTestCaseDir(), "file2").toString(); String dir = new Path(getFsTestCaseDir(), "dir").toString(); - Configuration protoConf = new Configuration(); + protoConf = new Configuration(); protoConf.set(OozieClient.USER_NAME, getTestUser()); protoConf.set("hadoop.job.ugi", getTestUser() + "," + "group"); - - FileSystem fs = getFileSystem(); + fs = getFileSystem(); fs.mkdirs(new Path(dir)); fs.create(new Path(file1)).close(); OutputStream os = fs.create(new Path(dir, "a")); @@ -83,7 +89,7 @@ public class TestFsELFunctions extends XFsTestCase { String dirExtraSlashInPath = new URI(dirURI.getScheme(), dirURI.getAuthority(), "/" + dirURI.getPath(), null, null).toString(); - Configuration conf = new XConfiguration(); + conf = new XConfiguration(); conf.set(OozieClient.APP_PATH, "appPath"); conf.set(OozieClient.USER_NAME, getTestUser()); @@ -100,11 +106,11 @@ public class TestFsELFunctions extends XFsTestCase { LiteWorkflowApp def = new LiteWorkflowApp("name", "<workflow-app/>", - new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")). - addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); - LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId"); + new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")). 
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); + job = new LiteWorkflowInstance(def, conf, "wfId"); - WorkflowJobBean wf = new WorkflowJobBean(); + wf = new WorkflowJobBean(); wf.setId(job.getId()); wf.setAppName("name"); wf.setAppPath("appPath"); @@ -114,7 +120,15 @@ public class TestFsELFunctions extends XFsTestCase { ByteArrayOutputStream baos = new ByteArrayOutputStream(); protoConf.writeXml(baos); wf.setProtoActionConf(baos.toString(StandardCharsets.UTF_8.name())); + } + @Override + protected void tearDown() throws Exception { + Services.get().destroy(); + super.tearDown(); + } + + public void testFunctions() throws Exception { WorkflowActionBean action = new WorkflowActionBean(); action.setId("actionId"); action.setName("actionName"); @@ -140,4 +154,60 @@ public class TestFsELFunctions extends XFsTestCase { 3, (int) eval.evaluate("${fs:dirSize(wf:conf('dirExtraSlashInPath'))}", Integer.class)); } + public void testCustomFileSystemPropertiesCanBeSet() throws Exception { + jobConf = new Configuration(); + jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." + CREDENTIAL_PATH_KEY, JCEKS_FILE); + jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." 
+ PASSWORD_FILE_KEY, PASSWORD_FILE); + jobConf.set("hadoop.irrelevant.configuration", "value"); + jobConf.set("user.name", "Malice"); + jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "user.name", "Malice"); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jobConf.writeXml(baos); + wf.setConf(baos.toString(StandardCharsets.UTF_8.name())); + + Configuration resultFsConf = new Configuration(false); + FsELFunctions.extractExtraFsConfiguration(wf, resultFsConf, fs.getUri()); + assertEquals(JCEKS_FILE, resultFsConf.get(CREDENTIAL_PATH_KEY)); + assertEquals(PASSWORD_FILE, resultFsConf.get(PASSWORD_FILE_KEY)); + assertNull("Irrelevant property shall not be set.", resultFsConf.get("hadoop.irrelevant.configuration")); + assertNull("Disallowed property shall not be set.", resultFsConf.get("user.name")); + + } + + public void testOozieSiteConfigRead() throws Exception { + Configuration cnf = new Configuration(false); + URI uri = new URI(HDFS_FILE); + ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX, + CREDENTIAL_PATH_KEY + "=" + JCEKS_FILE + "," + PASSWORD_FILE_KEY + "=" + PASSWORD_FILE); + ConfigurationService.set(FsELFunctions.FS_EL_FUNCTIONS_CONF + "hdfsx", KEY_WE_DONOT_WANT + "=value"); + + jobConf = new Configuration(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jobConf.writeXml(baos); + wf.setConf(baos.toString(StandardCharsets.UTF_8.name())); + + FsELFunctions.extractExtraFsConfiguration(wf, cnf, uri); + + assertEquals(JCEKS_FILE, cnf.get(CREDENTIAL_PATH_KEY)); + assertEquals(PASSWORD_FILE, cnf.get(PASSWORD_FILE_KEY)); + assertNull(cnf.get(KEY_WE_DONOT_WANT)); + } + + public void testIfWorkflowConfOverwritesSiteConf() throws Exception { + Configuration cnf = new Configuration(false); + URI uri = new URI(HDFS_FILE); + String KEY_TO_OVERRIDE = CREDENTIAL_PATH_KEY; + ConfigurationService.set(HDFS_ELF_FS_CONF_PREFIX, KEY_TO_OVERRIDE + "=" + JCEKS_FILE); + + jobConf = new Configuration(); + jobConf.set(HDFS_ELF_FS_CONF_PREFIX + "." 
+ KEY_TO_OVERRIDE, "Desired value"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jobConf.writeXml(baos); + wf.setConf(baos.toString(StandardCharsets.UTF_8.name())); + + FsELFunctions.extractExtraFsConfiguration(wf, cnf, uri); + + assertEquals("Desired value", cnf.get(CREDENTIAL_PATH_KEY)); + } } diff --git a/docs/src/site/markdown/WorkflowFunctionalSpec.md b/docs/src/site/markdown/WorkflowFunctionalSpec.md index 136f1d77c..3e63d662e 100644 --- a/docs/src/site/markdown/WorkflowFunctionalSpec.md +++ b/docs/src/site/markdown/WorkflowFunctionalSpec.md @@ -2288,6 +2288,29 @@ It returns the size in bytes of specified file. If the path is not a file, or if It returns the block size in bytes of specified file. If the path is not a file, or if it does not exist it returns -1. +#### 4.2.7.1 File system configuration for EL functions + +There is a use case when you need to access special file systems to check data availability and you need to have +file system principal and key file set. You can have them configured globally in oozie-site.xml or per workflow run +through job.properties file. + +Example of global configuration to have all the abfss:// access via EL functions configured: +``` +<property> + <name>FsELFunctions.conf.fs.abfss</name> + <value> + hadoop.security.credstore.java-keystore-provider.password-file=password_file_name, + hadoop.security.credential.provider.path=jceks:///path/to/file + </value> + </property> +``` + +Example of per workflow configuration via job.properties: +``` +FsELFunctions.conf.fs.abfss.hadoop.security.credstore.java-keystore-provider.password-file=password_file_name +FsELFunctions.conf.fs.abfss.hadoop.security.credential.provider.path=jceks:///path/to/file +``` + #### 4.2.8 HCatalog EL Functions For all the functions in this section the URI must be a hcatalog URI identifying a table or set of partitions in a table. 
diff --git a/release-log.txt b/release-log.txt index 8c658ef7b..372837af3 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.3.0 release (trunk - unreleased) +OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos) OOZIE-3677 Oozie should accept a keyStoreType and trustStoreType property in oozie-site.xml (jmakai via dionusos) OOZIE-3678 Reduce the number of NameNode access when starting the Yarn job (jmakai via dionusos) OOZIE-3670 Actions can stuck while running in a Fork-Join workflow (jmakai via dionusos)