Author: knoguchi
Date: Fri Jan 25 21:51:40 2019
New Revision: 1852183

URL: http://svn.apache.org/viewvc?rev=1852183&view=rev
Log:
PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1852183&r1=1852182&r2=1852183&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jan 25 21:51:40 2019
@@ -87,6 +87,7 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist
 OPTIMIZATIONS
  
 BUG FIXES
+PIG-5372: SAMPLE/RANDOM(udf) before skewed join failing with NPE (knoguchi)
 
 PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1852183&r1=1852182&r2=1852183&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Jan 25 21:51:40 2019
@@ -112,8 +112,6 @@ public class SkewedPartitioner extends P
     @Override
     public void setConf(Configuration job) {
         conf = job;
-        PigMapReduce.sJobConfInternal.set(conf);
-        PigMapReduce.sJobConf = conf;
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1852183&r1=1852182&r2=1852183&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Jan 25 21:51:40 2019
@@ -93,10 +93,10 @@ public class MapRedUtil {
             conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
         }
 
-        if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null)
-            conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl"));
-        if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null)
-            conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl"));
+        if (mapConf.get("fs.file.impl")!=null)
+            conf.set("fs.file.impl", mapConf.get("fs.file.impl"));
+        if (mapConf.get("fs.hdfs.impl")!=null)
+            conf.set("fs.hdfs.impl", mapConf.get("fs.hdfs.impl"));
 
         copyTmpFileConfigurationValues(PigMapReduce.sJobConfInternal.get(), conf);
 

Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1852183&r1=1852182&r2=1852183&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Fri Jan 25 21:51:40 2019
@@ -207,7 +207,6 @@ public class TestSkewedJoin {
         assertEquals(0, count);
     }
 
-
     @Test
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
@@ -354,7 +353,7 @@ public class TestSkewedJoin {
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             {
-                pigServer.registerQuery("C = join A by id, B by id using 'skewed';");
+                pigServer.registerQuery("C = join A by id, B by id using 'skewed' parallel 2;");
                 Iterator<Tuple> iter = pigServer.openIterator("C");
 
                 while(iter.hasNext()) {
@@ -375,7 +374,7 @@ public class TestSkewedJoin {
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
         {
-            pigServer.registerQuery("C = join A by id left, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id left, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -383,7 +382,7 @@ public class TestSkewedJoin {
             }
         }
         {
-            pigServer.registerQuery("C = join A by id right, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id right, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -391,7 +390,7 @@ public class TestSkewedJoin {
             }
         }
         {
-            pigServer.registerQuery("C = join A by id full, B by id using 'skewed';");
+            pigServer.registerQuery("C = join A by id full, B by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("C");
 
             while(iter.hasNext()) {
@@ -413,7 +412,7 @@ public class TestSkewedJoin {
 
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
         {
-            pigServer.registerQuery("E = join C by id, D by id using 'skewed';");
+            pigServer.registerQuery("E = join C by id, D by id using 'skewed' parallel 2;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
             while(iter.hasNext()) {
@@ -487,7 +486,7 @@ public class TestSkewedJoin {
         pigServer.registerQuery("a = load 'left.dat' as (nums:chararray);");
         pigServer.registerQuery("b = load 'right.dat' as (number:chararray,text:chararray);");
         pigServer.registerQuery("c = filter a by nums == '7';");
-        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed';");
+        pigServer.registerQuery("d = join c by nums LEFT OUTER, b by number USING 'skewed' parallel 2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("d");
 
@@ -515,7 +514,7 @@ public class TestSkewedJoin {
 
         pigServer.registerQuery("a = load 'foo' as (nums:chararray);");
         pigServer.registerQuery("b = load 'foo' as (nums:chararray);");
-        pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed';");
+        pigServer.registerQuery("d = join a by nums, b by nums USING 'skewed' parallel 2;");
 
         Iterator<Tuple> iter = pigServer.openIterator("d");
         int count = 0;
@@ -569,7 +568,7 @@ public class TestSkewedJoin {
           "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
           "missing = LOAD '/non/existing/directory' AS (a:long);" +
           "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
-          "joined = JOIN exists BY a, missing BY a USING 'skewed';";
+          "joined = JOIN exists BY a, missing BY a USING 'skewed' parallel 2;";
 
         String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
         Logger logger = Logger.getLogger("org.apache.pig");
@@ -619,4 +618,34 @@ public class TestSkewedJoin {
         }
     }
 
+    // PIG-5372
+    @Test
+    public void testSkewedJoinWithRANDOMudf() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
+        pigServer.registerQuery("A2 = FOREACH A GENERATE id, RANDOM() as randnum;");
+
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A2 by id, B by id using 'skewed' parallel 2;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A2 by id, B by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        assertTrue(dbfrj.size()>0);
+        assertTrue(dbshj.size()>0);
+        assertEquals(dbfrj.size(), dbshj.size());
+    }
+
+
 }


Reply via email to