Repository: incubator-falcon
Updated Branches:
  refs/heads/master 3d3bde558 -> 91704c6ae
FALCON-446 Hive Replications fail because of permissions issue. Contributed by Venkatesh Seetharam

Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5ac7544b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5ac7544b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5ac7544b

Branch: refs/heads/master
Commit: 5ac7544bcb0d9e578d5ddc78de9382e5057220af
Parents: 3d3bde5
Author: Venkatesh Seetharam <[email protected]>
Authored: Fri May 30 21:38:39 2014 -0700
Committer: Venkatesh Seetharam <[email protected]>
Committed: Fri May 30 21:38:39 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt | 3 +
 .../falcon/cleanup/AbstractCleanupHandler.java | 17 ++++--
 .../falcon/cleanup/FeedCleanupHandler.java | 60 +++-----------------
 .../falcon/cleanup/ProcessCleanupHandler.java | 23 +++-----
 .../org/apache/falcon/entity/FeedHelper.java | 5 +-
 .../config/workflow/replication-workflow.xml | 8 +++
 .../converter/OozieFeedWorkflowBuilderTest.java | 5 +-
 7 files changed, 44 insertions(+), 77 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e06fc80..d851109 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-446 Hive Replications fail because of permissions issue + (Venkatesh Seetharam) + FALCON-444 Logs dir for replication workflow is incorrect and jobs fail with permission issues (Venkatesh Seetharam)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java index ab85ae0..846d48a 100644 --- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java +++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java @@ -20,6 +20,7 @@ package org.apache.falcon.cleanup; import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.Frequency; @@ -69,21 +70,25 @@ public abstract class AbstractCleanupHandler { "log.cleanup.frequency." + timeunit + ".retention", "days(1)"); } - protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) - throws FalconException { - - String stagingPath = ClusterHelper.getLocation(cluster, "staging"); - Path logPath = getLogPath(entity, stagingPath); + protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, + Entity entity) throws FalconException { FileSystem fs = getFileSystem(cluster); FileStatus[] paths; try { + Path logPath = getLogPath(cluster, entity); paths = fs.globStatus(logPath); } catch (IOException e) { throw new FalconException(e); } + return paths; } + private Path getLogPath(Cluster cluster, Entity entity) { + // logsPath = base log path + relative path + return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath()); + } + protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { @@ -139,7 +144,7 @@ public abstract class AbstractCleanupHandler { public abstract void cleanup() throws FalconException; - protected abstract Path getLogPath(Entity entity, String stagingPath); + protected abstract String 
getRelativeLogPath(); protected String getCurrentColo() { return StartupProperties.get().getProperty("current.colo", "default"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java index 452ab02..5d4ecd9 100644 --- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java +++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java @@ -18,19 +18,10 @@ package org.apache.falcon.cleanup; import org.apache.falcon.FalconException; -import org.apache.falcon.Tag; -import org.apache.falcon.entity.CatalogStorage; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import java.io.IOException; import java.util.Collection; /** @@ -42,19 +33,15 @@ public class FeedCleanupHandler extends AbstractCleanupHandler { public void cleanup() throws FalconException { Collection<String> feeds = STORE.getEntities(EntityType.FEED); for (String feedName : feeds) { - Feed feed; - feed = STORE.get(EntityType.FEED, feedName); - long retention = getRetention(feed, feed.getFrequency() - .getTimeUnit()); - for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed - .getClusters().getClusters()) { - Cluster currentCluster = STORE.get(EntityType.CLUSTER, - cluster.getName()); + Feed feed = STORE.get(EntityType.FEED, feedName); + long retention = getRetention(feed, feed.getFrequency().getTimeUnit()); + + for 
(org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName()); if (currentCluster.getColo().equals(getCurrentColo())) { - LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}", feedName, - cluster.getName(), retention); + LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}", + feedName, cluster.getName(), retention); delete(currentCluster, feed, retention); - deleteStagedData(currentCluster, feed, retention); } else { LOG.info("Ignoring cleanup for feed: {} in cluster: {} as this does not belong to current colo", feedName, cluster.getName()); @@ -64,37 +51,8 @@ public class FeedCleanupHandler extends AbstractCleanupHandler { } } - /** - * Delete the staging area used for replicating tables. - * - * @param cluster cluster hosting the staged data - * @param feed feed entity - * @param retention retention limit - * @throws FalconException - */ - private void deleteStagedData(Cluster cluster, Feed feed, long retention) - throws FalconException { - Storage storage = FeedHelper.createStorage(cluster, feed); - if (storage.getType() == Storage.TYPE.FILESYSTEM) { // FS does NOT use staging dirs - return; - } - - final CatalogStorage tableStorage = (CatalogStorage) storage; - String stagingDir = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION); - //stagingDir/dataOutPartitionValue/nominal-time/clusterName/data - Path stagingPath = new Path(stagingDir + "/*/*/*/*"); - FileSystem fs = getFileSystem(cluster); - try { - FileStatus[] paths = fs.globStatus(stagingPath); - delete(cluster, feed, retention, paths); - } catch (IOException e) { - throw new FalconException(e); - } - } - @Override - protected Path getLogPath(Entity entity, String stagingPath) { - return new Path(stagingPath, "falcon/workflows/feed/" - + entity.getName() + "/logs/job-*/*/*"); + protected String 
getRelativeLogPath() { + return "job-*/*/*"; } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java index e6ce72f..4eb9162 100644 --- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java +++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java @@ -18,11 +18,9 @@ package org.apache.falcon.cleanup; import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.fs.Path; import java.util.Collection; @@ -35,14 +33,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler { public void cleanup() throws FalconException { Collection<String> processes = STORE.getEntities(EntityType.PROCESS); for (String processName : processes) { - Process process; - process = STORE.get(EntityType.PROCESS, processName); - long retention = getRetention(process, process.getFrequency() - .getTimeUnit()); - for (org.apache.falcon.entity.v0.process.Cluster cluster : process - .getClusters().getClusters()) { - Cluster currentCluster = STORE.get(EntityType.CLUSTER, - cluster.getName()); + Process process = STORE.get(EntityType.PROCESS, processName); + long retention = getRetention(process, process.getFrequency().getTimeUnit()); + + for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters() .getClusters()) { + Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName()); if (currentCluster.getColo().equals(getCurrentColo())) { LOG.info("Cleaning up logs for process: {} in cluster: {} with 
retention: {}", processName, cluster.getName(), retention); @@ -52,15 +47,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler { processName, cluster.getName()); } } - } } @Override - protected Path getLogPath(Entity entity, String stagingPath) { - Path logPath = new Path(stagingPath, "falcon/workflows/process/" - + entity.getName() + "/logs/job-*/*"); - return logPath; + protected String getRelativeLogPath() { + return "job-*/*"; } - } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 9b66363..44d8d01 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -266,10 +266,11 @@ public final class FeedHelper { String workflowName = EntityUtil.getWorkflowName( tag, Arrays.asList(clusterEntity.getName()), feed).toString(); - return ClusterHelper.getCompleteLocation(clusterEntity, "staging") + "/" + // log path is created at scheduling wf and has 777 perms + return ClusterHelper.getStorageUrl(clusterEntity) + + EntityUtil.getLogPath(clusterEntity, feed) + "/" + workflowName + "/" + storage.getDatabase() + "/" + storage.getTable(); } - } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/main/resources/config/workflow/replication-workflow.xml ---------------------------------------------------------------------- diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml index 6f94dd7..a7acfe1 100644 --- a/feed/src/main/resources/config/workflow/replication-workflow.xml +++ b/feed/src/main/resources/config/workflow/replication-workflow.xml @@ -158,6 +158,14 @@ 
<param>falconTargetPartition=${falconTargetPartition}</param> <param>falconTargetStagingDir=${distcpTargetPaths}</param> </hive> + <ok to="cleanup-table-staging-dir"/> + <error to="failed-post-processing"/> + </action> + <action name="cleanup-table-staging-dir"> + <fs> + <delete path="${distcpSourcePaths}"/> + <delete path="${distcpTargetPaths}"/> + </fs> <ok to="succeeded-post-processing"/> <error to="failed-post-processing"/> </action> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java index 64b38ff..5d6879a 100644 --- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java +++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java @@ -374,8 +374,9 @@ public class OozieFeedWorkflowBuilderTest { Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName()); Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName()); Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName()); - Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName()); - Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName()); + Assert.assertEquals("cleanup-table-staging-dir", ((ACTION) decisionOrForkOrJoin.get(7)).getName()); + Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName()); + Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(9)).getName()); } @DataProvider(name = "secureOptions")
