Repository: incubator-falcon
Updated Branches:
  refs/heads/master 2129285dc -> 3d3bde558
FALCON-444 Logs dir for replication workflow is incorrect and jobs fail with permission issues. Contributed by Venkatesh Seetharam Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b7afe36f Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b7afe36f Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b7afe36f Branch: refs/heads/master Commit: b7afe36f494091e283702c28af5f42ef482226a0 Parents: 2129285 Author: Venkatesh Seetharam <[email protected]> Authored: Fri May 23 11:13:44 2014 -0700 Committer: Venkatesh Seetharam <[email protected]> Committed: Fri May 23 11:13:44 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../workflow/OozieFeedWorkflowBuilder.java | 8 +- .../converter/OozieFeedWorkflowBuilderTest.java | 10 +++ .../falcon/workflow/OozieWorkflowBuilder.java | 93 +++++++++++++------- .../workflow/OozieProcessWorkflowBuilder.java | 2 +- .../OozieProcessWorkflowBuilderTest.java | 20 +++-- .../apache/falcon/latedata/LateDataHandler.java | 2 +- 7 files changed, 91 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fccc9b8..0e26c7d 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-444 Logs dir for replication workflow is incorrect and jobs fail + with permission issues (Venkatesh Seetharam) + FALCON-443 Process with Hive workflow engine and filesystem input feeds, table output feed fails (Sowmya Ramesh via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java 
---------------------------------------------------------------------- diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java index faaddac..6d36840 100644 --- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java +++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java @@ -146,11 +146,11 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) { if (feedCluster.getType() == ClusterType.SOURCE) { String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(); - Path basePath = getCoordPath(bundlePath, coordName); + Path coordPath = getCoordPath(bundlePath, coordName); Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()); // workflow is serialized to a specific dir - Path sourceSpecificWfPath = new Path(basePath, srcCluster.getName()); + Path sourceSpecificWfPath = new Path(coordPath, srcCluster.getName()); // Different workflow for each source since hive credentials vary for each cluster replicationMapper.createReplicationWorkflow( @@ -211,7 +211,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { createRetentionWorkflow(cluster, wfPath, wfName); retentionWorkflow.setAppPath(getStoragePath(wfPath.toString())); - Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName); + Map<String, String> props = createCoordDefaultConfiguration(cluster, wfName); props.put("timeZone", entity.getTimezone().getID()); props.put("frequency", entity.getFrequency().getTimeUnit().name()); @@ -539,7 +539,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { WORKFLOW replicationWF = new WORKFLOW(); replicationWF.setAppPath(getStoragePath(wfPath.toString())); - Map<String, 
String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName); + Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfName); props.put("srcClusterName", srcCluster.getName()); props.put("srcClusterColo", srcCluster.getColo()); if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java index 7cb055a..64b38ff 100644 --- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java +++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java @@ -211,6 +211,7 @@ public class OozieFeedWorkflowBuilderTest { Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions); Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name()); + Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed)); // verify the post processing params Assert.assertEquals(props.get("feedNames"), feed.getName()); @@ -359,6 +360,7 @@ public class OozieFeedWorkflowBuilderTest { Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name()); Assert.assertEquals(props.get("maxMaps"), "33"); Assert.assertEquals(props.get("mapBandwidthKB"), "2048"); + Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed)); } public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) { @@ -472,6 +474,7 @@ public class OozieFeedWorkflowBuilderTest { Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName()); 
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name()); + Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed)); // verify the post processing params Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); @@ -575,6 +578,7 @@ public class OozieFeedWorkflowBuilderTest { // verify the post processing params Assert.assertEquals(props.get("feedNames"), feed.getName()); Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed)); assertWorkflowRetries(coord); } @@ -620,6 +624,7 @@ public class OozieFeedWorkflowBuilderTest { // verify the post processing params Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed)); assertWorkflowRetries(coord); @@ -655,4 +660,9 @@ public class OozieFeedWorkflowBuilderTest { } } } + + private String getLogPath(Cluster aCluster, Feed aFeed) { + Path logPath = EntityUtil.getLogPath(aCluster, aFeed); + return (logPath.toUri().getScheme() == null ? 
"${nameNode}" : "") + logPath; + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java index ad1af73..f5ff27a 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java @@ -147,20 +147,21 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui for (COORDINATORAPP coordinatorapp : coordinators) { Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName()); - String coordXmlName = marshal(cluster, coordinatorapp, coordPath, - EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity)); + marshal(cluster, coordinatorapp, coordPath); - createLogsDir(cluster, coordPath); //create logs dir // copy falcon libs to the workflow dir copySharedLibs(cluster, coordinatorapp); // add the coordinator to the bundle COORDINATOR bundleCoord = new COORDINATOR(); bundleCoord.setName(coordinatorapp.getName()); - bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName); + bundleCoord.setAppPath(getStoragePath(coordPath) + "/coordinator.xml"); bundleApp.getCoordinator().add(bundleCoord); } + // create logs dir once since its at the root of the bundle path + createLogsDir(cluster); + marshal(cluster, bundleApp, bundlePath); // write the bundle return true; } @@ -237,28 +238,54 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui return conf; } - protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) { + protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, String coordName) { Map<String, String> 
props = new HashMap<String, String>(); props.put(ARG.entityName.getPropName(), entity.getName()); + props.put(ARG.entityType.getPropName(), entity.getEntityType().name()); props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL); props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL); + + addBrokerProperties(cluster, props); + + props.put(OozieClient.EXTERNAL_ID, + new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity), + "${coord:nominalTime()}").getId()); + props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster)); + + addLateDataProperties(props); + + addClusterProperties(cluster, props); + + props.put(MR_QUEUE_NAME, "default"); + props.put(MR_JOB_PRIORITY, "NORMAL"); + + //props in entity override the set props. + props.putAll(getEntityProperties()); + + // this cannot be overridden + props.put("logDir", getStoragePath(EntityUtil.getLogPath(cluster, entity))); + + return props; + } + + private void addBrokerProperties(Cluster cluster, Map<String, String> props) { props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster)); props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster)); - String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(), - "tcp://localhost:61616?daemon=true"); + + String falconBrokerUrl = StartupProperties.get().getProperty( + ARG.brokerUrl.getPropName(), "tcp://localhost:61616?daemon=true"); props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl); - String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(), - ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); + + String falconBrokerImplClass = StartupProperties.get().getProperty( + ARG.brokerImplClass.getPropName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass); - String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", - DEFAULT_BROKER_MSG_TTL.toString()); + + String 
jmsMessageTTL = StartupProperties.get().getProperty( + "broker.ttlInMins", DEFAULT_BROKER_MSG_TTL.toString()); props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL); - props.put(ARG.entityType.getPropName(), entity.getEntityType().name()); - props.put("logDir", getStoragePath(new Path(coordPath, "../../logs"))); - props.put(OozieClient.EXTERNAL_ID, - new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity), - "${coord:nominalTime()}").getId()); - props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster)); + } + + private void addLateDataProperties(Map<String, String> props) { try { if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null @@ -271,20 +298,16 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui LOG.error("Unable to get Late Process for entity: {}", entity, e); throw new FalconRuntimException(e); } - props.put("entityName", entity.getName()); - props.put("entityType", entity.getEntityType().name().toLowerCase()); + } + + private void addClusterProperties(Cluster cluster, Map<String, String> props) { props.put(ARG.cluster.getPropName(), cluster.getName()); + if (cluster.getProperties() != null) { for (Property prop : cluster.getProperties().getProperties()) { props.put(prop.getName(), prop.getValue()); } } - - props.put(MR_QUEUE_NAME, "default"); - props.put(MR_JOB_PRIORITY, "NORMAL"); - //props in entity override the set props. 
- props.putAll(getEntityProperties()); - return props; } protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name, @@ -322,27 +345,29 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui } } - private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException { + private void createLogsDir(Cluster cluster) throws FalconException { + Path logsDir = EntityUtil.getLogPath(cluster, entity); try { FileSystem fs = HadoopClientFactory.get().createFileSystem( - coordPath.toUri(), ClusterHelper.getConfiguration(cluster)); - Path logsDir = new Path(coordPath, "../../logs"); + ClusterHelper.getConfiguration(cluster)); + if (fs.exists(logsDir)) { + return; + } + fs.mkdirs(logsDir); // logs are copied with in oozie as the user in Post Processing and hence 777 permissions FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); fs.setPermission(logsDir, permission); } catch (Exception e) { - throw new FalconException("Unable to create temp dir in " + coordPath, e); + throw new FalconException("Unable to create logs dir at: " + logsDir, e); } } - protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, - String name) throws FalconException { - name = (StringUtils.isEmpty(name) ? 
"coordinator" : name) + ".xml"; + protected void marshal(Cluster cluster, COORDINATORAPP coord, + Path outPath) throws FalconException { marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), - OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, name)); - return name; + OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml")); } protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java index 5089779..70aeebd 100644 --- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java +++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java @@ -280,7 +280,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { coord.setControls(controls); // Configuration - Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName); + Map<String, String> props = createCoordDefaultConfiguration(cluster, coordName); initializeInputPaths(cluster, entity, coord, props); // inputs initializeOutputPaths(cluster, entity, coord, props); // outputs http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java index 5f0efe7..8cfa9fc 100644 --- 
a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java +++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java @@ -186,10 +186,16 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { props.put(prop.getName(), prop.getValue()); } assertEquals(props.get("mapred.job.priority"), "LOW"); + Assert.assertEquals(props.get("logDir"), getLogPath(process)); assertLibExtensions(coord); } + private String getLogPath(Process process) { + Path logPath = EntityUtil.getLogPath(cluster, process); + return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath; + } + @Test public void testBundle() throws Exception { String path = StartupProperties.get().getProperty("system.lib.location"); @@ -290,6 +296,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(entry.getValue(), expected.get(entry.getKey())); } } + Assert.assertEquals(props.get("logDir"), getLogPath(process)); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); @@ -321,10 +328,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception { StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption); - URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml"); - Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); - - resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml"); + URL resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml"); Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); ConfigurationStore.get().publish(EntityType.FEED, outFeed); @@ -350,6 +354,7 @@ public class OozieProcessWorkflowBuilderTest extends 
AbstractTestBase { for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { props.put(prop.getName(), prop.getValue()); } + Assert.assertEquals(props.get("logDir"), getLogPath(process)); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); @@ -385,9 +390,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); ConfigurationStore.get().publish(EntityType.FEED, inFeed); - resource = this.getClass().getResource("/config/feed/feed-0.1.xml"); - Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); - resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml"); Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource); ConfigurationStore.get().publish(EntityType.PROCESS, process); @@ -410,6 +412,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { props.put(prop.getName(), prop.getValue()); } + Assert.assertEquals(props.get("logDir"), getLogPath(process)); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); @@ -463,6 +466,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { props.put(prop.getName(), prop.getValue()); } + Assert.assertEquals(props.get("logDir"), getLogPath(process)); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); @@ -566,6 +570,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { 
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey())); } } + Assert.assertEquals(props.get("logDir"), getLogPath(process)); // verify the late data params Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed()); @@ -773,6 +778,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { props.put(prop.getName(), prop.getValue()); } + Assert.assertEquals(props.get("logDir"), getLogPath(processEntity)); String[] expected = { EntityInstanceMessage.ARG.feedNames.getPropName(), http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java index bcbdb08..75de12e 100644 --- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java +++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java @@ -99,9 +99,9 @@ public class LateDataHandler extends Configured implements Tool { String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#"); Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes); - LOG.info("MAP data: {}", metrics); Path file = new Path(command.getOptionValue("out")); + LOG.info("Persisting late data metrics: {} to file: {}", metrics, file); persistMetrics(metrics, file); return 0;
