[
https://issues.apache.org/jira/browse/PIG-4231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Carlos Balduz updated PIG-4231:
-------------------------------
Status: Patch Available (was: In Progress)
diff --git
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
index ca7a45f..7310967 100644
--- src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
+++ src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
@@ -22,6 +22,7 @@ import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPack
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -30,6 +31,7 @@ import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOpe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -37,6 +39,7 @@ import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOpe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
@@ -46,6 +49,7 @@ import
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConvert
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.POConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
@@ -57,7 +61,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.SparkStats;
-
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
@@ -136,6 +139,8 @@ public class SparkLauncher extends Launcher {
convertMap.put(POSplit.class, new SplitConverter());
convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+ convertMap.put(POCounter.class, new CounterConverter());
+ convertMap.put(PORank.class, new RankConverter());
Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey,
RDD<Tuple>>();
@@ -281,4 +286,4 @@ public class SparkLauncher extends Launcher {
// TODO Auto-generated method stub
}
-}
+}
\ No newline at end of file
> Make rank work with Spark
> -------------------------
>
> Key: PIG-4231
> URL: https://issues.apache.org/jira/browse/PIG-4231
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Carlos Balduz
> Assignee: Carlos Balduz
> Labels: spork
>
> RANK does not work with the Spark execution engine because no Spark
> converters exist yet for the PORank and POCounter physical operators.
> Pig Stack Trace
> ---------------
> ERROR 0: java.lang.IllegalArgumentException: Spork unsupported
> PhysicalOperator: (Name: DATA: POCounter[tuple] - scope-146 Operator Key:
> scope-146)
> org.apache.pig.backend.executionengine.ExecException: ERROR 0:
> java.lang.IllegalArgumentException: Spork unsupported PhysicalOperator:
> (Name: DATA: POCounter[tuple] - scope-146 Operator Key: scope-146)
> at
> org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:285)
> at org.apache.pig.PigServer.launchPlan(PigServer.java:1378)
> at
> org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1363)
> at org.apache.pig.PigServer.execute(PigServer.java:1352)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:403)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:386)
> at
> org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:170)
> at
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:233)
> at
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:204)
> at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
> at org.apache.pig.Main.run(Main.java:482)
> at org.apache.pig.Main.main(Main.java:164)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)