Author: szita
Date: Mon May 29 21:45:33 2017
New Revision: 1796703

URL: http://svn.apache.org/viewvc?rev=1796703&view=rev
Log:
PIG-5207: BugFix e2e tests fail on spark (szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 29 21:45:33 2017
@@ -109,6 +109,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5207: BugFix e2e tests fail on spark (szita)
+
 PIG-5194: HiveUDF fails with Spark exec type (szita)
 
 PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon May 29 21:45:33 2017
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,8 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 
+import com.google.common.collect.HashBiMap;
+
 /**
  *
  * The base class for all types of physical plans.
@@ -304,6 +307,16 @@ public class PhysicalPlan extends Operat
             }
         }
 
+        //Fix order of edges in mToEdges lists
+        Map<PhysicalOperator, PhysicalOperator> invertedMatches = HashBiMap.create(matches).inverse();
+        for (PhysicalOperator newOp : clone.mToEdges.keySet()) {
+            List<PhysicalOperator> newList = clone.mToEdges.get(newOp);
+            if (newList.size() > 1) {
+                List<PhysicalOperator> originalList = this.mToEdges.get(invertedMatches.get(newOp));
+                Collections.sort(newList, new EdgeOrderHelper(originalList,invertedMatches));
+            }
+        }
+
         return clone;
     }
 
@@ -315,4 +328,21 @@ public class PhysicalPlan extends Operat
     {
         opmap = null;
     }
+
+
+    private static class EdgeOrderHelper implements Comparator<PhysicalOperator> {
+
+        private final Map<PhysicalOperator, PhysicalOperator> invertedMatches;
+        private final List<PhysicalOperator> originalList;
+
+        public EdgeOrderHelper(List<PhysicalOperator> originalList, Map<PhysicalOperator, PhysicalOperator> invertedMatches) {
+            this.originalList = originalList;
+            this.invertedMatches = invertedMatches;
+        }
+
+        @Override
+        public int compare(PhysicalOperator o1, PhysicalOperator o2) {
+            return originalList.indexOf(invertedMatches.get(o1)) - originalList.indexOf(invertedMatches.get(o2));
+        }
+    }
 }
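
Note: the new code in clone() above uses Guava's HashBiMap to invert the original-to-clone operator mapping and then sorts each cloned successor list so it follows the ordering of the corresponding original list. The standalone sketch below illustrates the same pattern with plain strings instead of PhysicalOperators; all class and variable names in it are illustrative only (not Pig APIs), and it only assumes Guava is on the classpath. It uses Comparator.comparingInt for brevity where the patch's EdgeOrderHelper subtracts indexOf results.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.HashBiMap;

    // Standalone sketch of the edge-reordering idea: given a mapping
    // original -> clone, sort a cloned edge list so it matches the
    // order of the corresponding original edge list.
    public class EdgeOrderSketch {
        public static void main(String[] args) {
            List<String> originalEdges = Arrays.asList("opA", "opB", "opC");

            Map<String, String> matches = new HashMap<>();   // original -> clone
            matches.put("opA", "cloneA");
            matches.put("opB", "cloneB");
            matches.put("opC", "cloneC");

            // The cloned list came out in a different (nondeterministic) order.
            List<String> clonedEdges =
                    new ArrayList<>(Arrays.asList("cloneC", "cloneA", "cloneB"));

            // Invert the mapping (clone -> original), as the patch does with HashBiMap.
            Map<String, String> inverted = HashBiMap.create(matches).inverse();

            // Order clones by their originals' positions in the original edge list,
            // which is what EdgeOrderHelper does for PhysicalOperators.
            Comparator<String> byOriginalOrder =
                    Comparator.comparingInt(c -> originalEdges.indexOf(inverted.get(c)));
            clonedEdges.sort(byOriginalOrder);

            System.out.println(clonedEdges);  // [cloneA, cloneB, cloneC]
        }
    }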

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1796703&r1=1796702&r2=1796703&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Mon May 29 21:45:33 2017
@@ -296,8 +296,13 @@ public class CombinerOptimizer extends S
             );
             newProj.setResultType(DataType.BAG);
 
-            PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
-            pplan.disconnect(udfInput, combineUdf);
+            for (PhysicalOperator originalUdfInput : pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
+                if (pplan.getPredecessors(originalUdfInput) != null) {
+                    pplan.trimAbove(originalUdfInput);
+                }
+                pplan.remove(originalUdfInput);
+            }
+
             pplan.add(newProj);
             pplan.connect(newProj, combineUdf);
             i++;
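
Note: the toArray(new PhysicalOperator[0]) call in the hunk above snapshots the combine UDF's predecessor list before the loop starts mutating the plan (trimAbove/remove change that same structure). Below is a minimal sketch of this copy-then-mutate pattern using a toy adjacency map rather than Pig's PhysicalPlan; names and types are illustrative only.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone sketch: snapshot a node's predecessor list into an array
    // before removing entries, so the iteration is unaffected by the
    // structural changes made inside the loop.
    public class SnapshotRemoveSketch {
        public static void main(String[] args) {
            // child -> predecessors (toy stand-in for plan edges)
            Map<String, List<String>> preds = new HashMap<>();
            preds.put("combineUdf", new ArrayList<>(Arrays.asList("in1", "in2", "in3")));

            // Iterating the live list with for-each while removing from it
            // would throw ConcurrentModificationException; the array copy
            // makes the removal safe.
            for (String input : preds.get("combineUdf").toArray(new String[0])) {
                preds.get("combineUdf").remove(input);
            }

            System.out.println(preds.get("combineUdf"));  // []
        }
    }

Once the old inputs are gone, the patch adds newProj and connects it as the UDF's sole input, as shown in the hunk above.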

