Author: daijy
Date: Wed Jan 21 06:09:06 2015
New Revision: 1653445

URL: http://svn.apache.org/r1653445
Log:
PIG-4359: Port local mode tests to Tez - part4

Added:
    pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java
    pig/trunk/test/org/apache/pig/test/TestStoreBase.java
    pig/trunk/test/org/apache/pig/test/TestStoreLocal.java
    pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java
    pig/trunk/test/org/apache/pig/tez/TezUtil.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/test/excluded-tests-20
    pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
    pig/trunk/test/org/apache/pig/test/TestStore.java
    pig/trunk/test/tez-local-tests

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 21 06:09:06 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4359: Port local mode tests to Tez - part4 (daijy)
+
 PIG-4340: PigStorage fails parsing empty map (daijy)
 
 PIG-4366: Port local mode tests to Tez - part5 (daijy)

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java 
(original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Wed Jan 
21 06:09:06 2015
@@ -27,7 +27,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 
 /**
  * This class builds a single instance of itself with the Singleton
@@ -128,4 +130,8 @@ public class MiniCluster extends MiniGen
         if (m_mr != null) { m_mr.stop(); }
         m_mr = null;
     }
+
+    static public Launcher getLauncher() {
+        return new MapReduceLauncher();
+    }
 }

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java 
(original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Wed 
Jan 21 06:09:06 2015
@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.v2.Mi
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -185,4 +187,8 @@ public class TezMiniCluster extends Mini
             YARN_CONF_FILE.delete();
         }
     }
+
+    static public Launcher getLauncher() {
+        return new TezLauncher();
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Wed Jan 21 06:09:06 2015
@@ -368,7 +368,7 @@ public class TezCompiler extends PhyPlan
                     storeOnlyPhyPlan.addAsLeaf(store);
                     storeOnlyTezOperator.plan = storeOnlyPhyPlan;
                     tezPlan.add(storeOnlyTezOperator);
-                    phyToTezOpMap.put(store, storeOnlyTezOperator);
+                    phyToTezOpMap.put(p, storeOnlyTezOperator);
 
                     // Create new operator as second splittee
                     curTezOp = getTezOp();

Modified: pig/trunk/test/excluded-tests-20
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-20?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-20 (original)
+++ pig/trunk/test/excluded-tests-20 Wed Jan 21 06:09:06 2015
@@ -7,3 +7,4 @@
 **/TestGroupConstParallelTez.java
 **/TestLoaderStorerShipCacheFilesTez.java
 **/TestPigStatsTez.java
+**/TestPOPartialAggPlanTez.java

Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Wed Jan 21 
06:09:06 2015
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 
 /**
  * This class builds a single instance of itself with the Singleton
@@ -146,4 +147,18 @@ abstract public class MiniGenericCluster
         String msg = "function called on MiniCluster that has been shutdown";
         throw new RuntimeException(msg);
     }
+
+    static public Launcher getLauncher() {
+        String execType = System.getProperty("test.exec.type");
+        if (execType == null) {
+            System.setProperty("test.exec.type", EXECTYPE_MR);
+        }
+        if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
+            return MiniCluster.getLauncher();
+        } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
+            return TezMiniCluster.getLauncher();
+        } else {
+            throw new RuntimeException("Unknown test.exec.type: " + execType);
+        }
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Jan 21 
06:09:06 2015
@@ -19,8 +19,8 @@ package org.apache.pig.test;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,17 +29,19 @@ import java.util.Properties;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.builtin.PigStorage;
@@ -56,17 +58,18 @@ import org.apache.pig.tools.pigscript.pa
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestMultiQueryLocal {
 
-    private PigServer myPig;
+    protected PigServer myPig;
     private String TMP_DIR;
 
     @Before
     public void setUp() throws Exception {
-        PigContext context = new PigContext(ExecType.LOCAL, new Properties());
+        PigContext context = new PigContext(Util.getLocalTestMode(), new 
Properties());
         
context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, 
""+true);
         myPig = new PigServer(context);
         
myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", 
"false");
@@ -351,22 +354,33 @@ public class TestMultiQueryLocal {
 
     public static class PigStorageWithConfig extends PigStorage {
 
-        private static final String key = "test.key";
+        private static final String key1 = "test.key1";
+        private static final String key2 = "test.key2";
         private String suffix;
+        private String myKey;
 
-        public PigStorageWithConfig(String s) {
+        public PigStorageWithConfig(String key, String s) {
             this.suffix = s;
+            this.myKey = key;
         }
 
         @Override
         public void setStoreLocation(String location, Job job) throws 
IOException {
             super.setStoreLocation(location, job);
-            Assert.assertNull(job.getConfiguration().get(key));
+            if (myKey.equals(key1)) {
+                Assert.assertNull(job.getConfiguration().get(key2));
+            } else {
+                Assert.assertNull(job.getConfiguration().get(key1));
+            }
         }
 
         @Override
         public OutputFormat getOutputFormat() {
-            return new PigTextOutputFormatWithConfig();
+            if (myKey.equals(key1)) {
+                return new PigTextOutputFormatWithConfig1();
+            } else {
+                return new PigTextOutputFormatWithConfig2();
+            }
         }
 
         @Override
@@ -384,16 +398,30 @@ public class TestMultiQueryLocal {
         }
     }
 
-    private static class PigTextOutputFormatWithConfig extends 
PigTextOutputFormat {
+    private static class PigTextOutputFormatWithConfig1 extends 
PigTextOutputFormat {
+
+        public PigTextOutputFormatWithConfig1() {
+            super((byte) '\t');
+        }
+
+        @Override
+        public synchronized OutputCommitter 
getOutputCommitter(TaskAttemptContext context)
+                throws IOException {
+            context.getConfiguration().set(PigStorageWithConfig.key1, 
MRConfiguration.WORK_OUPUT_DIR);
+            return super.getOutputCommitter(context);
+        }
+    }
+
+    private static class PigTextOutputFormatWithConfig2 extends 
PigTextOutputFormat {
 
-        public PigTextOutputFormatWithConfig() {
+        public PigTextOutputFormatWithConfig2() {
             super((byte) '\t');
         }
 
         @Override
         public synchronized OutputCommitter 
getOutputCommitter(TaskAttemptContext context)
                 throws IOException {
-            context.getConfiguration().set(PigStorageWithConfig.key, 
MRConfiguration.WORK_OUPUT_DIR);
+            context.getConfiguration().set(PigStorageWithConfig.key2, 
MRConfiguration.WORK_OUPUT_DIR);
             return super.getOutputCommitter(context);
         }
     }
@@ -411,17 +439,20 @@ public class TestMultiQueryLocal {
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = filter a by uid < 5;");
             myPig.registerQuery("c = filter a by uid > 5;");
-            myPig.registerQuery("store b into '" + TMP_DIR + 
"/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + 
"('a');");
-            myPig.registerQuery("store c into '" + TMP_DIR + 
"/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + 
"('b');");
+            myPig.registerQuery("store b into '" + TMP_DIR + 
"/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + 
"('test.key1', 'a');");
+            myPig.registerQuery("store c into '" + TMP_DIR + 
"/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + 
"('test.key2', 'b');");
 
             myPig.executeBatch();
             myPig.discardBatch();
-            BufferedReader reader = new BufferedReader(new FileReader(TMP_DIR 
+ "/Pig-TestMultiQueryLocal1/part-m-00000"));
+            FileSystem fs = FileSystem.getLocal(new Configuration());
+            BufferedReader reader = new BufferedReader(new InputStreamReader
+                    (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + 
"/Pig-TestMultiQueryLocal1")))));
             String line;
             while ((line = reader.readLine())!=null) {
                 Assert.assertTrue(line.endsWith("a"));
             }
-            reader = new BufferedReader(new FileReader(TMP_DIR + 
"/Pig-TestMultiQueryLocal2/part-m-00000"));
+            reader = new BufferedReader(new InputStreamReader
+                    (fs.open(Util.getFirstPartFile(new Path(TMP_DIR + 
"/Pig-TestMultiQueryLocal2")))));
             while ((line = reader.readLine())!=null) {
                 Assert.assertTrue(line.endsWith("b"));
             }
@@ -505,8 +536,9 @@ public class TestMultiQueryLocal {
     }
 
     @Test
-    public void testMultiQueryWithIllustrate() {
+    public void testMultiQueryWithIllustrate() throws Exception {
 
+        Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", 
!Util.getLocalTestMode().toString().startsWith("TEZ"));
         System.out.println("===== test multi-query with illustrate =====");
 
         try {
@@ -626,7 +658,7 @@ public class TestMultiQueryLocal {
         lp.optimize(myPig.getPigContext());
         System.out.println("===== check physical plan =====");        
 
-        PhysicalPlan pp = 
((MRExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
+        PhysicalPlan pp = 
((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
                 lp, null);
 
         Assert.assertEquals(expectedRoots, pp.getRoots().size());
@@ -638,9 +670,9 @@ public class TestMultiQueryLocal {
         return pp;
     }
 
-    private boolean executePlan(PhysicalPlan pp) throws IOException {
+    protected boolean executePlan(PhysicalPlan pp) throws IOException {
         boolean failed = true;
-        MapReduceLauncher launcher = new MapReduceLauncher();
+        Launcher launcher = MiniGenericCluster.getLauncher();
         PigStats stats = null;
         try {
             stats = launcher.launchPig(pp, "execute", myPig.getPigContext());

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Wed Jan 21 
06:09:06 2015
@@ -23,28 +23,28 @@ import static org.junit.Assert.assertNul
 
 import java.util.Iterator;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.impl.PigContext;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * Test POPartialAgg runtime
  */
+@Ignore
 public class TestPOPartialAggPlan  {
-    private static PigContext pc;
-    private static PigServer ps;
+    protected static PigContext pc;
+    protected static PigServer ps;
 
     @Before
-    public void setUp() throws ExecException {
-        ps = new PigServer(ExecType.LOCAL);
+    public void setUp() throws Exception {
+        ps = new PigServer(Util.getLocalTestMode());
         pc = ps.getPigContext();
         pc.connect();
     }
@@ -89,7 +89,7 @@ public class TestPOPartialAggPlan  {
         return findPOPartialAgg(mapPlan);
     }
 
-    private String getGByQuery() {
+    protected String getGByQuery() {
         return "l = load 'x' as (a,b,c);" +
                 "g = group l by a;" +
                 "f = foreach g generate group, COUNT(l.b);";
@@ -122,8 +122,8 @@ public class TestPOPartialAggPlan  {
         assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
     }
 
-    private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) {
-        Iterator<PhysicalOperator> it = mapPlan.iterator();
+    protected PhysicalOperator findPOPartialAgg(PhysicalPlan plan) {
+        Iterator<PhysicalOperator> it = plan.iterator();
         while(it.hasNext()){
             PhysicalOperator op = it.next();
             if(op instanceof POPartialAgg){
@@ -132,7 +132,4 @@ public class TestPOPartialAggPlan  {
         }
         return null;
     }
-
-
-
 }

Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java Wed Jan 21 
06:09:06 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.pig.PigConfiguration;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.junit.Test;
+
+public class TestPOPartialAggPlanMR extends TestPOPartialAggPlan {
+    @Test
+    public void testNoMapAggProp() throws Exception{
+        //test with pig.exec.mapPartAgg not set
+        String query = getGByQuery();
+
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        assertEquals(mrp.size(), 1);
+
+        assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggPropFalse() throws Exception{
+        //test with pig.exec.mapPartAgg set to false
+        String query = getGByQuery();
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"false");
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        assertEquals(mrp.size(), 1);
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggPropTrue() throws Exception{
+        //test with pig.exec.mapPartAgg to true
+        String query = getGByQuery();
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        assertEquals(mrp.size(), 1);
+
+        assertNotNull("POPartialAgg should be present",findPOPartialAgg(mrp));
+
+    }
+
+
+    private PhysicalOperator findPOPartialAgg(MROperPlan mrp) {
+        PhysicalPlan mapPlan = mrp.getRoots().get(0).mapPlan;
+        return findPOPartialAgg(mapPlan);
+    }
+
+    @Test
+    public void testMapAggNoAggFunc() throws Exception{
+        //no agg func, so there should not be a POPartial
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group;";
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        assertEquals(mrp.size(), 1);
+
+        assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggNotCombinable() throws Exception{
+        //not combinable, so there should not be a POPartial
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group, COUNT(l.b), l.b;";
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        assertEquals(mrp.size(), 1);
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Wed Jan 21 06:09:06 2015
@@ -28,8 +28,6 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -56,12 +53,9 @@ import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
@@ -84,37 +78,22 @@ import org.apache.pig.test.utils.GenRand
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestStore {
+public class TestStore extends TestStoreBase {
     POStore st;
     DataBag inpDB;
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     PigContext pc;
     POProject proj;
-    PigServer pig;
-
-    String inputFileName;
-    String outputFileName;
-
-    private static final String DUMMY_STORE_CLASS_NAME
-    = "org.apache.pig.test.TestStore\\$DummyStore";
-
-    private static final String FAIL_UDF_NAME
-    = "org.apache.pig.test.TestStore\\$FailUDF";
-    private static final String MAP_MAX_ATTEMPTS = 
MRConfiguration.MAP_MAX_ATTEMPTS;
-    private static final String TESTDIR = "/tmp/" + 
TestStore.class.getSimpleName();
-    private static ExecType[] modes = new ExecType[] { ExecType.LOCAL, 
cluster.getExecType() };
-
+    
     @Before
     public void setUp() throws Exception {
-        pig = new PigServer(cluster.getExecType(), cluster.getProperties());
-        pc = pig.getPigContext();
-        inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + 
".txt";
-        outputFileName = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
-
+        mode = cluster.getExecType();
+        setupPigServer();
+        pc = ps.getPigContext();
+        super.setUp();
     }
 
     @After
@@ -124,14 +103,20 @@ public class TestStore {
         Util.deleteFile(cluster, TESTDIR);
     }
 
+    @Override
+    protected void setupPigServer() throws Exception {
+        ps = new PigServer(cluster.getExecType(),
+                cluster.getProperties());
+    }
+
     private void storeAndCopyLocally(DataBag inpDB) throws Exception {
         setUpInputFileOnCluster(inpDB);
         String script = "a = load '" + inputFileName + "'; " +
                 "store a into '" + outputFileName + "' using 
PigStorage('\t');" +
                 "fs -ls " + TESTDIR;
-        pig.setBatchOn();
-        Util.registerMultiLineQuery(pig, script);
-        pig.executeBatch();
+        ps.setBatchOn();
+        Util.registerMultiLineQuery(ps, script);
+        ps.executeBatch();
         Path path = getFirstOutputFile(cluster.getConfiguration(),
                 new Path(outputFileName), cluster.getExecType(), true);
         Util.copyFromClusterToLocal(
@@ -139,28 +124,6 @@ public class TestStore {
                 path.toString(), outputFileName);
     }
 
-    public static Path getFirstOutputFile(Configuration conf, Path outputDir,
-            ExecType exectype, boolean isMapOutput) throws IOException {
-        FileSystem fs = outputDir.getFileSystem(conf);
-        FileStatus[] outputFiles = fs.listStatus(outputDir,
-                Util.getSuccessMarkerPathFilter());
-
-        boolean filefound = false;
-        if (outputFiles != null && outputFiles.length != 0) {
-            String name = outputFiles[0].getPath().getName();
-            if (exectype == ExecType.LOCAL || exectype == ExecType.MAPREDUCE) {
-                if (isMapOutput) {
-                    filefound = name.equals("part-m-00000");
-                } else {
-                    filefound = name.equals("part-r-00000");
-                }
-            } else {
-                filefound = name.startsWith("part-");
-            }
-        }
-        return filefound ? outputFiles[0].getPath() : null;
-    }
-
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
@@ -173,13 +136,13 @@ public class TestStore {
             String query = "a = load '" + inputFileName + "' as (c:chararray, 
" +
                            "i:int,d:double);" +
                            "store a into '" + outputFileName + "' using " + 
"PigStorage();";
-            org.apache.pig.newplan.logical.relational.LogicalPlan lp = 
Util.buildLp( pig, query );
+            org.apache.pig.newplan.logical.relational.LogicalPlan lp = 
Util.buildLp( ps, query );
         } catch (PlanValidationException e){
                 // Since output file is not present, validation should pass
                 // and not throw this exception.
                 fail("Store validation test failed.");
         } finally {
-            Util.deleteFile(pig.getPigContext(), outputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName);
         }
     }
 
@@ -189,11 +152,11 @@ public class TestStore {
         String outputFileName = "test-output.txt";
         boolean sawException = false;
         try {
-            Util.createInputFile(pig.getPigContext(),outputFileName, input);
+            Util.createInputFile(ps.getPigContext(),outputFileName, input);
             String query = "a = load '" + inputFileName + "' as (c:chararray, 
" +
                            "i:int,d:double);" +
                            "store a into '" + outputFileName + "' using 
PigStorage();";
-            Util.buildLp( pig, query );
+            Util.buildLp( ps, query );
         } catch (InvocationTargetException e){
             FrontendException pve = (FrontendException)e.getCause();
             pve.printStackTrace();
@@ -205,7 +168,7 @@ public class TestStore {
             sawException = true;
         } finally {
             assertTrue(sawException);
-            Util.deleteFile(pig.getPigContext(), outputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName);
         }
     }
 
@@ -363,24 +326,24 @@ public class TestStore {
         String inputFileName = "testGetSchema-input.txt";
         String outputFileName = "testGetSchema-output.txt";
         try {
-            Util.createInputFile(pig.getPigContext(),
+            Util.createInputFile(ps.getPigContext(),
                     inputFileName, input);
             String query = "a = load '" + inputFileName + "' as (c:chararray, 
" +
                     "i:int,d:double);store a into '" + outputFileName + "' 
using " +
                             "BinStorage();";
-            pig.setBatchOn();
-            Util.registerMultiLineQuery(pig, query);
-            pig.executeBatch();
+            ps.setBatchOn();
+            Util.registerMultiLineQuery(ps, query);
+            ps.executeBatch();
             ResourceSchema rs = new BinStorage().getSchema(outputFileName,
-                    new 
Job(ConfigurationUtil.toConfiguration(pig.getPigContext().
+                    new 
Job(ConfigurationUtil.toConfiguration(ps.getPigContext().
                             getProperties())));
             Schema expectedSchema = Utils.getSchemaFromString(
                     "c:chararray,i:int,d:double");
             assertTrue("Checking binstorage getSchema output", Schema.equals(
                     expectedSchema, Schema.getPigSchema(rs), true, true));
         } finally {
-            Util.deleteFile(pig.getPigContext(), inputFileName);
-            Util.deleteFile(pig.getPigContext(), outputFileName);
+            Util.deleteFile(ps.getPigContext(), inputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName);
         }
     }
 
@@ -414,391 +377,6 @@ public class TestStore {
         checkStorePath("/tmp/foo/../././","/tmp/foo/.././.");
     }
 
-    @Test
-    public void testSetStoreSchema() throws Exception {
-        PigServer ps = null;
-        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
-        filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, 
Boolean.FALSE);
-        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
-        String script = "a = load '"+ inputFileName + "' as (a0:chararray, 
a1:chararray);" +
-                "store a into '" + outputFileName + "' using " +
-                DUMMY_STORE_CLASS_NAME + "();";
-
-        for (ExecType execType : modes) {
-            Util.resetStateForExecModeSwitch();
-            if(execType == cluster.getExecType()) {
-                ps = new PigServer(cluster.getExecType(),
-                        cluster.getProperties());
-                filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, 
Boolean.TRUE);
-                filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, 
Boolean.TRUE);
-            } else {
-                Properties props = new Properties();
-                props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
-                ps = new PigServer(ExecType.LOCAL, props);
-                if (Util.isHadoop1_x()) {
-                    // MAPREDUCE-1447/3563 (LocalJobRunner does not call 
methods of mapreduce
-                    // OutputCommitter) is fixed only in 0.23.1
-                    
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE);
-                    
filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE);
-                }
-            }
-            ps.setBatchOn();
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-            Util.createInputFile(ps.getPigContext(),
-                    inputFileName, inputData);
-            Util.registerMultiLineQuery(ps, script);
-            ps.executeBatch();
-            for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
-                String condition = entry.getValue() ? "" : "not";
-                assertEquals("Checking if file " + entry.getKey() +
-                        " does " + condition + " exists in " + execType +
-                        " mode", (boolean) entry.getValue(),
-                        Util.exists(ps.getPigContext(), entry.getKey()));
-            }
-        }
-    }
-
-    @Test
-    public void testCleanupOnFailure() throws Exception {
-        PigServer ps = null;
-        String cleanupSuccessFile = outputFileName + 
"_cleanupOnFailure_succeeded";
-        String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
-        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
-        String script = "a = load '"+ inputFileName + "';" +
-                "store a into '" + outputFileName + "' using " +
-                DUMMY_STORE_CLASS_NAME + "('true');";
-
-        for (ExecType execType : modes) {
-            Util.resetStateForExecModeSwitch();
-            if(execType == cluster.getExecType()) {
-                ps = new PigServer(cluster.getExecType(),
-                        cluster.getProperties());
-            } else {
-                Properties props = new Properties();
-                props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
-                ps = new PigServer(ExecType.LOCAL, props);
-            }
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-            ps.setBatchOn();
-            Util.createInputFile(ps.getPigContext(),
-                    inputFileName, inputData);
-            Util.registerMultiLineQuery(ps, script);
-            ps.executeBatch();
-            assertEquals(
-                    "Checking if file indicating that cleanupOnFailure failed 
" +
-                    " does not exists in " + execType + " mode", false,
-                    Util.exists(ps.getPigContext(), cleanupFailFile));
-            assertEquals(
-                    "Checking if file indicating that cleanupOnFailure was " +
-                    "successfully called exists in " + execType + " mode", 
true,
-                    Util.exists(ps.getPigContext(), cleanupSuccessFile));
-        }
-    }
-
-
-    @Test
-    public void testCleanupOnFailureMultiStore() throws Exception {
-        PigServer ps = null;
-        String outputFileName1 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
-        String outputFileName2 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
-
-        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", 
Boolean.TRUE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", 
Boolean.TRUE);
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", 
Boolean.FALSE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", 
Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", 
Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", 
Boolean.FALSE);
-
-        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
-        // though the second store should
-        // not cause a failure, the first one does and the result should be
-        // that both stores are considered to have failed
-        String script = "a = load '"+ inputFileName + "';" +
-                "store a into '" + outputFileName1 + "' using " +
-                DUMMY_STORE_CLASS_NAME + "('true', '1');" +
-                "store a into '" + outputFileName2 + "' using " +
-                DUMMY_STORE_CLASS_NAME + "('false', '2');";
-
-        for (ExecType execType : new ExecType[] {cluster.getExecType(), 
ExecType.LOCAL}) {
-            Util.resetStateForExecModeSwitch();
-            if(execType == cluster.getExecType()) {
-                ps = new PigServer(cluster.getExecType(),
-                        cluster.getProperties());
-            } else {
-                Properties props = new Properties();
-                props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
-                ps = new PigServer(ExecType.LOCAL, props);
-                // LocalJobRunner does not call abortTask
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"1", Boolean.FALSE);
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"2", Boolean.FALSE);
-                if (Util.isHadoop1_x()) {
-                    // MAPREDUCE-1447/3563 (LocalJobRunner does not call 
methods of mapreduce
-                    // OutputCommitter) is fixed only in 0.23.1
-                    
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", 
Boolean.FALSE);
-                    
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", 
Boolean.FALSE);
-                    
filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", 
Boolean.FALSE);
-                    
filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", 
Boolean.FALSE);
-                }
-            }
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-            ps.setBatchOn();
-            Util.createInputFile(ps.getPigContext(),
-                    inputFileName, inputData);
-            Util.registerMultiLineQuery(ps, script);
-            ps.executeBatch();
-            for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
-                String condition = entry.getValue() ? "" : "not";
-                assertEquals("Checking if file " + entry.getKey() +
-                        " does " + condition + " exists in " + execType +
-                        " mode", (boolean) entry.getValue(),
-                        Util.exists(ps.getPigContext(), entry.getKey()));
-            }
-        }
-    }
-
-    // Test that "_SUCCESS" file is created when 
"mapreduce.fileoutputcommitter.marksuccessfuljobs"
-    // property is set to true
-    // The test covers multi store and single store case in local and 
mapreduce mode
-    // The test also checks that "_SUCCESS" file is NOT created when the 
property
-    // is not set to true in all the modes.
-    @Test
-    public void testSuccessFileCreation1() throws Exception {
-        PigServer ps = null;
-
-        try {
-            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
-
-            String multiStoreScript = "a = load '"+ inputFileName + "';" +
-                    "b = filter a by $0 == 'hello';" +
-                    "c = filter a by $0 == 'hi';" +
-                    "d = filter a by $0 == 'bye';" +
-                    "store b into '" + outputFileName + "_1';" +
-                    "store c into '" + outputFileName + "_2';" +
-                    "store d into '" + outputFileName + "_3';";
-
-            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
-                "store a into '" + outputFileName + "_1';" ;
-
-            for (ExecType execType : modes) {
-                for(boolean isPropertySet: new boolean[] { true, false}) {
-                    for(boolean isMultiStore: new boolean[] { true, false}) {
-                        String script = (isMultiStore ? multiStoreScript :
-                            singleStoreScript);
-                        Util.resetStateForExecModeSwitch();
-                        if(execType == cluster.getExecType()) {
-                            ps = new PigServer(cluster.getExecType(),
-                                    cluster.getProperties());
-                        } else {
-                            Properties props = new Properties();
-                            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, 
"file:///");
-                            ps = new PigServer(ExecType.LOCAL, props);
-                        }
-                        ps.getPigContext().getProperties().setProperty(
-                                
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
-                                Boolean.toString(isPropertySet));
-                        Util.deleteFile(ps.getPigContext(), TESTDIR);
-                        ps.setBatchOn();
-                        Util.createInputFile(ps.getPigContext(),
-                                inputFileName, inputData);
-                        Util.registerMultiLineQuery(ps, script);
-                        ps.executeBatch();
-                        for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
-                            String sucFile = outputFileName + "_" + i + "/" +
-                                               
MapReduceLauncher.SUCCEEDED_FILE_NAME;
-                            assertEquals("Checking if _SUCCESS file exists in 
" +
-                                    execType + " mode", isPropertySet,
-                                    Util.exists(ps.getPigContext(), sucFile));
-                        }
-                    }
-                }
-            }
-        } finally {
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-        }
-    }
-
-    // Test _SUCCESS file is NOT created when job fails and when
-    // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to 
true
-    // The test covers multi store and single store case in local and 
mapreduce mode
-    // The test also checks that "_SUCCESS" file is NOT created when the 
property
-    // is not set to true in all the modes.
-    @Test
-    public void testSuccessFileCreation2() throws Exception {
-        PigServer ps = null;
-        try {
-            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
-            System.err.println("XXX: " + TestStore.FailUDF.class.getName());
-            String multiStoreScript = "a = load '"+ inputFileName + "';" +
-                    "b = filter a by $0 == 'hello';" +
-                    "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
-                    "c = filter a by $0 == 'hi';" +
-                    "d = filter a by $0 == 'bye';" +
-                    "store b into '" + outputFileName + "_1';" +
-                    "store c into '" + outputFileName + "_2';" +
-                    "store d into '" + outputFileName + "_3';";
-
-            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
-                "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
-                "store b into '" + outputFileName + "_1';" ;
-
-            for (ExecType execType : modes) {
-                for(boolean isPropertySet: new boolean[] { true, false}) {
-                    for(boolean isMultiStore: new boolean[] { true, false}) {
-                        String script = (isMultiStore ? multiStoreScript :
-                            singleStoreScript);
-                        Util.resetStateForExecModeSwitch();
-                        if(execType == cluster.getExecType()) {
-                            // since the job is guaranteed to fail, let's set
-                            // number of retries to 1.
-                            Properties props = cluster.getProperties();
-                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
-                            ps = new PigServer(cluster.getExecType(), props);
-                        } else {
-                            Properties props = new Properties();
-                            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, 
"file:///");
-                            // since the job is guaranteed to fail, let's set
-                            // number of retries to 1.
-                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
-                            ps = new PigServer(ExecType.LOCAL, props);
-                        }
-                        ps.getPigContext().getProperties().setProperty(
-                                
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
-                                Boolean.toString(isPropertySet));
-                        Util.deleteFile(ps.getPigContext(), TESTDIR);
-                        ps.setBatchOn();
-                        Util.createInputFile(ps.getPigContext(),
-                                inputFileName, inputData);
-                        Util.registerMultiLineQuery(ps, script);
-                        try {
-                            ps.executeBatch();
-                        } catch(IOException ioe) {
-                            if(!ioe.getMessage().equals("FailUDFException")) {
-                                // an unexpected exception
-                                throw ioe;
-                            }
-                        }
-                        for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
-                            String sucFile = outputFileName + "_" + i + "/" +
-                                               
MapReduceLauncher.SUCCEEDED_FILE_NAME;
-                            assertEquals("Checking if _SUCCESS file exists in 
" +
-                                    execType + " mode", false,
-                                    Util.exists(ps.getPigContext(), sucFile));
-                        }
-                    }
-                }
-            }
-        } finally {
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-        }
-    }
-
-    /**
-     * Test whether "part-m-00000" file is created on empty output when
-     * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat 
is
-     * supported by Hadoop.
-     * The test covers multi store and single store case in local and 
mapreduce mode
-     *
-     * @throws IOException
-     */
-    @Test
-    public void testEmptyPartFileCreation() throws IOException {
-
-        boolean isLazyOutputPresent = true;
-        try {
-            Class<?> clazz = PigContext
-                    
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-            clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
-        }
-        catch (Exception e) {
-            isLazyOutputPresent = false;
-        }
-
-        //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
-        Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is 
skipped", isLazyOutputPresent);
-
-        PigServer ps = null;
-
-        try {
-            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
-
-            String multiStoreScript = "a = load '"+ inputFileName + "';" +
-                    "b = filter a by $0 == 'hey';" +
-                    "c = filter a by $1 == 'globe';" +
-                    "d = limit a 2;" +
-                    "e = foreach d generate *, 'x';" +
-                    "f = filter e by $3 == 'y';" +
-                    "store b into '" + outputFileName + "_1';" +
-                    "store c into '" + outputFileName + "_2';" +
-                    "store f into '" + outputFileName + "_3';";
-
-            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
-                    "b = filter a by $0 == 'hey';" +
-                    "store b into '" + outputFileName + "_1';" ;
-
-            for (ExecType execType : modes) {
-                for(boolean isMultiStore: new boolean[] { true, false}) {
-                    if (isMultiStore && (execType.equals(ExecType.LOCAL) ||
-                            execType.equals(ExecType.MAPREDUCE))) {
-                        // Skip this test for Mapreduce as MapReducePOStoreImpl
-                        // does not handle LazyOutputFormat
-                        continue;
-                    }
-
-                    String script = (isMultiStore ? multiStoreScript
-                            : singleStoreScript);
-                    Util.resetStateForExecModeSwitch();
-                    if(execType == cluster.getExecType()) {
-                        ps = new PigServer(cluster.getExecType(),
-                                cluster.getProperties());
-                    } else {
-                        Properties props = new Properties();
-                        props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, 
"file:///");
-                        ps = new PigServer(ExecType.LOCAL, props);
-                    }
-                    ps.getPigContext().getProperties().setProperty(
-                            PigConfiguration.PIG_OUTPUT_LAZY, "true");
-                    Util.deleteFile(ps.getPigContext(), TESTDIR);
-                    ps.setBatchOn();
-                    Util.createInputFile(ps.getPigContext(),
-                            inputFileName, inputData);
-                    Util.registerMultiLineQuery(ps, script);
-                    ps.executeBatch();
-                    Configuration conf = 
ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties());
-                    for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
-                        assertEquals("For an empty output part-m-00000 should 
not exist in " + execType + " mode",
-                                null,
-                                getFirstOutputFile(conf, new 
Path(outputFileName + "_" + i), execType, true));
-                    }
-                }
-            }
-        } finally {
-            Util.deleteFile(ps.getPigContext(), TESTDIR);
-        }
-    }
-
     // A UDF which always throws an Exception so that the job can fail
     public static class FailUDF extends EvalFunc<String> {
 

Added: pig/trunk/test/org/apache/pig/test/TestStoreBase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Wed Jan 21 06:09:06 
2015
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.TestStore.DummyOutputCommitter;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class TestStoreBase {
+    protected ExecType mode;
+    protected String inputFileName;
+    protected String outputFileName;
+
+    protected static final String TESTDIR = "/tmp/" + 
TestStore.class.getSimpleName();
+
+    protected static final String DUMMY_STORE_CLASS_NAME
+    = "org.apache.pig.test.TestStore\\$DummyStore";
+
+    protected static final String FAIL_UDF_NAME
+    = "org.apache.pig.test.TestStore\\$FailUDF";
+    protected static final String MAP_MAX_ATTEMPTS = 
MRConfiguration.MAP_MAX_ATTEMPTS;
+
+    protected PigServer ps = null;
+
+    @Before
+    public void setUp() throws Exception {
+        inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + 
".txt";
+        outputFileName = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
+        setupPigServer();
+    }
+
+    abstract protected void setupPigServer() throws Exception;
+
+    @Test
+    public void testSetStoreSchema() throws Exception {
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, 
Boolean.FALSE);
+        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+        String script = "a = load '"+ inputFileName + "' as (a0:chararray, 
a1:chararray);" +
+                "store a into '" + outputFileName + "' using " +
+                DUMMY_STORE_CLASS_NAME + "();";
+
+        if(!mode.isLocal()) {
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, 
Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, 
Boolean.TRUE);
+        } else {
+            if (Util.isHadoop1_x()) {
+                // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods 
of mapreduce
+                // OutputCommitter) is fixed only in 0.23.1
+                filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, 
Boolean.FALSE);
+                filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, 
Boolean.FALSE);
+            }
+        }
+        ps.setBatchOn();
+        Util.deleteFile(ps.getPigContext(), TESTDIR);
+        Util.createInputFile(ps.getPigContext(),
+                inputFileName, inputData);
+        Util.registerMultiLineQuery(ps, script);
+        ps.executeBatch();
+        for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+            String condition = entry.getValue() ? "" : "not";
+            assertEquals("Checking if file " + entry.getKey() +
+                    " does " + condition + " exists in " + mode +
+                    " mode", (boolean) entry.getValue(),
+                    Util.exists(ps.getPigContext(), entry.getKey()));
+        }
+    }
+
+    @Test
+    public void testCleanupOnFailure() throws Exception {
+        String cleanupSuccessFile = outputFileName + 
"_cleanupOnFailure_succeeded";
+        String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
+        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+        String script = "a = load '"+ inputFileName + "';" +
+                "store a into '" + outputFileName + "' using " +
+                DUMMY_STORE_CLASS_NAME + "('true');";
+
+        Util.deleteFile(ps.getPigContext(), TESTDIR);
+        ps.setBatchOn();
+        Util.createInputFile(ps.getPigContext(),
+                inputFileName, inputData);
+        Util.registerMultiLineQuery(ps, script);
+        ps.executeBatch();
+        assertEquals(
+                "Checking if file indicating that cleanupOnFailure failed " +
+                " does not exists in " + mode + " mode", false,
+                Util.exists(ps.getPigContext(), cleanupFailFile));
+        assertEquals(
+                "Checking if file indicating that cleanupOnFailure was " +
+                "successfully called exists in " + mode + " mode", true,
+                Util.exists(ps.getPigContext(), cleanupSuccessFile));
+    }
+
+    @Test
+    public void testCleanupOnFailureMultiStore() throws Exception {
+        String outputFileName1 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
+        String outputFileName2 = TESTDIR + "/TestStore-output-" + new 
Random().nextLong() + ".txt";
+
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", 
Boolean.TRUE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", 
Boolean.TRUE);
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", 
Boolean.FALSE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", 
Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", 
Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", 
Boolean.FALSE);
+
+        String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+        // though the second store should
+        // not cause a failure, the first one does and the result should be
+        // that both stores are considered to have failed
+        String script = "a = load '"+ inputFileName + "';" +
+                "store a into '" + outputFileName1 + "' using " +
+                DUMMY_STORE_CLASS_NAME + "('true', '1');" +
+                "store a into '" + outputFileName2 + "' using " +
+                DUMMY_STORE_CLASS_NAME + "('false', '2');";
+
+        if(mode.isLocal()) {
+            // MR LocalJobRunner does not call abortTask
+            if (!mode.toString().startsWith("TEZ")) {
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"1", Boolean.FALSE);
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + 
"2", Boolean.FALSE);
+            }
+            if (Util.isHadoop1_x()) {
+                // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods 
of mapreduce
+                // OutputCommitter) is fixed only in 0.23.1
+                filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + 
"1", Boolean.FALSE);
+                filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + 
"2", Boolean.FALSE);
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + 
"1", Boolean.FALSE);
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + 
"2", Boolean.FALSE);
+            }
+        }
+        Util.deleteFile(ps.getPigContext(), TESTDIR);
+        ps.setBatchOn();
+        Util.createInputFile(ps.getPigContext(),
+                inputFileName, inputData);
+        Util.registerMultiLineQuery(ps, script);
+        ps.executeBatch();
+        for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+            String condition = entry.getValue() ? "" : "not";
+            assertEquals("Checking if file " + entry.getKey() +
+                    " does " + condition + " exists in " + mode +
+                    " mode", (boolean) entry.getValue(),
+                    Util.exists(ps.getPigContext(), entry.getKey()));
+        }
+    }
+
+    // Test that "_SUCCESS" file is created when 
"mapreduce.fileoutputcommitter.marksuccessfuljobs"
+    // property is set to true
+    // The test covers multi store and single store case in local and 
mapreduce mode
+    // The test also checks that "_SUCCESS" file is NOT created when the 
property
+    // is not set to true in all the modes.
+    @Test
+    public void testSuccessFileCreation1() throws Exception {
+        
+        try {
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
+
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hello';" +
+                    "c = filter a by $0 == 'hi';" +
+                    "d = filter a by $0 == 'bye';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" +
+                    "store d into '" + outputFileName + "_3';";
+
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                "store a into '" + outputFileName + "_1';" ;
+
+            for(boolean isPropertySet: new boolean[] { true, false}) {
+                for(boolean isMultiStore: new boolean[] { true, false}) {
+                    String script = (isMultiStore ? multiStoreScript :
+                        singleStoreScript);
+                    ps.getPigContext().getProperties().setProperty(
+                            
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
+                            Boolean.toString(isPropertySet));
+                    Util.deleteFile(ps.getPigContext(), TESTDIR);
+                    ps.setBatchOn();
+                    Util.createInputFile(ps.getPigContext(),
+                            inputFileName, inputData);
+                    Util.registerMultiLineQuery(ps, script);
+                    ps.executeBatch();
+                    for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                        String sucFile = outputFileName + "_" + i + "/" +
+                                           
MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                        assertEquals("Checking if _SUCCESS file exists in " +
+                                mode + " mode", isPropertySet,
+                                Util.exists(ps.getPigContext(), sucFile));
+                    }
+                }
+            }
+        } finally {
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
+        }
+    }
+
+    // Test _SUCCESS file is NOT created when job fails and when
+    // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to 
true
+    // The test covers multi store and single store case in local and 
mapreduce mode
+    // The test also checks that "_SUCCESS" file is NOT created when the 
property
+    // is not set to true in all the modes.
+    @Test
+    public void testSuccessFileCreation2() throws Exception {
+        try {
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
+            System.err.println("XXX: " + TestStore.FailUDF.class.getName());
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hello';" +
+                    "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
+                    "c = filter a by $0 == 'hi';" +
+                    "d = filter a by $0 == 'bye';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" +
+                    "store d into '" + outputFileName + "_3';";
+
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
+                "store b into '" + outputFileName + "_1';" ;
+
+            for(boolean isPropertySet: new boolean[] { true, false}) {
+                for(boolean isMultiStore: new boolean[] { true, false}) {
+                    String script = (isMultiStore ? multiStoreScript :
+                        singleStoreScript);
+                    if (mode.isLocal()) {
+                        // since the job is guaranteed to fail, let's set
+                        // number of retries to 1.
+                        
ps.getPigContext().getProperties().setProperty(MAP_MAX_ATTEMPTS, "1");
+                    }
+                    ps.getPigContext().getProperties().setProperty(
+                            
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
+                            Boolean.toString(isPropertySet));
+                    Util.deleteFile(ps.getPigContext(), TESTDIR);
+                    ps.setBatchOn();
+                    Util.createInputFile(ps.getPigContext(),
+                            inputFileName, inputData);
+                    Util.registerMultiLineQuery(ps, script);
+                    try {
+                        ps.executeBatch();
+                    } catch(IOException ioe) {
+                        if(!ioe.getMessage().equals("FailUDFException")) {
+                            // an unexpected exception
+                            throw ioe;
+                        }
+                    }
+                    for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                        String sucFile = outputFileName + "_" + i + "/" +
+                                           
MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                        assertEquals("Checking if _SUCCESS file exists in " +
+                                mode + " mode", false,
+                                Util.exists(ps.getPigContext(), sucFile));
+                    }
+                }
+            }
+        } finally {
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
+        }
+    }
+
+    /**
+     * Test whether "part-m-00000" file is created on empty output when
+     * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat 
is
+     * supported by Hadoop.
+     * The test covers multi store and single store case in local and 
mapreduce mode
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testEmptyPartFileCreation() throws Exception {
+
+        boolean isLazyOutputPresent = true;
+        try {
+            Class<?> clazz = PigContext
+                    
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+            clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
+        }
+        catch (Exception e) {
+            isLazyOutputPresent = false;
+        }
+
+        //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
+        Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is 
skipped", isLazyOutputPresent);
+
+        try {
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", 
"bye\tworld"};
+
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hey';" +
+                    "c = filter a by $1 == 'globe';" +
+                    "d = limit a 2;" +
+                    "e = foreach d generate *, 'x';" +
+                    "f = filter e by $3 == 'y';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" +
+                    "store f into '" + outputFileName + "_3';";
+
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hey';" +
+                    "store b into '" + outputFileName + "_1';" ;
+
+            for(boolean isMultiStore: new boolean[] { true, false}) {
+                if (isMultiStore && (mode.isLocal() ||
+                        mode.equals(ExecType.MAPREDUCE))) {
+                    // Skip this test for Mapreduce as MapReducePOStoreImpl
+                    // does not handle LazyOutputFormat
+                    continue;
+                }
+
+                String script = (isMultiStore ? multiStoreScript
+                        : singleStoreScript);
+                ps.getPigContext().getProperties().setProperty(
+                        PigConfiguration.PIG_OUTPUT_LAZY, "true");
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
+                ps.setBatchOn();
+                Util.createInputFile(ps.getPigContext(),
+                        inputFileName, inputData);
+                Util.registerMultiLineQuery(ps, script);
+                ps.executeBatch();
+                Configuration conf = 
ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties());
+                for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                    assertEquals("For an empty output part-m-00000 should not 
exist in " + mode + " mode",
+                            null,
+                            getFirstOutputFile(conf, new Path(outputFileName + 
"_" + i), mode, true));
+                }
+            }
+        } finally {
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
+        }
+    }
+
+    public static Path getFirstOutputFile(Configuration conf, Path outputDir,
+            ExecType exectype, boolean isMapOutput) throws Exception {
+        FileSystem fs = outputDir.getFileSystem(conf);
+        FileStatus[] outputFiles = fs.listStatus(outputDir,
+                Util.getSuccessMarkerPathFilter());
+
+        boolean filefound = false;
+        if (outputFiles != null && outputFiles.length != 0) {
+            String name = outputFiles[0].getPath().getName();
+            if (exectype == Util.getLocalTestMode() || exectype == 
ExecType.MAPREDUCE) {
+                if (isMapOutput) {
+                    filefound = name.equals("part-m-00000");
+                } else {
+                    filefound = name.equals("part-r-00000");
+                }
+            } else {
+                filefound = name.startsWith("part-");
+            }
+        }
+        return filefound ? outputFiles[0].getPath() : null;
+    }
+}

Added: pig/trunk/test/org/apache/pig/test/TestStoreLocal.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreLocal.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreLocal.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreLocal.java Wed Jan 21 06:09:06 
2015
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.Properties;
+
+import org.apache.pig.PigServer;
+import org.junit.Before;
+
+public class TestStoreLocal extends TestStoreBase {
+    @Before
+    public void setUp() throws Exception {
+        mode = Util.getLocalTestMode();
+        super.setUp();
+    }
+
+    @Override
+    protected void setupPigServer() throws Exception {
+        Properties props = new Properties();
+        ps = new PigServer(Util.getLocalTestMode(), props);
+    }
+}

Added: pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java Wed Jan 21 
06:09:06 2015
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.pig.PigConfiguration;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.test.TestPOPartialAggPlan;
+import org.junit.Test;
+
+public class TestPOPartialAggPlanTez extends TestPOPartialAggPlan {
+    @Test
+    public void testNoMapAggProp() throws Exception{
+        //test with pig.exec.mapPartAgg not set
+        String query = getGByQuery();
+
+        TezPlanContainer tezPlanContainer = 
TezUtil.buildTezPlanContainer(query, pc);
+        assertEquals(tezPlanContainer.size(), 1);
+
+        assertNull("POPartialAgg should be 
absent",findPOPartialAgg(tezPlanContainer));
+    }
+
+    @Test
+    public void testMapAggPropFalse() throws Exception{
+        //test with pig.exec.mapPartAgg set to false
+        String query = getGByQuery();
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"false");
+        TezPlanContainer tezPlanContainer = 
TezUtil.buildTezPlanContainer(query, pc);
+        assertEquals(tezPlanContainer.size(), 1);
+
+        assertNull("POPartialAgg should be absent", 
findPOPartialAgg(tezPlanContainer));
+    }
+
+    @Test
+    public void testMapAggPropTrue() throws Exception{
+        //test with pig.exec.mapPartAgg to true
+        String query = getGByQuery();
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        TezPlanContainer tezPlanContainer = 
TezUtil.buildTezPlanContainer(query, pc);
+        assertEquals(tezPlanContainer.size(), 1);
+
+        assertNotNull("POPartialAgg should be 
present",findPOPartialAgg(tezPlanContainer));
+
+    }
+
+    @Test
+    public void testMapAggNoAggFunc() throws Exception{
+        //no agg func, so there should not be a POPartial
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group;";
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        TezPlanContainer tezPlanContainer = 
TezUtil.buildTezPlanContainer(query, pc);
+        assertEquals(tezPlanContainer.size(), 1);
+
+        assertNull("POPartialAgg should be 
absent",findPOPartialAgg(tezPlanContainer));
+    }
+
+    @Test
+    public void testMapAggNotCombinable() throws Exception{
+        //not combinable, so there should not be a POPartial
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group, COUNT(l.b), l.b;";
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
+        TezPlanContainer tezPlanContainer = 
TezUtil.buildTezPlanContainer(query, pc);
+        assertEquals(tezPlanContainer.size(), 1);
+
+        assertNull("POPartialAgg should be absent", 
findPOPartialAgg(tezPlanContainer));
+    }
+
+    private PhysicalOperator findPOPartialAgg(TezPlanContainer 
tezPlanContainer) {
+        for (TezPlanContainerNode node : tezPlanContainer) {
+            TezOperPlan tezPlan = node.getTezOperPlan();
+            for (TezOperator tezOper : tezPlan) {
+                PhysicalOperator partialAgg = findPOPartialAgg(tezOper.plan);
+                if (partialAgg != null) {
+                    return partialAgg;
+                }
+            }
+        }
+        return null;
+    }
+}

Added: pig/trunk/test/org/apache/pig/tez/TezUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TezUtil.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TezUtil.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TezUtil.java Wed Jan 21 06:09:06 2015
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.test.Util;
+
+public class TezUtil {
+    public static TezPlanContainer buildTezPlanContainer(String query, 
PigContext pc) throws Exception {
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+        PhysicalPlan pp = Util.buildPhysicalPlanFromNewLP(lp, pc);
+        TezPlanContainer tezPlanContainer = buildTezPlanWithOptimizer(pp, pc);
+        return tezPlanContainer;
+    }
+
+    public static TezPlanContainer buildTezPlanWithOptimizer(PhysicalPlan pp, 
PigContext pc) throws Exception {
+        MapRedUtil.checkLeafIsStore(pp, pc);
+        TezLauncher launcher = new TezLauncher();
+        return launcher.compile(pp, pc);
+    }
+}

Modified: pig/trunk/test/tez-local-tests
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/tez-local-tests (original)
+++ pig/trunk/test/tez-local-tests Wed Jan 21 06:09:06 2015
@@ -80,3 +80,6 @@
 **/TestRank3.java
 **/TestScalarAliasesLocal.java
 **/TestPigStatsTez.java
+**/TestStoreLocal.java
+**/TestPOPartialAggPlanTez.java
+**/TestMultiQueryLocal.java


Reply via email to