Author: daijy Date: Wed Jan 21 06:09:06 2015 New Revision: 1653445 URL: http://svn.apache.org/r1653445 Log: PIG-4359: Port local mode tests to Tez - part4
Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java pig/trunk/test/org/apache/pig/test/TestStoreBase.java pig/trunk/test/org/apache/pig/test/TestStoreLocal.java pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java pig/trunk/test/org/apache/pig/tez/TezUtil.java Modified: pig/trunk/CHANGES.txt pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java pig/trunk/test/excluded-tests-20 pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java pig/trunk/test/org/apache/pig/test/TestStore.java pig/trunk/test/tez-local-tests Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Jan 21 06:09:06 2015 @@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi BUG FIXES +PIG-4359: Port local mode tests to Tez - part4 (daijy) + PIG-4340: PigStorage fails parsing empty map (daijy) PIG-4366: Port local mode tests to Tez - part5 (daijy) Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (original) +++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Wed Jan 21 06:09:06 2015 @@ -27,7 +27,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu import org.apache.hadoop.mapreduce.filecache.DistributedCache; import 
org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; /** * This class builds a single instance of itself with the Singleton @@ -128,4 +130,8 @@ public class MiniCluster extends MiniGen if (m_mr != null) { m_mr.stop(); } m_mr = null; } + + static public Launcher getLauncher() { + return new MapReduceLauncher(); + } } Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original) +++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Wed Jan 21 06:09:06 2015 @@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.v2.Mi import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.Launcher; import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; +import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -185,4 +187,8 @@ public class TezMiniCluster extends Mini YARN_CONF_FILE.delete(); } } + + static public Launcher getLauncher() { + return new TezLauncher(); + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Jan 21 06:09:06 2015 @@ -368,7 +368,7 @@ public class TezCompiler extends PhyPlan storeOnlyPhyPlan.addAsLeaf(store); storeOnlyTezOperator.plan = storeOnlyPhyPlan; tezPlan.add(storeOnlyTezOperator); - phyToTezOpMap.put(store, storeOnlyTezOperator); + phyToTezOpMap.put(p, storeOnlyTezOperator); // Create new operator as second splittee curTezOp = getTezOp(); Modified: pig/trunk/test/excluded-tests-20 URL: http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-20?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/test/excluded-tests-20 (original) +++ pig/trunk/test/excluded-tests-20 Wed Jan 21 06:09:06 2015 @@ -7,3 +7,4 @@ **/TestGroupConstParallelTez.java **/TestLoaderStorerShipCacheFilesTez.java **/TestPigStatsTez.java +**/TestPOPartialAggPlanTez.java Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original) +++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Wed Jan 21 06:09:06 2015 @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.pig.ExecType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.Launcher; 
/** * This class builds a single instance of itself with the Singleton @@ -146,4 +147,24 @@ abstract public class MiniGenericCluster String msg = "function called on MiniCluster that has been shutdown"; throw new RuntimeException(msg); } + + /** + * Returns the launcher of the mini cluster selected by the test.exec.type system property. + * Falls back to EXECTYPE_MR when the property is unset; any other unrecognized value raises a RuntimeException. + */ + static public Launcher getLauncher() { + String execType = System.getProperty("test.exec.type"); + if (execType == null) { + /* Update the local as well as the system property: the local is dereferenced below. */ + execType = EXECTYPE_MR; + System.setProperty("test.exec.type", execType); + } + if (execType.equalsIgnoreCase(EXECTYPE_MR)) { + return MiniCluster.getLauncher(); + } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) { + return TezMiniCluster.getLauncher(); + } else { + throw new RuntimeException("Unknown test.exec.type: " + execType); + } + } } Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Jan 21 06:09:06 2015 @@ -19,8 +19,8 @@ package org.apache.pig.test; import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; @@ -29,17 +29,19 @@ import java.util.Properties; import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigServer; +import
org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; +import org.apache.pig.backend.hadoop.executionengine.Launcher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.builtin.PigStorage; @@ -56,17 +58,18 @@ import org.apache.pig.tools.pigscript.pa import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; public class TestMultiQueryLocal { - private PigServer myPig; + protected PigServer myPig; private String TMP_DIR; @Before public void setUp() throws Exception { - PigContext context = new PigContext(ExecType.LOCAL, new Properties()); + PigContext context = new PigContext(Util.getLocalTestMode(), new Properties()); context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true); myPig = new PigServer(context); myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false"); @@ -351,22 +354,33 @@ public class TestMultiQueryLocal { public static class PigStorageWithConfig extends PigStorage { - private static final String key = "test.key"; + private static final String key1 = "test.key1"; + private static final String key2 = "test.key2"; private String suffix; + private String myKey; - public PigStorageWithConfig(String s) { + public PigStorageWithConfig(String key, String s) { this.suffix = s; + this.myKey = key; } @Override public void setStoreLocation(String location, Job job) throws IOException { super.setStoreLocation(location, job); - 
Assert.assertNull(job.getConfiguration().get(key)); + if (myKey.equals(key1)) { + Assert.assertNull(job.getConfiguration().get(key2)); + } else { + Assert.assertNull(job.getConfiguration().get(key1)); + } } @Override public OutputFormat getOutputFormat() { - return new PigTextOutputFormatWithConfig(); + if (myKey.equals(key1)) { + return new PigTextOutputFormatWithConfig1(); + } else { + return new PigTextOutputFormatWithConfig2(); + } } @Override @@ -384,16 +398,30 @@ public class TestMultiQueryLocal { } } - private static class PigTextOutputFormatWithConfig extends PigTextOutputFormat { + private static class PigTextOutputFormatWithConfig1 extends PigTextOutputFormat { + + public PigTextOutputFormatWithConfig1() { + super((byte) '\t'); + } + + @Override + public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + context.getConfiguration().set(PigStorageWithConfig.key1, MRConfiguration.WORK_OUPUT_DIR); + return super.getOutputCommitter(context); + } + } + + private static class PigTextOutputFormatWithConfig2 extends PigTextOutputFormat { - public PigTextOutputFormatWithConfig() { + public PigTextOutputFormatWithConfig2() { super((byte) '\t'); } @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { - context.getConfiguration().set(PigStorageWithConfig.key, MRConfiguration.WORK_OUPUT_DIR); + context.getConfiguration().set(PigStorageWithConfig.key2, MRConfiguration.WORK_OUPUT_DIR); return super.getOutputCommitter(context); } } @@ -411,17 +439,20 @@ public class TestMultiQueryLocal { "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); myPig.registerQuery("b = filter a by uid < 5;"); myPig.registerQuery("c = filter a by uid > 5;"); - myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('a');"); - myPig.registerQuery("store c into '" + TMP_DIR + 
"/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('b');"); + myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');"); + myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');"); myPig.executeBatch(); myPig.discardBatch(); - BufferedReader reader = new BufferedReader(new FileReader(TMP_DIR + "/Pig-TestMultiQueryLocal1/part-m-00000")); + FileSystem fs = FileSystem.getLocal(new Configuration()); + BufferedReader reader = new BufferedReader(new InputStreamReader + (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal1"))))); String line; while ((line = reader.readLine())!=null) { Assert.assertTrue(line.endsWith("a")); } - reader = new BufferedReader(new FileReader(TMP_DIR + "/Pig-TestMultiQueryLocal2/part-m-00000")); + reader = new BufferedReader(new InputStreamReader + (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal2"))))); while ((line = reader.readLine())!=null) { Assert.assertTrue(line.endsWith("b")); } @@ -505,8 +536,9 @@ public class TestMultiQueryLocal { } @Test - public void testMultiQueryWithIllustrate() { + public void testMultiQueryWithIllustrate() throws Exception { + Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ")); System.out.println("===== test multi-query with illustrate ====="); try { @@ -626,7 +658,7 @@ public class TestMultiQueryLocal { lp.optimize(myPig.getPigContext()); System.out.println("===== check physical plan ====="); - PhysicalPlan pp = ((MRExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile( + PhysicalPlan pp = ((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile( lp, null); Assert.assertEquals(expectedRoots, pp.getRoots().size()); @@ -638,9 +670,9 @@ public class 
TestMultiQueryLocal { return pp; } - private boolean executePlan(PhysicalPlan pp) throws IOException { + protected boolean executePlan(PhysicalPlan pp) throws IOException { boolean failed = true; - MapReduceLauncher launcher = new MapReduceLauncher(); + Launcher launcher = MiniGenericCluster.getLauncher(); PigStats stats = null; try { stats = launcher.launchPig(pp, "execute", myPig.getPigContext()); Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Wed Jan 21 06:09:06 2015 @@ -23,28 +23,28 @@ import static org.junit.Assert.assertNul import java.util.Iterator; -import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; import org.apache.pig.impl.PigContext; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; /** * Test POPartialAgg runtime */ +@Ignore public class TestPOPartialAggPlan { - private static PigContext pc; - private static PigServer ps; + protected static PigContext pc; + protected static PigServer ps; @Before - public void setUp() throws ExecException { - ps = new PigServer(ExecType.LOCAL); + public void setUp() throws Exception { + ps = new PigServer(Util.getLocalTestMode()); pc = ps.getPigContext(); 
pc.connect(); } @@ -89,7 +89,7 @@ public class TestPOPartialAggPlan { return findPOPartialAgg(mapPlan); } - private String getGByQuery() { + protected String getGByQuery() { return "l = load 'x' as (a,b,c);" + "g = group l by a;" + "f = foreach g generate group, COUNT(l.b);"; @@ -122,8 +122,8 @@ public class TestPOPartialAggPlan { assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp)); } - private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) { - Iterator<PhysicalOperator> it = mapPlan.iterator(); + protected PhysicalOperator findPOPartialAgg(PhysicalPlan plan) { + Iterator<PhysicalOperator> it = plan.iterator(); while(it.hasNext()){ PhysicalOperator op = it.next(); if(op instanceof POPartialAgg){ @@ -132,7 +132,4 @@ public class TestPOPartialAggPlan { } return null; } - - - } Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java?rev=1653445&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java (added) +++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java Wed Jan 21 06:09:06 2015 @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.junit.Test; + +public class TestPOPartialAggPlanMR extends TestPOPartialAggPlan { + @Test + public void testNoMapAggProp() throws Exception{ + //test with pig.exec.mapPartAgg not set + String query = getGByQuery(); + + MROperPlan mrp = Util.buildMRPlan(query, pc); + assertEquals(mrp.size(), 1); + + assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp)); + } + + @Test + public void testMapAggPropFalse() throws Exception{ + //test with pig.exec.mapPartAgg set to false + String query = getGByQuery(); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false"); + MROperPlan mrp = Util.buildMRPlan(query, pc); + assertEquals(mrp.size(), 1); + + assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp)); + } + + @Test + public void testMapAggPropTrue() throws Exception{ + //test with pig.exec.mapPartAgg to true + String query = getGByQuery(); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + MROperPlan mrp = Util.buildMRPlan(query, pc); + assertEquals(mrp.size(), 1); + + assertNotNull("POPartialAgg should be present",findPOPartialAgg(mrp)); + + } + + + private PhysicalOperator findPOPartialAgg(MROperPlan mrp) { + PhysicalPlan mapPlan = mrp.getRoots().get(0).mapPlan; + return findPOPartialAgg(mapPlan); + } + + @Test + public void testMapAggNoAggFunc() throws Exception{ + //no agg func, so there should not be a 
POPartial + String query = "l = load 'x' as (a,b,c);" + + "g = group l by a;" + + "f = foreach g generate group;"; + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + MROperPlan mrp = Util.buildMRPlan(query, pc); + assertEquals(mrp.size(), 1); + + assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp)); + } + + @Test + public void testMapAggNotCombinable() throws Exception{ + //not combinable, so there should not be a POPartial + String query = "l = load 'x' as (a,b,c);" + + "g = group l by a;" + + "f = foreach g generate group, COUNT(l.b), l.b;"; + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + MROperPlan mrp = Util.buildMRPlan(query, pc); + assertEquals(mrp.size(), 1); + + assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp)); + } +} Modified: pig/trunk/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1653445&r1=1653444&r2=1653445&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStore.java (original) +++ pig/trunk/test/org/apache/pig/test/TestStore.java Wed Jan 21 06:09:06 2015 @@ -28,8 +28,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.pig.EvalFunc; -import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -56,12 +53,9 @@ import org.apache.pig.StoreMetadata; import org.apache.pig.backend.datastorage.DataStorage; import 
org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; @@ -84,37 +78,22 @@ import org.apache.pig.test.utils.GenRand import org.apache.pig.test.utils.TestHelper; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; -public class TestStore { +public class TestStore extends TestStoreBase { POStore st; DataBag inpDB; static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); PigContext pc; POProject proj; - PigServer pig; - - String inputFileName; - String outputFileName; - - private static final String DUMMY_STORE_CLASS_NAME - = "org.apache.pig.test.TestStore\\$DummyStore"; - - private static final String FAIL_UDF_NAME - = "org.apache.pig.test.TestStore\\$FailUDF"; - private static final String MAP_MAX_ATTEMPTS = MRConfiguration.MAP_MAX_ATTEMPTS; - private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName(); - private static ExecType[] modes = new ExecType[] { ExecType.LOCAL, cluster.getExecType() }; - + @Before public void setUp() throws Exception { - pig = new PigServer(cluster.getExecType(), cluster.getProperties()); - pc = pig.getPigContext(); - inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt"; - outputFileName = TESTDIR + 
"/TestStore-output-" + new Random().nextLong() + ".txt"; - + mode = cluster.getExecType(); + setupPigServer(); + pc = ps.getPigContext(); + super.setUp(); } @After @@ -124,14 +103,20 @@ public class TestStore { Util.deleteFile(cluster, TESTDIR); } + @Override + protected void setupPigServer() throws Exception { + ps = new PigServer(cluster.getExecType(), + cluster.getProperties()); + } + private void storeAndCopyLocally(DataBag inpDB) throws Exception { setUpInputFileOnCluster(inpDB); String script = "a = load '" + inputFileName + "'; " + "store a into '" + outputFileName + "' using PigStorage('\t');" + "fs -ls " + TESTDIR; - pig.setBatchOn(); - Util.registerMultiLineQuery(pig, script); - pig.executeBatch(); + ps.setBatchOn(); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); Path path = getFirstOutputFile(cluster.getConfiguration(), new Path(outputFileName), cluster.getExecType(), true); Util.copyFromClusterToLocal( @@ -139,28 +124,6 @@ public class TestStore { path.toString(), outputFileName); } - public static Path getFirstOutputFile(Configuration conf, Path outputDir, - ExecType exectype, boolean isMapOutput) throws IOException { - FileSystem fs = outputDir.getFileSystem(conf); - FileStatus[] outputFiles = fs.listStatus(outputDir, - Util.getSuccessMarkerPathFilter()); - - boolean filefound = false; - if (outputFiles != null && outputFiles.length != 0) { - String name = outputFiles[0].getPath().getName(); - if (exectype == ExecType.LOCAL || exectype == ExecType.MAPREDUCE) { - if (isMapOutput) { - filefound = name.equals("part-m-00000"); - } else { - filefound = name.equals("part-r-00000"); - } - } else { - filefound = name.startsWith("part-"); - } - } - return filefound ? 
outputFiles[0].getPath() : null; - } - @AfterClass public static void oneTimeTearDown() throws Exception { cluster.shutDown(); @@ -173,13 +136,13 @@ public class TestStore { String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);" + "store a into '" + outputFileName + "' using " + "PigStorage();"; - org.apache.pig.newplan.logical.relational.LogicalPlan lp = Util.buildLp( pig, query ); + org.apache.pig.newplan.logical.relational.LogicalPlan lp = Util.buildLp( ps, query ); } catch (PlanValidationException e){ // Since output file is not present, validation should pass // and not throw this exception. fail("Store validation test failed."); } finally { - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -189,11 +152,11 @@ public class TestStore { String outputFileName = "test-output.txt"; boolean sawException = false; try { - Util.createInputFile(pig.getPigContext(),outputFileName, input); + Util.createInputFile(ps.getPigContext(),outputFileName, input); String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);" + "store a into '" + outputFileName + "' using PigStorage();"; - Util.buildLp( pig, query ); + Util.buildLp( ps, query ); } catch (InvocationTargetException e){ FrontendException pve = (FrontendException)e.getCause(); pve.printStackTrace(); @@ -205,7 +168,7 @@ public class TestStore { sawException = true; } finally { assertTrue(sawException); - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -363,24 +326,24 @@ public class TestStore { String inputFileName = "testGetSchema-input.txt"; String outputFileName = "testGetSchema-output.txt"; try { - Util.createInputFile(pig.getPigContext(), + Util.createInputFile(ps.getPigContext(), inputFileName, input); String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);store a into '" + outputFileName + 
"' using " + "BinStorage();"; - pig.setBatchOn(); - Util.registerMultiLineQuery(pig, query); - pig.executeBatch(); + ps.setBatchOn(); + Util.registerMultiLineQuery(ps, query); + ps.executeBatch(); ResourceSchema rs = new BinStorage().getSchema(outputFileName, - new Job(ConfigurationUtil.toConfiguration(pig.getPigContext(). + new Job(ConfigurationUtil.toConfiguration(ps.getPigContext(). getProperties()))); Schema expectedSchema = Utils.getSchemaFromString( "c:chararray,i:int,d:double"); assertTrue("Checking binstorage getSchema output", Schema.equals( expectedSchema, Schema.getPigSchema(rs), true, true)); } finally { - Util.deleteFile(pig.getPigContext(), inputFileName); - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), inputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -414,391 +377,6 @@ public class TestStore { checkStorePath("/tmp/foo/../././","/tmp/foo/.././."); } - @Test - public void testSetStoreSchema() throws Exception { - PigServer ps = null; - Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); - filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE); - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - String script = "a = load '"+ inputFileName + "' as (a0:chararray, a1:chararray);" + - "store a into '" + outputFileName + "' using " + - DUMMY_STORE_CLASS_NAME + "();"; - - for (ExecType 
execType : modes) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - if (Util.isHadoop1_x()) { - // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce - // OutputCommitter) is fixed only in 0.23.1 - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE); - } - } - ps.setBatchOn(); - Util.deleteFile(ps.getPigContext(), TESTDIR); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { - String condition = entry.getValue() ? 
"" : "not"; - assertEquals("Checking if file " + entry.getKey() + - " does " + condition + " exists in " + execType + - " mode", (boolean) entry.getValue(), - Util.exists(ps.getPigContext(), entry.getKey())); - } - } - } - - @Test - public void testCleanupOnFailure() throws Exception { - PigServer ps = null; - String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded"; - String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed"; - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - String script = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName + "' using " + - DUMMY_STORE_CLASS_NAME + "('true');"; - - for (ExecType execType : modes) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - assertEquals( - "Checking if file indicating that cleanupOnFailure failed " + - " does not exists in " + execType + " mode", false, - Util.exists(ps.getPigContext(), cleanupFailFile)); - assertEquals( - "Checking if file indicating that cleanupOnFailure was " + - "successfully called exists in " + execType + " mode", true, - Util.exists(ps.getPigContext(), cleanupSuccessFile)); - } - } - - - @Test - public void testCleanupOnFailureMultiStore() throws Exception { - PigServer ps = null; - String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - - Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); - 
filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); - filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); - - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - // though the second store should - // not cause a failure, the first one does and the result should be - // that both stores are considered to have failed - String script = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName1 + "' using " + - DUMMY_STORE_CLASS_NAME + "('true', '1');" + - "store a into '" + outputFileName2 + "' using " + - DUMMY_STORE_CLASS_NAME + 
"('false', '2');"; - - for (ExecType execType : new ExecType[] {cluster.getExecType(), ExecType.LOCAL}) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - // LocalJobRunner does not call abortTask - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); - if (Util.isHadoop1_x()) { - // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce - // OutputCommitter) is fixed only in 0.23.1 - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); - } - } - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { - String condition = entry.getValue() ? "" : "not"; - assertEquals("Checking if file " + entry.getKey() + - " does " + condition + " exists in " + execType + - " mode", (boolean) entry.getValue(), - Util.exists(ps.getPigContext(), entry.getKey())); - } - } - } - - // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs" - // property is set to true - // The test covers multi store and single store case in local and mapreduce mode - // The test also checks that "_SUCCESS" file is NOT created when the property - // is not set to true in all the modes. 
- @Test - public void testSuccessFileCreation1() throws Exception { - PigServer ps = null; - - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hello';" + - "c = filter a by $0 == 'hi';" + - "d = filter a by $0 == 'bye';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store d into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isPropertySet: new boolean[] { true, false}) { - for(boolean isMultiStore: new boolean[] { true, false}) { - String script = (isMultiStore ? multiStoreScript : - singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, - Boolean.toString(isPropertySet)); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for(int i = 1; i <= (isMultiStore ? 
3 : 1); i++) { - String sucFile = outputFileName + "_" + i + "/" + - MapReduceLauncher.SUCCEEDED_FILE_NAME; - assertEquals("Checking if _SUCCESS file exists in " + - execType + " mode", isPropertySet, - Util.exists(ps.getPigContext(), sucFile)); - } - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - - // Test _SUCCESS file is NOT created when job fails and when - // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true - // The test covers multi store and single store case in local and mapreduce mode - // The test also checks that "_SUCCESS" file is NOT created when the property - // is not set to true in all the modes. - @Test - public void testSuccessFileCreation2() throws Exception { - PigServer ps = null; - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - System.err.println("XXX: " + TestStore.FailUDF.class.getName()); - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hello';" + - "b = foreach b generate " + FAIL_UDF_NAME + "($0);" + - "c = filter a by $0 == 'hi';" + - "d = filter a by $0 == 'bye';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store d into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "b = foreach a generate " + FAIL_UDF_NAME + "($0);" + - "store b into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isPropertySet: new boolean[] { true, false}) { - for(boolean isMultiStore: new boolean[] { true, false}) { - String script = (isMultiStore ? multiStoreScript : - singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - // since the job is guaranteed to fail, let's set - // number of retries to 1. 
- Properties props = cluster.getProperties(); - props.setProperty(MAP_MAX_ATTEMPTS, "1"); - ps = new PigServer(cluster.getExecType(), props); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - // since the job is guaranteed to fail, let's set - // number of retries to 1. - props.setProperty(MAP_MAX_ATTEMPTS, "1"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, - Boolean.toString(isPropertySet)); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - try { - ps.executeBatch(); - } catch(IOException ioe) { - if(!ioe.getMessage().equals("FailUDFException")) { - // an unexpected exception - throw ioe; - } - } - for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { - String sucFile = outputFileName + "_" + i + "/" + - MapReduceLauncher.SUCCEEDED_FILE_NAME; - assertEquals("Checking if _SUCCESS file exists in " + - execType + " mode", false, - Util.exists(ps.getPigContext(), sucFile)); - } - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - - /** - * Test whether "part-m-00000" file is created on empty output when - * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat is - * supported by Hadoop. 
- * The test covers multi store and single store case in local and mapreduce mode - * - * @throws IOException - */ - @Test - public void testEmptyPartFileCreation() throws IOException { - - boolean isLazyOutputPresent = true; - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - clazz.getMethod("setOutputFormatClass", Job.class, Class.class); - } - catch (Exception e) { - isLazyOutputPresent = false; - } - - //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0) - Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is skipped", isLazyOutputPresent); - - PigServer ps = null; - - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hey';" + - "c = filter a by $1 == 'globe';" + - "d = limit a 2;" + - "e = foreach d generate *, 'x';" + - "f = filter e by $3 == 'y';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store f into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hey';" + - "store b into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isMultiStore: new boolean[] { true, false}) { - if (isMultiStore && (execType.equals(ExecType.LOCAL) || - execType.equals(ExecType.MAPREDUCE))) { - // Skip this test for Mapreduce as MapReducePOStoreImpl - // does not handle LazyOutputFormat - continue; - } - - String script = (isMultiStore ? 
multiStoreScript - : singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - PigConfiguration.PIG_OUTPUT_LAZY, "true"); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - Configuration conf = ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties()); - for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { - assertEquals("For an empty output part-m-00000 should not exist in " + execType + " mode", - null, - getFirstOutputFile(conf, new Path(outputFileName + "_" + i), execType, true)); - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - // A UDF which always throws an Exception so that the job can fail public static class FailUDF extends EvalFunc<String> { Added: pig/trunk/test/org/apache/pig/test/TestStoreBase.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1653445&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (added) +++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Wed Jan 21 06:09:06 2015 @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.ExecType; +import org.apache.pig.PigConfiguration; +import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; +import org.apache.pig.impl.PigContext; +import org.apache.pig.test.TestStore.DummyOutputCommitter; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public abstract class TestStoreBase { + protected ExecType mode; + protected String inputFileName; + protected String outputFileName; + + protected static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName(); + + protected static final String DUMMY_STORE_CLASS_NAME + = "org.apache.pig.test.TestStore\\$DummyStore"; + + protected static final String FAIL_UDF_NAME + = "org.apache.pig.test.TestStore\\$FailUDF"; + protected static final String MAP_MAX_ATTEMPTS = MRConfiguration.MAP_MAX_ATTEMPTS; + + protected PigServer ps = null; + + @Before + public void setUp() throws 
Exception { + inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt"; + outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; + setupPigServer(); + } + + abstract protected void setupPigServer() throws Exception; + + @Test + public void testSetStoreSchema() throws Exception { + Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); + filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE); + String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; + + String script = "a = load '"+ inputFileName + "' as (a0:chararray, a1:chararray);" + + "store a into '" + outputFileName + "' using " + + DUMMY_STORE_CLASS_NAME + "();"; + + if(!mode.isLocal()) { + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); + } else { + if (Util.isHadoop1_x()) { + // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce + // OutputCommitter) is fixed only in 0.23.1 + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE); + } + } + ps.setBatchOn(); + Util.deleteFile(ps.getPigContext(), TESTDIR); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + for 
(Entry<String, Boolean> entry : filesToVerify.entrySet()) { + String condition = entry.getValue() ? "" : "not"; + assertEquals("Checking if file " + entry.getKey() + + " does " + condition + " exists in " + mode + + " mode", (boolean) entry.getValue(), + Util.exists(ps.getPigContext(), entry.getKey())); + } + } + + @Test + public void testCleanupOnFailure() throws Exception { + String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded"; + String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed"; + String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; + + String script = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName + "' using " + + DUMMY_STORE_CLASS_NAME + "('true');"; + + Util.deleteFile(ps.getPigContext(), TESTDIR); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + assertEquals( + "Checking if file indicating that cleanupOnFailure failed " + + " does not exists in " + mode + " mode", false, + Util.exists(ps.getPigContext(), cleanupFailFile)); + assertEquals( + "Checking if file indicating that cleanupOnFailure was " + + "successfully called exists in " + mode + " mode", true, + Util.exists(ps.getPigContext(), cleanupSuccessFile)); + } + + @Test + public void testCleanupOnFailureMultiStore() throws Exception { + String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; + String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; + + Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); + filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); + filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", 
Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); + + String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; + + // though the second store should + // not cause a failure, the first one does and the result should be + // that both stores are considered to have failed + String script = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName1 + "' using " + + DUMMY_STORE_CLASS_NAME + "('true', '1');" + + "store a into '" + outputFileName2 + "' using " + + DUMMY_STORE_CLASS_NAME + "('false', '2');"; + + if(mode.isLocal()) { + // MR LocalJobRunner does not call abortTask + if (!mode.toString().startsWith("TEZ")) { + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); + } + if 
(Util.isHadoop1_x()) { + // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce + // OutputCommitter) is fixed only in 0.23.1 + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); + } + } + Util.deleteFile(ps.getPigContext(), TESTDIR); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { + String condition = entry.getValue() ? "" : "not"; + assertEquals("Checking if file " + entry.getKey() + + " does " + condition + " exists in " + mode + + " mode", (boolean) entry.getValue(), + Util.exists(ps.getPigContext(), entry.getKey())); + } + } + + // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs" + // property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. 
+ @Test + public void testSuccessFileCreation1() throws Exception { + + try { + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName + "_1';" ; + + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? multiStoreScript : + singleStoreScript); + ps.getPigContext().getProperties().setProperty( + MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, + Boolean.toString(isPropertySet)); + Util.deleteFile(ps.getPigContext(), TESTDIR); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + mode + " mode", isPropertySet, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } finally { + Util.deleteFile(ps.getPigContext(), TESTDIR); + } + } + + // Test _SUCCESS file is NOT created when job fails and when + // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. 
+ @Test + public void testSuccessFileCreation2() throws Exception { + try { + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + System.err.println("XXX: " + TestStore.FailUDF.class.getName()); + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "b = foreach b generate " + FAIL_UDF_NAME + "($0);" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "b = foreach a generate " + FAIL_UDF_NAME + "($0);" + + "store b into '" + outputFileName + "_1';" ; + + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? multiStoreScript : + singleStoreScript); + if (mode.isLocal()) { + // since the job is guaranteed to fail, let's set + // number of retries to 1. + ps.getPigContext().getProperties().setProperty(MAP_MAX_ATTEMPTS, "1"); + } + ps.getPigContext().getProperties().setProperty( + MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, + Boolean.toString(isPropertySet)); + Util.deleteFile(ps.getPigContext(), TESTDIR); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + try { + ps.executeBatch(); + } catch(IOException ioe) { + if(!ioe.getMessage().equals("FailUDFException")) { + // an unexpected exception + throw ioe; + } + } + for(int i = 1; i <= (isMultiStore ? 
3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + mode + " mode", false, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } finally { + Util.deleteFile(ps.getPigContext(), TESTDIR); + } + } + + /** + * Test whether "part-m-00000" file is created on empty output when + * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat is + * supported by Hadoop. + * The test covers multi store and single store case in local and mapreduce mode + * + * @throws IOException + */ + @Test + public void testEmptyPartFileCreation() throws Exception { + + boolean isLazyOutputPresent = true; + try { + Class<?> clazz = PigContext + .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); + clazz.getMethod("setOutputFormatClass", Job.class, Class.class); + } + catch (Exception e) { + isLazyOutputPresent = false; + } + + //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0) + Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is skipped", isLazyOutputPresent); + + try { + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hey';" + + "c = filter a by $1 == 'globe';" + + "d = limit a 2;" + + "e = foreach d generate *, 'x';" + + "f = filter e by $3 == 'y';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store f into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hey';" + + "store b into '" + outputFileName + "_1';" ; + + for(boolean isMultiStore: new boolean[] { true, false}) { + if (isMultiStore && (mode.isLocal() || + mode.equals(ExecType.MAPREDUCE))) { + // Skip this test for Mapreduce as MapReducePOStoreImpl + // does not handle LazyOutputFormat + continue; + 
} + + String script = (isMultiStore ? multiStoreScript + : singleStoreScript); + ps.getPigContext().getProperties().setProperty( + PigConfiguration.PIG_OUTPUT_LAZY, "true"); + Util.deleteFile(ps.getPigContext(), TESTDIR); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + Configuration conf = ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties()); + for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { + assertEquals("For an empty output part-m-00000 should not exist in " + mode + " mode", + null, + getFirstOutputFile(conf, new Path(outputFileName + "_" + i), mode, true)); + } + } + } finally { + Util.deleteFile(ps.getPigContext(), TESTDIR); + } + } + + public static Path getFirstOutputFile(Configuration conf, Path outputDir, + ExecType exectype, boolean isMapOutput) throws Exception { + FileSystem fs = outputDir.getFileSystem(conf); + FileStatus[] outputFiles = fs.listStatus(outputDir, + Util.getSuccessMarkerPathFilter()); + + boolean filefound = false; + if (outputFiles != null && outputFiles.length != 0) { + String name = outputFiles[0].getPath().getName(); + if (exectype == Util.getLocalTestMode() || exectype == ExecType.MAPREDUCE) { + if (isMapOutput) { + filefound = name.equals("part-m-00000"); + } else { + filefound = name.equals("part-r-00000"); + } + } else { + filefound = name.startsWith("part-"); + } + } + return filefound ? 
outputFiles[0].getPath() : null; + } +} Added: pig/trunk/test/org/apache/pig/test/TestStoreLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreLocal.java?rev=1653445&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStoreLocal.java (added) +++ pig/trunk/test/org/apache/pig/test/TestStoreLocal.java Wed Jan 21 06:09:06 2015 @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.util.Properties; + +import org.apache.pig.PigServer; +import org.junit.Before; + +public class TestStoreLocal extends TestStoreBase { + @Before + public void setUp() throws Exception { + mode = Util.getLocalTestMode(); + super.setUp(); + } + + @Override + protected void setupPigServer() throws Exception { + Properties props = new Properties(); + ps = new PigServer(Util.getLocalTestMode(), props); + } +} Added: pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java?rev=1653445&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java (added) +++ pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java Wed Jan 21 06:09:06 2015 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.tez; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.test.TestPOPartialAggPlan; +import org.junit.Test; + +public class TestPOPartialAggPlanTez extends TestPOPartialAggPlan { + @Test + public void testNoMapAggProp() throws Exception{ + //test with pig.exec.mapPartAgg not set + String query = getGByQuery(); + + TezPlanContainer tezPlanContainer = TezUtil.buildTezPlanContainer(query, pc); + assertEquals(tezPlanContainer.size(), 1); + + assertNull("POPartialAgg should be absent",findPOPartialAgg(tezPlanContainer)); + } + + @Test + public void testMapAggPropFalse() throws Exception{ + //test with pig.exec.mapPartAgg set to false + String query = getGByQuery(); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false"); + TezPlanContainer tezPlanContainer = TezUtil.buildTezPlanContainer(query, pc); + assertEquals(tezPlanContainer.size(), 1); + + assertNull("POPartialAgg should be absent", findPOPartialAgg(tezPlanContainer)); + } + + @Test + public void testMapAggPropTrue() throws Exception{ + //test with pig.exec.mapPartAgg to true + String query = getGByQuery(); + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + TezPlanContainer tezPlanContainer = TezUtil.buildTezPlanContainer(query, pc); + assertEquals(tezPlanContainer.size(), 1); + + assertNotNull("POPartialAgg should be present",findPOPartialAgg(tezPlanContainer)); + + } + + @Test + 
public void testMapAggNoAggFunc() throws Exception{ + //no agg func, so there should not be a POPartial + String query = "l = load 'x' as (a,b,c);" + + "g = group l by a;" + + "f = foreach g generate group;"; + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + TezPlanContainer tezPlanContainer = TezUtil.buildTezPlanContainer(query, pc); + assertEquals(tezPlanContainer.size(), 1); + + assertNull("POPartialAgg should be absent",findPOPartialAgg(tezPlanContainer)); + } + + @Test + public void testMapAggNotCombinable() throws Exception{ + //not combinable, so there should not be a POPartial + String query = "l = load 'x' as (a,b,c);" + + "g = group l by a;" + + "f = foreach g generate group, COUNT(l.b), l.b;"; + pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true"); + TezPlanContainer tezPlanContainer = TezUtil.buildTezPlanContainer(query, pc); + assertEquals(tezPlanContainer.size(), 1); + + assertNull("POPartialAgg should be absent", findPOPartialAgg(tezPlanContainer)); + } + + private PhysicalOperator findPOPartialAgg(TezPlanContainer tezPlanContainer) { + for (TezPlanContainerNode node : tezPlanContainer) { + TezOperPlan tezPlan = node.getTezOperPlan(); + for (TezOperator tezOper : tezPlan) { + PhysicalOperator partialAgg = findPOPartialAgg(tezOper.plan); + if (partialAgg != null) { + return partialAgg; + } + } + } + return null; + } +} Added: pig/trunk/test/org/apache/pig/tez/TezUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TezUtil.java?rev=1653445&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TezUtil.java (added) +++ pig/trunk/test/org/apache/pig/tez/TezUtil.java Wed Jan 21 06:09:06 2015 @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tez; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.impl.PigContext; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.apache.pig.test.Util; + +public class TezUtil { + public static TezPlanContainer buildTezPlanContainer(String query, PigContext pc) throws Exception { + LogicalPlan lp = Util.parse(query, pc); + Util.optimizeNewLP(lp); + PhysicalPlan pp = Util.buildPhysicalPlanFromNewLP(lp, pc); + TezPlanContainer tezPlanContainer = buildTezPlanWithOptimizer(pp, pc); + return tezPlanContainer; + } + + public static TezPlanContainer buildTezPlanWithOptimizer(PhysicalPlan pp, PigContext pc) throws Exception { + MapRedUtil.checkLeafIsStore(pp, pc); + TezLauncher launcher = new TezLauncher(); + return launcher.compile(pp, pc); + } +} Modified: pig/trunk/test/tez-local-tests URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653445&r1=1653444&r2=1653445&view=diff 
============================================================================== --- pig/trunk/test/tez-local-tests (original) +++ pig/trunk/test/tez-local-tests Wed Jan 21 06:09:06 2015 @@ -80,3 +80,6 @@ **/TestRank3.java **/TestScalarAliasesLocal.java **/TestPigStatsTez.java +**/TestStoreLocal.java +**/TestPOPartialAggPlanTez.java +**/TestMultiQueryLocal.java