Repository: incubator-falcon
Updated Branches:
  refs/heads/master 36d01791c -> 2129285dc
FALCON-443 Process with Hive workflow engine and filesystem input feeds, table output feed fails. Contributed by Sowmya Ramesh Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2129285d Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2129285d Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2129285d Branch: refs/heads/master Commit: 2129285dccb720f0f1f405b52474b9df61ee1708 Parents: 36d0179 Author: Venkatesh Seetharam <[email protected]> Authored: Wed May 21 11:24:48 2014 -0700 Committer: Venkatesh Seetharam <[email protected]> Committed: Wed May 21 11:24:48 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 9 +- .../org/apache/falcon/entity/ProcessHelper.java | 14 +- .../workflow/OozieFeedWorkflowBuilder.java | 10 +- .../falcon/workflow/OozieWorkflowBuilder.java | 24 +-- .../workflow/OozieProcessWorkflowBuilder.java | 23 ++- .../OozieProcessWorkflowBuilderTest.java | 171 +++++++++++++++++++ .../config/process/dumb-hive-process.xml | 39 +++++ .../config/process/hive-process-FSInputFeed.xml | 46 +++++ .../process/hive-process-FSOutputFeed.xml | 46 +++++ 9 files changed, 349 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1efb4a4..fccc9b8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,11 +13,17 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-443 Process with Hive workflow engine and filesystem input feeds, + table output feed fails (Sowmya Ramesh via Venkatesh Seetharam) + FALCON-382 Error While building Latest trunk code with Hadoop 2.2.0. (Suhas Vasu) FALCON-240 Instance status from CLI on a feed doesn't give the retention details. 
(pavan kumar kolamuri via Shwetha GS) + FALCON-441 Lineage capture fails for feeds with multiple instances + (Venkatesh Seetharam) + Release Version: 0.5-incubating INCOMPATIBLE CHANGES FALCON-11 Add support for security in Falcon (Venkatesh Seetharam) @@ -148,9 +154,6 @@ Release Version: 0.5-incubating FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS) BUG FIXES - FALCON-441 Lineage capture fails for feeds with multiple instances - (Venkatesh Seetharam) - FALCON-440 Exclude IDEA IntelliJ and other unnecessary files from source distribution (Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java index a0a74e4..ece8982 100644 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java @@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; /** @@ -49,7 +50,7 @@ public final class ProcessHelper { public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster, Process process) throws FalconException { Storage.TYPE storageType = Storage.TYPE.FILESYSTEM; - if (process.getInputs() == null) { + if (process.getInputs() == null && process.getOutputs() == null) { return storageType; } @@ -61,6 +62,17 @@ public final class ProcessHelper { } } + // If input feeds storage type is file system check storage type of output feeds + if (Storage.TYPE.FILESYSTEM == storageType) 
{ + for (Output output : process.getOutputs().getOutputs()) { + Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed()); + storageType = FeedHelper.getStorageType(feed, cluster); + if (Storage.TYPE.TABLE == storageType) { + break; + } + } + } + return storageType; } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java index 16bff02..faaddac 100644 --- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java +++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java @@ -246,7 +246,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention"); addOozieRetries(retWfApp); - if (isTableStorageType(cluster, entity)) { + if (shouldSetupHiveConfiguration(cluster, entity)) { setupHiveCredentials(cluster, wfPath, retWfApp); } @@ -325,7 +325,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { addOozieRetries(repWFapp); - if (isTableStorageType(targetCluster, entity)) { + if (shouldSetupHiveConfiguration(targetCluster, entity)) { setupHiveCredentials(targetCluster, sourceCluster, repWFapp); } @@ -719,4 +719,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> { } props.put("userWorkflowVersion", version); } + + protected boolean shouldSetupHiveConfiguration(Cluster cluster, + Feed feed) throws FalconException { + Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster); + return Storage.TYPE.TABLE == storageType; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java index 7616df1..ad1af73 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java @@ -26,16 +26,11 @@ import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.ExternalId; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.ProcessHelper; -import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.messaging.EntityInstanceMessage.ARG; import org.apache.falcon.oozie.bundle.BUNDLEAPP; @@ -550,21 +545,8 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui } } - private boolean isTableStorageType(Cluster cluster, T entity) throws FalconException { - return entity.getEntityType() == EntityType.PROCESS - ? 
isTableStorageType(cluster, (Process) entity) - : isTableStorageType(cluster, (Feed) entity); - } - - protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException { - Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster); - return Storage.TYPE.TABLE == storageType; - } - - protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException { - Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process); - return Storage.TYPE.TABLE == storageType; - } + protected abstract boolean shouldSetupHiveConfiguration(Cluster cluster, + T entity) throws FalconException; protected void decorateWithOozieRetries(ACTION action) { Properties props = RuntimeProperties.get(); @@ -588,7 +570,7 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib"); - if (isTableStorageType(cluster, entity)) { + if (shouldSetupHiveConfiguration(cluster, entity)) { propagateHiveCredentials(cluster, properties); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java index 3d6bf7b..5089779 100644 --- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java +++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java @@ -603,8 +603,8 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { throw new FalconException("Failed to add library extensions for the workflow", e); } - final boolean isTableStorageType = 
isTableStorageType(cluster, process); - if (isTableStorageType) { + final boolean shouldConfigureHive = shouldSetupHiveConfiguration(cluster, process); + if (shouldConfigureHive) { setupHiveCredentials(cluster, parentWfPath, wfApp); } @@ -620,12 +620,12 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) { action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath); } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) { - decoratePIGAction(cluster, process, action.getPig(), parentWfPath, isTableStorageType); + decoratePIGAction(cluster, process, action.getPig(), parentWfPath, shouldConfigureHive); } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) { decorateHiveAction(cluster, process, action, parentWfPath); } else if (FALCON_ACTIONS.contains(actionName)) { decorateWithOozieRetries(action); - if (isTableStorageType && actionName.equals("recordsize")) { + if (shouldConfigureHive && actionName.equals("recordsize")) { // adds hive-site.xml in actions classpath action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml"); } @@ -636,6 +636,17 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { marshal(cluster, wfApp, parentWfPath); } + protected boolean shouldSetupHiveConfiguration(Cluster cluster, + Process process) throws FalconException { + return isTableStorageType(cluster, entity) + || EngineType.HIVE == process.getWorkflow().getEngine(); + } + + protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException { + Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process); + return Storage.TYPE.TABLE == storageType; + } + private void setupHiveCredentials(Cluster cluster, Path parentWfPath, WORKFLOWAPP wfApp) throws FalconException { // create hive-site.xml file so actions can use it in the classpath @@ -648,7 +659,7 @@ 
public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { } private void decoratePIGAction(Cluster cluster, Process process, PIG pigAction, - Path parentWfPath, boolean isTableStorageType) throws FalconException { + Path parentWfPath, boolean shouldConfigureHive) throws FalconException { Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()); pigAction.setScript("${nameNode}" + userWfPath.toString()); @@ -660,7 +671,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> { propagateProcessProperties(pigAction, process); - if (isTableStorageType) { // adds hive-site.xml in pig classpath + if (shouldConfigureHive) { // adds hive-site.xml in pig classpath pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml"); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java index 54c1809..5f0efe7 100644 --- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java +++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java @@ -317,6 +317,177 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { ConfigurationStore.get().remove(EntityType.PROCESS, process.getName()); } + @Test (dataProvider = "secureOptions") + public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception { + StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption); + + URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml"); + Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); + + 
resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml"); + Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); + ConfigurationStore.get().publish(EntityType.FEED, outFeed); + + resource = this.getClass().getResource("/config/process/hive-process-FSInputFeed.xml"); + Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource); + ConfigurationStore.get().publish(EntityType.PROCESS, process); + + prepare(process); + OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process); + Path bundlePath = new Path("/falcon/staging/workflows", process.getName()); + builder.map(cluster, bundlePath); + assertTrue(fs.exists(bundlePath)); + + BUNDLEAPP bundle = getBundle(bundlePath); + assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName()); + assertEquals(1, bundle.getCoordinator().size()); + assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), + bundle.getCoordinator().get(0).getName()); + String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); + + COORDINATORAPP coord = getCoordinator(new Path(coordPath)); + HashMap<String, String> props = new HashMap<String, String>(); + for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { + props.put(prop.getName(), prop.getValue()); + } + + String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); + WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); + testParentWorkflow(process, parentWorkflow); + + List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin(); + + ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4); + Assert.assertEquals("user-hive-job", hiveNode.getName()); + + JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode); + org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue(); + + 
Assert.assertEquals(hiveAction.getScript(), + "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql"); + Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml"); + Assert.assertNull(hiveAction.getPrepare()); + Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive()); + Assert.assertFalse(hiveAction.getParam().isEmpty()); + Assert.assertEquals(7, hiveAction.getParam().size()); + + Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process)); + assertHCatCredentials(parentWorkflow, wfPath); + + ConfigurationStore.get().remove(EntityType.PROCESS, process.getName()); + } + + @Test (dataProvider = "secureOptions") + public void testHiveProcessMapperWithTableInputFeedAndFSOutputFeed(String secureOption) throws Exception { + StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption); + + URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml"); + Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); + ConfigurationStore.get().publish(EntityType.FEED, inFeed); + + resource = this.getClass().getResource("/config/feed/feed-0.1.xml"); + Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource); + + resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml"); + Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource); + ConfigurationStore.get().publish(EntityType.PROCESS, process); + + prepare(process); + OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process); + Path bundlePath = new Path("/falcon/staging/workflows", process.getName()); + builder.map(cluster, bundlePath); + assertTrue(fs.exists(bundlePath)); + + BUNDLEAPP bundle = getBundle(bundlePath); + assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName()); + assertEquals(1, bundle.getCoordinator().size()); + 
assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), + bundle.getCoordinator().get(0).getName()); + String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); + + COORDINATORAPP coord = getCoordinator(new Path(coordPath)); + HashMap<String, String> props = new HashMap<String, String>(); + for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { + props.put(prop.getName(), prop.getValue()); + } + + String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); + WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); + testParentWorkflow(process, parentWorkflow); + + List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin(); + + ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4); + Assert.assertEquals("user-hive-job", hiveNode.getName()); + + JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode); + org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue(); + + Assert.assertEquals(hiveAction.getScript(), + "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql"); + Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml"); + Assert.assertNotNull(hiveAction.getPrepare()); + Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive()); + Assert.assertFalse(hiveAction.getParam().isEmpty()); + Assert.assertEquals(6, hiveAction.getParam().size()); + + Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process)); + assertHCatCredentials(parentWorkflow, wfPath); + + ConfigurationStore.get().remove(EntityType.PROCESS, process.getName()); + } + + @Test (dataProvider = "secureOptions") + public void testHiveProcessWithNoInputsAndOutputs(String secureOption) throws Exception { + StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption); + + URL resource = 
this.getClass().getResource("/config/process/dumb-hive-process.xml"); + Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource); + ConfigurationStore.get().publish(EntityType.PROCESS, process); + + prepare(process); + OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process); + Path bundlePath = new Path("/falcon/staging/workflows", process.getName()); + builder.map(cluster, bundlePath); + assertTrue(fs.exists(bundlePath)); + + BUNDLEAPP bundle = getBundle(bundlePath); + assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName()); + assertEquals(1, bundle.getCoordinator().size()); + assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), + bundle.getCoordinator().get(0).getName()); + String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); + + COORDINATORAPP coord = getCoordinator(new Path(coordPath)); + HashMap<String, String> props = new HashMap<String, String>(); + for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { + props.put(prop.getName(), prop.getValue()); + } + + String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); + WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath)); + testParentWorkflow(process, parentWorkflow); + + List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin(); + + ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4); + Assert.assertEquals("user-hive-job", hiveNode.getName()); + + JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode); + org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue(); + + Assert.assertEquals(hiveAction.getScript(), + "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql"); + Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml"); + Assert.assertNull(hiveAction.getPrepare()); + 
Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive()); + Assert.assertTrue(hiveAction.getParam().isEmpty()); + + assertHCatCredentials(parentWorkflow, wfPath); + + ConfigurationStore.get().remove(EntityType.PROCESS, process.getName()); + } + private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException { Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml"); Assert.assertTrue(fs.exists(hiveConfPath)); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/dumb-hive-process.xml ---------------------------------------------------------------------- diff --git a/process/src/test/resources/config/process/dumb-hive-process.xml b/process/src/test/resources/config/process/dumb-hive-process.xml new file mode 100644 index 0000000..c504074 --- /dev/null +++ b/process/src/test/resources/config/process/dumb-hive-process.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<process name="hive-process" xmlns="uri:falcon:process:0.1"> + <!-- where --> + <clusters> + <cluster name="corp"> + <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/> + </cluster> + </clusters> + + <!-- when --> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + + <!-- what = none --> + + <!-- how --> + <workflow engine="hive" path="/apps/hive/script.hql"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> +</process> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSInputFeed.xml ---------------------------------------------------------------------- diff --git a/process/src/test/resources/config/process/hive-process-FSInputFeed.xml b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml new file mode 100644 index 0000000..d871377 --- /dev/null +++ b/process/src/test/resources/config/process/hive-process-FSInputFeed.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<process name="hive-process" xmlns="uri:falcon:process:0.1"> + <!-- where --> + <clusters> + <cluster name="corp"> + <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/> + </cluster> + </clusters> + + <!-- when --> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + + <!-- what --> + <inputs> + <input name="input" feed="clicks" start="yesterday(0,0)" end="yesterday(20,0)"/> + </inputs> + + <outputs> + <output name="output" feed="clicks-summary-table" instance="today(0,0)"/> + </outputs> + + <!-- how --> + <workflow engine="hive" path="/apps/hive/script.hql"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> +</process> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2129285d/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml ---------------------------------------------------------------------- diff --git a/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml new file mode 100644 index 0000000..23d96c3 --- /dev/null +++ b/process/src/test/resources/config/process/hive-process-FSOutputFeed.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + --> +<process name="hive-process" xmlns="uri:falcon:process:0.1"> + <!-- where --> + <clusters> + <cluster name="corp"> + <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/> + </cluster> + </clusters> + + <!-- when --> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + + <!-- what --> + <inputs> + <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/> + </inputs> + + <outputs> + <output name="output" feed="clicks" instance="today(0,0)"/> + </outputs> + + <!-- how --> + <workflow engine="hive" path="/apps/hive/script.hql"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> +</process>
