Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon May 29 15:00:39 2017
@@ -41,11 +41,13 @@ import org.apache.commons.lang3.ArrayUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -58,6 +60,7 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -207,12 +210,13 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-            if (execType.toString().startsWith("tez")) {
-                assertEquals(1, stats.getNumberJobs());
-                assertEquals(stats.getJobGraph().size(), 1);
-            } else {
+            if (execType.equals("mapreduce")) {
                 assertEquals(2, stats.getNumberJobs());
                 assertEquals(stats.getJobGraph().size(), 2);
+            } else {
+                // Tez and Spark
+                assertEquals(1, stats.getNumberJobs());
+                assertEquals(stats.getJobGraph().size(), 1);
             }
 
             Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
@@ -274,6 +278,10 @@ public class TestPigRunner {
                 assertEquals(stats.getJobGraph().size(), 1);
                 // 5 vertices
                 assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+            } else if (execType.equals("spark")) {
+                // In Spark mode, the number of Spark jobs is determined by the number of POStores.
+                // 1 POStore generates 1 Spark job.
+                assertEquals(stats.getJobGraph().size(), 1);
             } else {
                 assertEquals(stats.getJobGraph().size(), 4);
             }
@@ -294,7 +302,12 @@ public class TestPigRunner {
                 //       Need to investigate
                 // assertEquals("B", ((JobStats) 
stats.getJobGraph().getPredecessors(
                 //        js).get(0)).getAlias());
+            } else if (execType.equals("spark")) {
+                assertEquals("A,B", ((JobStats) 
stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                // TODO: alias is not set for 
sample-aggregation/partition/sort job.
             } else {
+
                 assertEquals("A", ((JobStats) 
stats.getJobGraph().getSources().get(
                         0)).getAlias());
                 assertEquals("B", ((JobStats) 
stats.getJobGraph().getPredecessors(
@@ -323,7 +336,14 @@ public class TestPigRunner {
             String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 1);
+            if (execType.equals("spark")) {
+                // In Spark mode, the number of Spark jobs is determined by the number of POStores.
+                // 2 POStores generate 2 Spark jobs.
+                assertTrue(stats.getJobGraph().size() == 2);
+            } else {
+                assertTrue(stats.getJobGraph().size() == 1);
+            }
+
             // Each output file should include the following:
             // output:
             //   1\t2\t3\n
@@ -372,7 +392,13 @@ public class TestPigRunner {
             String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertEquals(stats.getJobGraph().size(), 1);
+            if (execType.equals("spark")) {
+                // In Spark mode, the number of Spark jobs is determined by the number of POStores.
+                // 2 POStores generate 2 Spark jobs.
+                assertEquals(stats.getJobGraph().size(), 2);
+            } else {
+                assertEquals(stats.getJobGraph().size(), 1);
+            }
 
             // Each output file should include the following:
             // output:
@@ -430,11 +456,15 @@ public class TestPigRunner {
         w.close();
 
         try {
-            String[] args = { "-x", execType, PIG_FILE };
-            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+            String[] args = null;
+            args = new String[]{"-x", execType, PIG_FILE};
+            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
             if (Util.isMapredExecType(cluster.getExecType())) {
                 assertEquals(3, stats.getJobGraph().size());
+            } else if (Util.isSparkExecType(cluster.getExecType())) {
+                // One for each store and 3 for join.
+                assertEquals(4, stats.getJobGraph().size());
             } else {
                 assertEquals(1, stats.getJobGraph().size());
             }
@@ -475,10 +505,14 @@ public class TestPigRunner {
             });
             assertEquals(5, inputStats.get(0).getNumberRecords());
             assertEquals(3, inputStats.get(1).getNumberRecords());
-            // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong
             // Since Tez has only one load per job, its values are correct
+            // The inputStats results are also correct in Spark mode
             if (!Util.isMapredExecType(cluster.getExecType())) {
                 assertEquals(30, inputStats.get(0).getBytes());
+            }
+
+            // TODO PIG-5240: Fix TestPigRunner#simpleMultiQueryTest3 in Spark mode for wrong inputStats
+            if (!Util.isMapredExecType(cluster.getExecType()) && !Util.isSparkExecType(cluster.getExecType())) {
                 assertEquals(18, inputStats.get(1).getBytes());
             }
         } finally {
@@ -504,17 +538,17 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, null);
             Iterator<JobStats> iter = stats.getJobGraph().iterator();
             while (iter.hasNext()) {
-                 JobStats js=iter.next();
-                 if (execType.equals("tez")) {
-                     assertEquals(js.getState().name(), "FAILED");
-                 } else {
-                     if(js.getState().name().equals("FAILED")) {
-                         List<Operator> ops=stats.getJobGraph().getSuccessors(js);
-                         for(Operator op : ops ) {
-                             assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
-                         }
-                     }
-                 }
+                JobStats js = iter.next();
+                if (execType.equals("mapreduce")) {
+                    if (js.getState().name().equals("FAILED")) {
+                        List<Operator> ops = stats.getJobGraph().getSuccessors(js);
+                        for (Operator op : ops) {
+                            assertEquals(((JobStats) op).getState().toString(), "UNKNOWN");
+                        }
+                    }
+                } else {
+                    assertEquals(js.getState().name(), "FAILED");
+                }
+                        }
+                    }
+                } else {
+                    assertEquals(js.getState().name(), "FAILED");
+                }
             }
         } finally {
             new File(PIG_FILE).delete();
@@ -723,8 +757,14 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            // In Spark mode, one POStore generates one Spark action (Spark job).
+            // In this case, the Spark plan has 1 SparkOperator (after multiquery optimization)
+            // but 2 POStores, which generate 2 Spark actions (Spark jobs).
+            if (execType.equals("spark")) {
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             for (OutputStats outstats : outputs) {
@@ -835,7 +875,14 @@ public class TestPigRunner {
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() != 0);
-            assertTrue(stats.getOutputStats().size() == 0);
+            if (execType.equals("spark")) {
+                // Currently, even on failure, the Spark engine adds a failed OutputStats;
+                // see SparkPigStats.addFailJobStats()
+                assertTrue(stats.getOutputStats().size() == 1);
+                assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+            } else {
+                assertTrue(stats.getOutputStats().size() == 0);
+            }
 
         } finally {
             new File(PIG_FILE).delete();
@@ -858,7 +905,14 @@ public class TestPigRunner {
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() != 0);
-            assertTrue(stats.getOutputStats().size() == 0);
+            // Currently, even on failure, the Spark engine adds a failed OutputStats;
+            // see SparkPigStats.addFailJobStats()
+            if (execType.equals("spark")) {
+                assertTrue(stats.getOutputStats().size() == 1);
+                assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+            } else {
+                assertTrue(stats.getOutputStats().size() == 0);
+            }
 
         } finally {
             new File(PIG_FILE).delete();
@@ -919,8 +973,14 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            // In Spark mode, one POStore generates one Spark action (Spark job).
+            // In this case, the Spark plan has 1 SparkOperator (after multiquery optimization)
+            // but 2 POStores, which generate 2 Spark actions (Spark jobs).
+            if (execType.equals("spark")) {
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             for (OutputStats outstats : outputs) {
@@ -968,7 +1028,13 @@ public class TestPigRunner {
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(1, outputs.size());
             OutputStats outstats = outputs.get(0);
-            assertEquals(9, outstats.getNumberRecords());
+            // In Spark mode, if pig.disable.counter = true, the number of records in the
+            // output is not calculated.
+            if (execType.equals("spark")) {
+                assertEquals(-1, outstats.getNumberRecords());
+            } else {
+                assertEquals(9, outstats.getNumberRecords());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -1008,6 +1074,28 @@ public class TestPigRunner {
                         MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
                 assertEquals(new File(INPUT_FILE).length(), counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                         MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            } else if (execType.equals("spark")) {
+                /** Uncomment this code once the changes of PIG-4788 are merged to master
+                //There are 2 Spark jobs because of the 2 POStores, although the Spark plan
+                //is optimized by multiquery optimization.
+                List<JobStats> jobs = stats.getJobGraph().getJobList();
+                JobStats firstJob = jobs.get(0);
+                JobStats secondJob = jobs.get(1);
+                //The hdfs_bytes_read of the two Spark jobs is the same (both jobs share the
+                //same POLoad), so we only compare one of them with the expected hdfs_bytes_read (30).
+                //We sum the hdfs_bytes_written of the two Spark jobs to get the total hdfs_bytes_written.
+                long hdfs_bytes_read = 0;
+                long hdfs_bytes_written = 0;
+
+                hdfs_bytes_read += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue();
+                hdfs_bytes_written += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+                hdfs_bytes_written += secondJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+
+                assertEquals(30, hdfs_bytes_read);
+                assertEquals(20, hdfs_bytes_written);
+                 **/
             } else {
                 Counters counter = ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
                 assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
@@ -1050,8 +1138,12 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            if (execType.equals("spark")) {
+                // 2 POStores generate 2 Spark jobs
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             if (execType.equals("tez")) {
@@ -1072,7 +1164,11 @@ public class TestPigRunner {
             List<InputStats> inputs = stats.getInputStats();
             assertEquals(1, inputs.size());
             InputStats instats = inputs.get(0);
-            assertEquals(5, instats.getNumberRecords());
+            if (execType.equals("spark")) {
+                assertEquals(-1, instats.getNumberRecords());
+            } else {
+                assertEquals(5, instats.getNumberRecords());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);

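A note on the pattern running through the TestPigRunner changes above: the Spark branches all encode one rule, namely that the Spark launcher submits one Spark action (job) per POStore in the physical plan, whereas Tez fuses the whole DAG into a single job. A minimal sketch of that rule, assuming Pig's PlanHelper/POStore physical-plan API (package paths per Pig trunk; verify against your version):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;

    public final class SparkJobCountRule {
        // In Spark mode each POStore launches one Spark action (job), so the
        // expected PigStats job-graph size is simply the POStore count.
        public static int expectedSparkJobs(PhysicalPlan plan) throws Exception {
            return PlanHelper.getPhysicalOperators(plan, POStore.class).size();
        }
    }
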
Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Mon May 29 15:00:39 2017
@@ -62,6 +62,7 @@ import org.apache.pig.impl.util.Properti
 import org.apache.pig.impl.util.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -547,9 +548,8 @@ public class TestPigServer {
     public void testExplainXmlComplex() throws Throwable {
         // TODO: Explain XML output is not supported in non-MR mode. Remove the
         // following condition once it's implemented in Tez.
-        if (cluster.getExecType() != ExecType.MAPREDUCE) {
-            return;
-        }
+        String execType = cluster.getExecType().toString().toLowerCase();
+        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()) || Util.isSparkExecType(cluster.getExecType()));
         PigServer pig = new PigServer(cluster.getExecType(), properties);
         pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
         pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
@@ -574,6 +574,55 @@ public class TestPigServer {
         assertEquals(1, physicalPlan.getLength());
         assertTrue(physicalPlan.item(0).getTextContent().contains("Not Supported"));
 
+
+        if (execType.equals(ExecType.MAPREDUCE.name().toLowerCase())) {
+            verifyExplainXmlComplexMR(doc);
+        } else if (execType.equals(MiniGenericCluster.EXECTYPE_SPARK)) {
+            verifyExplainXmlComplexSpark(doc);
+        }
+
+
+    }
+
+    private void verifyExplainXmlComplexSpark(Document doc) {
+        NodeList stores = doc.getElementsByTagName("POStore");
+        assertEquals(1, stores.getLength());
+
+        NodeList groups = doc.getElementsByTagName("POJoinGroupSpark");
+        assertEquals(2, groups.getLength());
+
+        Node innerGroup = groups.item(1);
+
+        NodeList groupChildren = innerGroup.getChildNodes();
+
+        int foreachCount = 0;
+        int castCount = 0;
+        int loadCount = 0;
+
+        for (int i = 0; i < groupChildren.getLength(); i++) {
+            Node node = groupChildren.item(i);
+            if (node.getNodeName().equals("POForEach")) {
+                ++foreachCount;
+                NodeList foreachNodes = node.getChildNodes();
+                for (int j = 0; j < foreachNodes.getLength(); j++) {
+                    Node innerNode = foreachNodes.item(j);
+                    if (innerNode.getNodeName().equals("alias")) {
+                        assertEquals("b", innerNode.getTextContent());
+                    } else if (innerNode.getNodeName().equals("POCast")) {
+                        ++castCount;
+                    } else if (innerNode.getNodeName().equals("POLoad")) {
+                        ++loadCount;
+                    }
+                }
+            }
+        }
+
+        assertEquals(1, foreachCount);
+        assertEquals(3, castCount);
+        assertEquals(1, loadCount);
+    }
+
+    private void verifyExplainXmlComplexMR(Document doc) {
         //Verify we have two loads and one is temporary
         NodeList loads = doc.getElementsByTagName("POLoad");
         assertEquals(2, loads.getLength());

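The new verifyExplainXmlComplexSpark above counts POForEach/POCast/POLoad nodes by walking the explain-XML DOM with nested loops and individual counters. An equivalent generic tally using only the standard org.w3c.dom API (an illustrative sketch, not code from this commit):

    import java.util.HashMap;
    import java.util.Map;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;

    // Tally the direct children of a DOM node by tag name -- the same
    // bookkeeping the test does with separate counter variables.
    static Map<String, Integer> tallyChildren(Node parent) {
        Map<String, Integer> counts = new HashMap<>();
        NodeList children = parent.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            counts.merge(children.item(i).getNodeName(), 1, Integer::sum);
        }
        return counts;
    }
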
Modified: pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java Mon May 29 15:00:39 2017
@@ -255,6 +255,16 @@ public class TestPigServerLocal {
             _testSkipParseInRegisterForBatch(false, 8, 4);
             _testSkipParseInRegisterForBatch(true, 5, 1);
             _testParseBatchWithScripting(5, 1);
+        } else if (Util.getLocalTestMode().toString().startsWith("SPARK")) {
+            // 6 = 4 (once per registerQuery) + 2 (SortConverter, PigRecordReader)
+            // 4 (once per registerQuery)
+            _testSkipParseInRegisterForBatch(false, 6, 4);
+
+            // 3 = 1 (registerQuery) + 2 (SortConverter, PigRecordReader)
+            // 1 (registerQuery)
+            _testSkipParseInRegisterForBatch(true, 3, 1);
+
+            _testParseBatchWithScripting(3, 1);
         } else {
             // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
             // InputSizeReducerEstimator, getSplits->RandomSampleLoader,

Modified: pig/trunk/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestProjectRange.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestProjectRange.java Mon May 29 15:00:39 2017
@@ -644,13 +644,14 @@ public class TestProjectRange  {
 
         Iterator<Tuple> it = pigServer.openIterator("f");
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "(10,{(10,20,30,40,50)})",
-                            "(11,{(11,21,31,41,51)})",
-                    });
-        Util.checkQueryOutputs(it, expectedRes);
+        String[] expectedRes =
+                new String[] {
+                        "(10,{(10,20,30,40,50)})",
+                        "(11,{(11,21,31,41,51)})",
+                };
+        Schema s = pigServer.dumpSchema("f");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -732,12 +733,14 @@ public class TestProjectRange  {
 
         Iterator<Tuple> it = pigServer.openIterator("f");
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
-                    });
-        Util.checkQueryOutputs(it, expectedRes);
+        String[] expectedRes =
+                new String[] {
+                        "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
+                };
+        Schema s = pigServer.dumpSchema("f");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
+
     }
 
     private LOSort checkNumExpressionPlansForSort(LogicalPlan lp, int numPlans, boolean[] isAsc) {
@@ -925,8 +928,8 @@ public class TestProjectRange  {
             "  g = group l1 by   .. c,  l2 by .. c;"
             ;
         String expectedSchStr = "grp: (a: int,b: long,c: int)," +
-                       "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
-                       "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
+                "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
+                "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";
 
         Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
         compileAndCompareSchema(expectedSch, query, "g");
@@ -938,14 +941,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})",
-                            "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})",
+                        "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})",
+                };
         Iterator<Tuple> it = pigServer.openIterator("g");
-        Util.checkQueryOutputs(it, expectedRes);
+        Schema s = pigServer.dumpSchema("g");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -1005,14 +1009,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})",
-                            "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})",
+                        "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})",
+                };
         Iterator<Tuple> it = pigServer.openIterator("g");
-        Util.checkQueryOutputs(it, expectedRes);
+        Schema s = pigServer.dumpSchema("g");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1056,14 +1061,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "((30,40,50),{(10,20L,30,40,50)})",
-                            "((31,41,51),{(11,21L,31,41,51)})",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "((30,40,50),{(10,20L,30,40,50)})",
+                        "((31,41,51),{(11,21L,31,41,51)})",
+                };
         Iterator<Tuple> it = pigServer.openIterator("lim");
-        Util.checkQueryOutputsAfterSort(it, expectedRes);
+        Schema s = pigServer.dumpSchema("lim");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
 
@@ -1118,14 +1124,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "((30,40,50),{(10,20,30,40,50)})",
-                            "((31,41,51),{(11,21,31,41,51)})",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "((30,40,50),{(10,20,30,40,50)})",
+                        "((31,41,51),{(11,21,31,41,51)})",
+                };
         Iterator<Tuple> it = pigServer.openIterator("g");
-        Util.checkQueryOutputs(it, expectedRes);
+        Schema s = pigServer.dumpSchema("g");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     private void setAliasesToNull(Schema schema) {
@@ -1157,14 +1164,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "(10,20,30,40,50,10,20,30,40,50)",
-                            "(11,21,31,41,51,11,21,31,41,51)",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "(10,20,30,40,50,10,20,30,40,50)",
+                        "(11,21,31,41,51,11,21,31,41,51)",
+                };
         Iterator<Tuple> it = pigServer.openIterator("j");
-        Util.checkQueryOutputs(it, expectedRes);
+        Schema s = pigServer.dumpSchema("j");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1185,14 +1193,15 @@ public class TestProjectRange  {
 
         Util.registerMultiLineQuery(pigServer, query);
 
-        List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
-                    new String[] {
-                            "(10,20,30,40,50,10,20,30,40,50)",
-                            "(11,21,31,41,51,11,21,31,41,51)",
-                    });
+        String[] expectedRes =
+                new String[] {
+                        "(10,20,30,40,50,10,20,30,40,50)",
+                        "(11,21,31,41,51,11,21,31,41,51)",
+                };
         Iterator<Tuple> it = pigServer.openIterator("j");
-        Util.checkQueryOutputs(it, expectedRes);
+        Schema s = pigServer.dumpSchema("j");
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1204,7 +1213,7 @@ public class TestProjectRange  {
             "  l2 = load '" + INP_FILE_5FIELDS +  "';" +
             "  g = cogroup l1 by  ($0 ..  ),  l2 by ($0 .. );";
         Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' " +
-                       "(range of columns to the end) " +
+                "(range of columns to the end) " +
                         "is only allowed if the input has a schema");
     }
 

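The recurring TestProjectRange edit above replaces exact-order tuple comparison with the four-argument Util.checkQueryOutputs overload, which additionally takes the translated schema and a flag from Util.isSparkExecType(...). The apparent intent (the authoritative semantics live in test/org/apache/pig/test/Util.java): the schema lets the expected strings be parsed into typed tuples, and the boolean relaxes the ordering requirement on Spark, where output order is not deterministic. The call shape used throughout this commit:

    // Shape of the new assertion pattern, as used repeatedly above.
    Iterator<Tuple> it = pigServer.openIterator("f");
    String[] expectedRes = { "(10,{(10,20,30,40,50)})", "(11,{(11,21,31,41,51)})" };
    Schema s = pigServer.dumpSchema("f");
    Util.checkQueryOutputs(it, expectedRes,
            org.apache.pig.newplan.logical.Util.translateSchema(s),
            Util.isSparkExecType(cluster.getExecType())); // true => order-insensitive compare
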
Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Mon May 29 15:00:39 2017
@@ -554,27 +554,9 @@ public class TestPruneColumn {
         pigServer.registerQuery("C = cogroup A by $1, B by $1;");
         pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
         Iterator<Tuple> iter = pigServer.openIterator("D");
-
-        assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(1, t.size());
-        assertNull(t.get(0));
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
-
-        assertEquals(1, t.size());
-        assertEquals("2.0", t.get(0).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
-
-        assertEquals(1, t.size());
-        assertEquals("5.0", t.get(0).toString());
-
-        assertFalse(iter.hasNext());
-
+        String[] expectedRes = new String[]{"()", "(2.0)", "(5.0)"};
+        Schema s = pigServer.dumpSchema("D");
+        Util.checkQueryOutputsAfterSortRecursive(iter, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
     }
 
@@ -596,26 +578,19 @@ public class TestPruneColumn {
 
     @Test
     public void testCoGroup3() throws Exception {
-        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
+        pigServer.registerQuery("A = load '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
         pigServer.registerQuery("B = group A by $1;");
         pigServer.registerQuery("C = foreach B generate $1, '1';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(2, t.size());
-        assertEquals("{(1,2,3)}", t.get(0).toString());
-        assertEquals("1", t.get(1).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
 
-        assertEquals(2, t.size());
-        assertEquals("{(2,5,2)}", t.get(0).toString());
-        assertEquals("1", t.get(1).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "({(1,2,3)},1)",
+                "({(2,5,2)},1)"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -629,27 +604,14 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(2, t.size());
-        assertEquals("{}", t.get(0).toString());
-        assertEquals("{(1)}", t.get(1).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
-
-        assertEquals(2, t.size());
-        assertEquals("{(2)}", t.get(0).toString());
-        assertEquals("{(2)}", t.get(1).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
 
-        assertEquals(2, t.size());
-        assertEquals("{(5)}", t.get(0).toString());
-        assertEquals("{}", t.get(1).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "({},{(1)})",
+                "({(2)},{(2)})",
+                "({(5)},{})"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -755,22 +717,12 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(3, t.size());
-        assertEquals("{}", t.get(0).toString());
-        assertEquals("1", t.get(1).toString());
-        assertEquals("1", t.get(2).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
 
-        assertEquals(3, t.size());
-        assertEquals("{(1,2,3)}", t.get(0).toString());
-        assertEquals("2", t.get(1).toString());
-        assertEquals("2", t.get(2).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "({},1,1)",
+                "({(1,2,3)},2,2)"
+        };
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -784,24 +736,13 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(4, t.size());
-        assertEquals("1", t.get(0).toString());
-        assertEquals("2", t.get(1).toString());
-        assertEquals("3", t.get(2).toString());
-        assertEquals("{(2)}", t.get(3).toString());
 
-        assertTrue(iter.hasNext());
-        t = iter.next();
-
-        assertEquals(4, t.size());
-        assertEquals("2", t.get(0).toString());
-        assertEquals("5", t.get(1).toString());
-        assertEquals("2", t.get(2).toString());
-        assertEquals("{}", t.get(3).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "(1,2,3,{(2)})",
+                "(2,5,2,{})"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+            Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -900,20 +841,13 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-
-        assertEquals(2, t.size());
-        assertEquals("1", t.get(0).toString());
-        assertEquals("1", t.get(1).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
 
-        assertEquals(2, t.size());
-        assertEquals("2", t.get(0).toString());
-        assertEquals("2", t.get(1).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "(1,1)",
+                "(2,2)"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+             Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
             "Columns pruned for B: $1"}));
@@ -1022,20 +956,14 @@ public class TestPruneColumn {
         pigServer.registerQuery("B = group A by *;");
         pigServer.registerQuery("C = foreach B generate $0;");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
 
-        assertEquals(1, t.size());
-        assertEquals("(1,2,3)", t.get(0).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
-
-        assertEquals(1, t.size());
-        assertEquals("(2,5,2)", t.get(0).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "((1,2,3))",
+                "((2,5,2))"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -1578,21 +1506,16 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("G");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-        assertEquals("({(1)},1)", t.toString());
-        
-        assertTrue(iter.hasNext());
-        t = iter.next();
-        assertEquals("({(2),(2)},2)", t.toString());
-        
-        assertTrue(iter.hasNext());
-        t = iter.next();
-        assertEquals("({(3),(3),(3)},3)", t.toString());
 
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "({(1)},1)",
+                "({(2),(2)},2)",
+                "({(3),(3),(3)},3)"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
-
         pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
     }
 
@@ -1604,26 +1527,15 @@ public class TestPruneColumn {
         pigServer.registerQuery("D = foreach C generate $0, $1;");
 
         Iterator<Tuple> iter = pigServer.openIterator("D");
-
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-        assertEquals(2, t.size());
-        assertEquals("1", t.get(0).toString());
-        assertEquals("{}", t.get(1).toString());
 
-        assertTrue(iter.hasNext());
-        t = iter.next();
-        assertEquals(2, t.size());
-        assertEquals("2", t.get(0).toString());
-        assertEquals("{(1,2,3)}", t.get(1).toString());
-
-        assertTrue(iter.hasNext());
-        t = iter.next();
-        assertEquals(2, t.size());
-        assertEquals("5", t.get(0).toString());
-        assertEquals("{(2,5,2)}", t.get(1).toString());
-
-        assertFalse(iter.hasNext());
+        String[] expected = new String[] {
+                "(1,{})",
+                "(2,{(1,2,3)})",
+                "(5,{(2,5,2)})"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
     }
@@ -1947,12 +1859,14 @@ public class TestPruneColumn {
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
         assertTrue(iter.hasNext());
-        Tuple t = iter.next();
-        assertEquals("(1,2,3,1)", t.toString());
 
-        assertTrue(iter.hasNext());
-        t = iter.next();
-        assertEquals("(2,5,2,2)", t.toString());
+        String[] expected = new String[] {
+                "(1,2,3,1)",
+                "(2,5,2,2)"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
+
 
         assertTrue(emptyLogFileMessage());
     }

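TestPruneColumn gets the same treatment throughout: hand-rolled iterator walks become expected-string arrays checked order-insensitively. If one had to reproduce the order-insensitive part without Pig's Util helpers, a rough standalone approximation (not the actual Util implementation, which additionally parses the expected strings against the schema) could be:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.pig.data.Tuple;
    import static org.junit.Assert.assertEquals;

    // Rough sketch: compare sorted string renderings so row order is ignored.
    static void assertSameTuplesAnyOrder(Iterator<Tuple> actual, String[] expected) {
        List<String> got = new ArrayList<>();
        while (actual.hasNext()) {
            got.add(actual.next().toString());
        }
        List<String> want = new ArrayList<>();
        Collections.addAll(want, expected);
        Collections.sort(got);
        Collections.sort(want);
        assertEquals(want, got);
    }
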
Modified: pig/trunk/test/org/apache/pig/test/TestRank1.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank1.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRank1.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRank1.java Mon May 29 15:00:39 2017
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;

Modified: pig/trunk/test/org/apache/pig/test/TestRank2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank2.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRank2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRank2.java Mon May 29 15:00:39 2017
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;

Modified: pig/trunk/test/org/apache/pig/test/TestRank3.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank3.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRank3.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRank3.java Mon May 29 15:00:39 2017
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;

Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Mon May 29 15:00:39 2017
@@ -18,7 +18,6 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -37,6 +36,7 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -237,11 +237,11 @@ public abstract class TestSecondarySort
         pigServer
                 .registerQuery("D = foreach C { E = limit A 10; F = E.a1; G = 
DISTINCT F; generate group, COUNT(G);};");
         Iterator<Tuple> iter = pigServer.openIterator("D");
-        assertTrue(iter.hasNext());
-        assertEquals("(2,1)", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("(1,2)", iter.next().toString());
-        assertFalse(iter.hasNext());
+        String[] expectedRes = new String[]{"(2,1)", "(1,2)"};
+        Schema s = pigServer.dumpSchema("D");
+        Util.checkQueryOutputs(iter, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
         Util.deleteFile(cluster, file1ClusterPath);
         Util.deleteFile(cluster, file2ClusterPath);
     }
@@ -265,11 +265,14 @@ public abstract class TestSecondarySort
         pigServer.registerQuery("B = group A by $0 parallel 2;");
         pigServer.registerQuery("C = foreach B { D = distinct A; generate 
group, D;};");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        assertTrue(iter.hasNext());
-        assertEquals("(2,{(2,3,4)})", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("(1,{(1,2,3),(1,2,4),(1,3,4)})", iter.next().toString());
-        assertFalse(iter.hasNext());
+        Schema s = pigServer.dumpSchema("C");
+        String[] expected = {
+                "(2,{(2,3,4)})",
+                "(1,{(1,2,3),(1,2,4),(1,3,4)})"
+        };
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(Util.getLocalTestMode()));
         Util.deleteFile(cluster, clusterPath);
     }
 
@@ -359,15 +362,10 @@ public abstract class TestSecondarySort
         pigServer.registerQuery("D = ORDER C BY group;");
         pigServer.registerQuery("E = foreach D { F = limit A 10; G = ORDER F 
BY a2; generate group, COUNT(G);};");
         Iterator<Tuple> iter = pigServer.openIterator("E");
-        assertTrue(iter.hasNext());
-        assertEquals("((1,2),4)", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("((1,3),1)", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("((1,4),0)", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("((2,3),1)", iter.next().toString());
-        assertFalse(iter.hasNext());
+        Schema s = pigServer.dumpSchema("E");
+        String[] expectedRes = new String[]{"((1,2),4)", "((1,3),1)", "((1,4),0)", "((2,3),1)"};
+        Util.checkQueryOutputs(iter, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
         Util.deleteFile(cluster, clusterPath1);
         Util.deleteFile(cluster, clusterPath2);
     }
@@ -515,6 +513,8 @@ public abstract class TestSecondarySort
     @Test
     // Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
     public void testCustomPartitionerWithSort() throws Exception {
+        Assume.assumeFalse("Skip this test for Spark", Util.isSparkExecType(cluster.getExecType()));
+
         File tmpFile1 = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
         ps1.println("1\t2\t3");

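A note on the org.junit.Assume calls introduced in TestSecondarySort and TestPigServer above: unlike the old early return, a failed assumption marks the test as skipped instead of trivially passing it, so the run report stays honest about coverage. The general shape, with the harness names (cluster, Util) taken from the surrounding test classes:

    import org.junit.Assume;
    import org.junit.Test;

    public class SkipOnSparkExample {
        @Test
        public void sparkSensitiveTest() throws Exception {
            // Skips (rather than silently passes) on Spark, mirroring
            // testCustomPartitionerWithSort above.
            Assume.assumeFalse("Skip this test for Spark",
                    Util.isSparkExecType(cluster.getExecType()));
            // ... test body runs only for the remaining exec types ...
        }
    }
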
Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Mon May 29 15:00:39 2017
@@ -144,24 +144,66 @@ public abstract class TestStoreBase {
         String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
 
         Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        if (mode.toString().startsWith("SPARK")) {
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+             /* A = load xx;
+             store A into '1.out' using DummyStore('true','1');   -- first job should fail
+             store A into '2.out' using DummyStore('false','1');  -- second job should succeed
+             After multiquery optimization the spark plan will be:
+           Split - scope-14
+            |   |
+            |   a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
+            |   |
+            |   a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
+            |
+            |---a: Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage) - scope-0------
+            In the current code base, once the first job fails, the second job will not be executed,
+            so the FILE_SETUPJOB_CALLED marker of the second job will not exist.
+            This is explained in more detail in PIG-4243.
+            */
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+            /*
+            In the current code base, once the first job fails, the second job will not be executed,
+            so the FILE_SETUPTASK_CALLED marker of the second job will not exist.
+            */
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+            // OutputCommitter.abortTask will not be invoked in Spark mode; see SPARK-7953 for details
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+            // OutputCommitter.abortJob will not be invoked in Spark mode; see SPARK-7953 for details
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        } else {
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        }
 
         String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
 

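For context on the marker files asserted above: DummyOutputCommitter touches a file per OutputCommitter lifecycle callback, so TestStoreBase can assert exactly which callbacks each engine fired (Spark never invokes abortTask/abortJob, per SPARK-7953). A stripped-down sketch of the technique against Hadoop's mapreduce OutputCommitter API (illustrative only; Pig's real DummyOutputCommitter lives in the test tree):

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class MarkerOutputCommitter extends OutputCommitter {
        // One marker file per callback; the test later checks which exist.
        private void touch(String name) throws IOException {
            new File(System.getProperty("java.io.tmpdir"), "committer-" + name).createNewFile();
        }
        @Override public void setupJob(JobContext c) throws IOException { touch("setupJob"); }
        @Override public void setupTask(TaskAttemptContext c) throws IOException { touch("setupTask"); }
        @Override public boolean needsTaskCommit(TaskAttemptContext c) { return true; }
        @Override public void commitTask(TaskAttemptContext c) throws IOException { touch("commitTask"); }
        @Override public void abortTask(TaskAttemptContext c) throws IOException { touch("abortTask"); }
    }
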
Modified: pig/trunk/test/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TezMiniCluster.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/TezMiniCluster.java Mon May 29 15:00:39 2017
@@ -41,19 +41,12 @@ import org.apache.tez.dag.api.TezConfigu
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
-public class TezMiniCluster extends MiniGenericCluster {
-    private static final File CONF_DIR = new File("build/classes");
+public class TezMiniCluster extends YarnMiniCluster {
+
     private static final File TEZ_LIB_DIR = new File("build/ivy/lib/Pig");
     private static final File TEZ_CONF_FILE = new File(CONF_DIR, "tez-site.xml");
-    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
-    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
-    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
-    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
-    private static final ExecType TEZ = new TezExecType();
 
-    protected MiniMRYarnCluster m_mr = null;
-    private Configuration m_dfs_conf = null;
-    private Configuration m_mr_conf = null;
+    private static final ExecType TEZ = new TezExecType();
 
     @Override
     public ExecType getExecType() {
@@ -61,66 +54,9 @@ public class TezMiniCluster extends Mini
     }
 
     @Override
-    public void setupMiniDfsAndMrClusters() {
+    protected void setupMiniDfsAndMrClusters() {
+        super.setupMiniDfsAndMrClusters();
         try {
-            deleteConfFiles();
-            CONF_DIR.mkdirs();
-
-            // Build mini DFS cluster
-            Configuration hdfsConf = new Configuration();
-            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
-                    .numDataNodes(2)
-                    .format(true)
-                    .racks(null)
-                    .build();
-            m_fileSys = m_dfs.getFileSystem();
-            m_dfs_conf = m_dfs.getConfiguration(0);
-            //Create user home directory
-            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
-
-            // Write core-site.xml
-            Configuration core_site = new Configuration(false);
-            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
-            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
-
-            Configuration hdfs_site = new Configuration(false);
-            for (Entry<String, String> conf : m_dfs_conf) {
-                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
-                }
-            }
-            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
-
-            // Build mini YARN cluster
-            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
-            m_mr.init(m_dfs_conf);
-            m_mr.start();
-            m_mr_conf = m_mr.getConfig();
-            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
-            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
-            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
-            String classpath = libDir.getAbsolutePath() + "/*"
-                    + File.pathSeparator + classesDir.getAbsolutePath()
-                    + File.pathSeparator + testClassesDir.getAbsolutePath();
-            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
-            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
-            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
-
-            Configuration mapred_site = new Configuration(false);
-            Configuration yarn_site = new Configuration(false);
-            for (Entry<String, String> conf : m_mr_conf) {
-                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    if (conf.getKey().contains("yarn")) {
-                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    } else if (!conf.getKey().startsWith("dfs")){
-                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    }
-                }
-            }
-
-            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
-            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
-
             // Write tez-site.xml
             Configuration tez_conf = new Configuration(false);
             // TODO PIG-3659 - Remove this once memory management is fixed
@@ -150,12 +86,9 @@ public class TezMiniCluster extends Mini
                 }
             }
 
-            m_conf = m_mr_conf;
             // Turn FetchOptimizer off so that we can actually test Tez
             m_conf.set(PigConfiguration.PIG_OPT_FETCH, System.getProperty("test.opt.fetch", "false"));
 
-            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
-            System.setProperty("hadoop.log.dir", "build/test/logs");
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -169,29 +102,15 @@ public class TezMiniCluster extends Mini
 
     @Override
     protected void shutdownMiniMrClusters() {
-        deleteConfFiles();
-        if (m_mr != null) {
-            m_mr.stop();
-            m_mr = null;
-        }
+        super.shutdownMiniMrClusters();
     }
 
-    private void deleteConfFiles() {
+    @Override
+    protected void deleteConfFiles() {
+        super.deleteConfFiles();
         if(TEZ_CONF_FILE.exists()) {
             TEZ_CONF_FILE.delete();
         }
-        if(CORE_CONF_FILE.exists()) {
-            CORE_CONF_FILE.delete();
-        }
-        if(HDFS_CONF_FILE.exists()) {
-            HDFS_CONF_FILE.delete();
-        }
-        if(MAPRED_CONF_FILE.exists()) {
-            MAPRED_CONF_FILE.delete();
-        }
-        if(YARN_CONF_FILE.exists()) {
-            YARN_CONF_FILE.delete();
-        }
     }
 
     static public Launcher getLauncher() {

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Mon May 29 15:00:39 2017
@@ -585,6 +585,29 @@ public class Util {
          checkQueryOutputsAfterSort(actualResList, expectedResList);
      }
 
+    /**
+     * Helper function to check whether the result of a Pig query matches the expected results.
+     * It sorts both the actual and the expected results before comparison.
+     * The tuples in the list may vary in size: the expected results are passed in as a
+     * two-dimensional array, which is converted into a list of tuples.
+     * e.g. if expectedTwoDimensionObjects is [{{10, "will_join", 10, "will_join"}, {11, "will_not_join", null}, {null, 12, "will_not_join"}}],
+     * the field sizes of these 3 tuples are [4, 3, 3].
+     *
+     * @param actualResultsIt iterator over the actual results
+     * @param expectedTwoDimensionObjects represents a tuple list in which the tuples can have variable size
+     */
+    static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
+                                                  Object[][] expectedTwoDimensionObjects) {
+        List<Tuple> expectedResTupleList = new ArrayList<Tuple>();
+        for (int i = 0; i < expectedTwoDimensionObjects.length; ++i) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            for (int j = 0; j < expectedTwoDimensionObjects[i].length; ++j) {
+                t.append(expectedTwoDimensionObjects[i][j]);
+            }
+            expectedResTupleList.add(t);
+        }
+        checkQueryOutputsAfterSort(actualResultsIt, expectedResTupleList);
+    }
+
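As a usage sketch, a test can now pass ragged expected rows directly to the helper. The relation name "C", the PigServer instance, and the rows below are hypothetical, mirroring the outer-join example in the Javadoc above:

    // Assumes a PigServer named pigServer with a relation "C" already defined.
    Iterator<Tuple> actual = pigServer.openIterator("C");
    Util.checkQueryOutputsAfterSort(actual, new Object[][] {
            { 10, "will_join", 10, "will_join" },  // 4 fields
            { 11, "will_not_join", null },         // 3 fields
            { null, 12, "will_not_join" }          // 3 fields
    });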
      static public void checkQueryOutputsAfterSort(
             List<Tuple> actualResList, List<Tuple> expectedResList) {
          Collections.sort(actualResList);
@@ -592,7 +615,7 @@ public class Util {
 
          Assert.assertEquals("Comparing actual and expected results. ",
                  expectedResList, actualResList);
-
+         
     }
 
     /**
@@ -757,7 +780,8 @@ public class Util {
         if(Util.WINDOWS){
             filename = filename.replace('\\','/');
         }
-        if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ")) {
+        if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ") ||
+                context.getExecType().name().equals("SPARK")) {
             return FileLocalizer.hadoopify(filename, context);
         } else if (context.getExecType().isLocal()) {
             return filename;
@@ -1195,7 +1219,7 @@ public class Util {
     }
 
 
-    static private void convertBagToSortedBag(Tuple t) {
+    static public void convertBagToSortedBag(Tuple t) {
         for (int i=0;i<t.size();i++) {
            Object obj = null;
            try {
@@ -1310,6 +1334,72 @@ public class Util {
         return false;
     }
 
+    public static boolean isSparkExecType(ExecType execType) {
+        return execType.name().toLowerCase().startsWith("spark");
+    }
+
+    public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort) {
+        if (toSort) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+    }
+
+    public static void checkQueryOutputs(Iterator<Tuple> actualResults, List<Tuple> expectedResults, boolean checkAfterSort) {
+        if (checkAfterSort) {
+            checkQueryOutputsAfterSort(actualResults, expectedResults);
+        } else {
+            checkQueryOutputs(actualResults, expectedResults);
+        }
+    }
+
+    static public void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+                                         String[] expectedResArray, LogicalSchema schema,
+                                         boolean checkAfterSort) throws IOException {
+        if (checkAfterSort) {
+            checkQueryOutputsAfterSortRecursive(actualResultsIt,
+                    expectedResArray, schema);
+        } else {
+            checkQueryOutputs(actualResultsIt,
+                    expectedResArray, schema);
+        }
+    }
+
+    static void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+                                         String[] expectedResArray, LogicalSchema schema) throws IOException {
+        LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
+        ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
+
+        LoadCaster caster = new Utf8StorageConverter();
+        List<Tuple> actualResList = new ArrayList<Tuple>();
+        while (actualResultsIt.hasNext()) {
+            actualResList.add(actualResultsIt.next());
+        }
+
+        List<Tuple> expectedResList = new ArrayList<Tuple>();
+        for (String str : expectedResArray) {
+            Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs);
+            expectedResList.add(newTuple);
+        }
+
+        for (Tuple t : actualResList) {
+            convertBagToSortedBag(t);
+        }
+
+        for (Tuple t : expectedResList) {
+            convertBagToSortedBag(t);
+        }
+
+        Assert.assertEquals("Comparing actual and expected results. ",
+                expectedResList, actualResList);
+    }
+
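A minimal sketch of how a test might drive this overload. The schema fields, the rows, and the PigServer instance named pigServer are illustrative assumptions, built with the same LogicalFieldSchema constructor used above:

    // Build the schema the expected strings are parsed against.
    LogicalSchema schema = new LogicalSchema();
    schema.addField(new LogicalFieldSchema("id", null, DataType.INTEGER));
    schema.addField(new LogicalFieldSchema("name", null, DataType.CHARARRAY));
    // Expected rows in their textual tuple form; Utf8StorageConverter
    // turns each string into a Tuple before comparison.
    String[] expected = { "(1,hello)", "(2,world)" };
    // checkAfterSort=true makes the comparison order-insensitive, which is
    // what engines with non-deterministic output order need.
    Util.checkQueryOutputs(pigServer.openIterator("A"), expected, schema, true);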
     public static void assertParallelValues(long defaultParallel,
                                              long requestedParallel,
                                              long estimatedParallel,
@@ -1399,11 +1489,15 @@ public class Util {
 
     public static ExecType getLocalTestMode() throws Exception {
         String execType = System.getProperty("test.exec.type");
-        if (execType!=null && execType.equals("tez")) {
-            return ExecTypeProvider.fromString("tez_local");
-        } else {
-            return ExecTypeProvider.fromString("local");
+        if (execType != null) {
+            if (execType.equals("tez")) {
+                return ExecTypeProvider.fromString("tez_local");
+            } else if (execType.equals("spark")) {
+                return ExecTypeProvider.fromString("spark_local");
+            }
         }
+
+        return ExecTypeProvider.fromString("local");
     }
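For example (a sketch; test.exec.type is normally supplied by the build via -Dtest.exec.type rather than set in code):

    // With test.exec.type=spark this resolves to spark_local,
    // with tez to tez_local, and otherwise to plain local mode.
    ExecType mode = Util.getLocalTestMode();
    PigServer pig = new PigServer(mode);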
 
     public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) {

Added: pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java (added)
+++ pig/trunk/test/org/apache/pig/test/YarnMiniCluster.java Mon May 29 15:00:39 2017
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+
+public abstract class YarnMiniCluster extends MiniGenericCluster {
+    protected static final File CONF_DIR = new File("build/classes");
+    protected static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    protected static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    protected static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    protected static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+
+    protected Configuration m_dfs_conf = null;
+    protected MiniMRYarnCluster m_mr = null;
+    protected Configuration m_mr_conf = null;
+
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
+            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
+            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
+            String classpath = libDir.getAbsolutePath() + "/*"
+                + File.pathSeparator + classesDir.getAbsolutePath()
+                + File.pathSeparator + testClassesDir.getAbsolutePath();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")) {
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            m_conf = m_mr_conf;
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected void deleteConfFiles() {
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+}
\ No newline at end of file
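With the common DFS/YARN bootstrap extracted into YarnMiniCluster, an engine-specific mini cluster only has to supply its ExecType and any extra setup. A minimal sketch of such a subclass (illustrative only; the real Spark mini cluster may also override setup and teardown, as TezMiniCluster does above):

    import org.apache.pig.ExecType;
    import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;

    // Hypothetical subclass showing the extension point.
    public class ExampleSparkMiniCluster extends YarnMiniCluster {
        private static final ExecType SPARK = new SparkExecType();

        @Override
        public ExecType getExecType() {
            return SPARK;
        }
    }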

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,71 @@
+digraph plan {
+compound=true;
+node [shape=rect];
+s487399236_in [label="", style=invis, height=0, width=0];
+s487399236_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399236 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s0_in [label="", style=invis, height=0, width=0];
+s0_out [label="", style=invis, height=0, width=0];
+subgraph cluster_0 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+487399275 [label="a: Load(file:///tmp/input,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399268_in [label="", style=invis, height=0, width=0];
+s487399268_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399268 {
+label="a: New For Each(false,false)[bag]"labelloc=b;
+487399274 [label="Project[bytearray][0]"];
+487399273 [label="Cast[int]"];
+487399274 -> 487399273
+s487399268_in -> 487399274 [style=invis];
+487399270 [label="Cast[int]"];
+487399271 [label="Project[bytearray][1]"];
+487399271 -> 487399270
+s487399268_in -> 487399271 [style=invis];
+};
+487399273 -> s487399268_out [style=invis];
+487399270 -> s487399268_out [style=invis];
+487399267 [label="a: Store(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399275 -> s487399268_in [lhead=cluster_487399268]
+s487399268_out -> 487399267
+s0_in -> 487399275 [style=invis];
+};
+487399267 -> s0_out [style=invis];
+s487399236_in -> s0_in [style=invis];
+};
+s0_out -> s487399236_out [style=invis];
+s487399235_in [label="", style=invis, height=0, width=0];
+s487399235_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399235 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s1_in [label="", style=invis, height=0, width=0];
+s1_out [label="", style=invis, height=0, width=0];
+subgraph cluster_1 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+s487399238_in [label="", style=invis, height=0, width=0];
+s487399238_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399238 {
+label="b: New For Each(false,false)[bag]"labelloc=b;
+487399244 [label="Project[bytearray][0]"];
+487399243 [label="Cast[int]"];
+487399244 -> 487399243
+s487399238_in -> 487399244 [style=invis];
+487399241 [label="Project[bytearray][1]"];
+487399240 [label="Cast[int]"];
+487399241 -> 487399240
+s487399238_in -> 487399241 [style=invis];
+};
+487399243 -> s487399238_out [style=invis];
+487399240 -> s487399238_out [style=invis];
+487399237 [label="b: Store(file:///tmp/pigoutput1,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399266 [label="b: Load(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399238_out -> 487399237
+487399266 -> s487399238_in [lhead=cluster_487399238]
+s1_in -> 487399266 [style=invis];
+};
+487399237 -> s1_out [style=invis];
+s487399235_in -> s1_in [style=invis];
+};
+s1_out -> s487399235_out [style=invis];
+s487399236_out -> s487399235_in [lhead=cluster_487399235]
+}

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# Spark Plan
+#--------------------------------------------------
+
+Spark node scope-18
+a: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-8
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0--------
+
+Spark node scope-19
+b: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-16
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][0] - scope-10
+    |   |
+    |   Cast[int] - scope-14
+    |   |
+    |   |---Project[bytearray][1] - scope-13
+    |
+    |---b: Load(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-9--------

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld Mon May 29 15:00:39 2017
@@ -0,0 +1,49 @@
+<sparkPlan>
+  <sparkNode scope="38">
+    <POStore scope="28">
+      <alias>a</alias>
+      <storeFile>file:///tmp/pigoutput</storeFile>
+      <isTmpStore>false</isTmpStore>
+      <POForEach scope="27">
+        <alias>a</alias>
+        <POCast scope="22">
+          <alias>x</alias>
+          <POProject scope="21"/>
+        </POCast>
+        <POCast scope="25">
+          <alias>y</alias>
+          <POProject scope="24"/>
+        </POCast>
+        <POLoad scope="20">
+          <alias>a</alias>
+          <loadFile>file:///tmp/input</loadFile>
+          <isTmpLoad>false</isTmpLoad>
+        </POLoad>
+      </POForEach>
+    </POStore>
+  </sparkNode>
+  <sparkNode scope="39">
+    <POStore scope="37">
+      <alias>b</alias>
+      <storeFile>file:///tmp/pigoutput1</storeFile>
+      <isTmpStore>false</isTmpStore>
+      <POForEach scope="36">
+        <alias>b</alias>
+        <POCast scope="31">
+          <alias>x</alias>
+          <POProject scope="30"/>
+        </POCast>
+        <POCast scope="34">
+          <alias>y</alias>
+          <POProject scope="33"/>
+        </POCast>
+        <POLoad scope="29">
+          <alias>b</alias>
+          <loadFile>file:///tmp/pigoutput</loadFile>
+          <isTmpLoad>false</isTmpLoad>
+        </POLoad>
+      </POForEach>
+    </POStore>
+  </sparkNode>
+</sparkPlan>
+

