-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/40743/#review109469
-----------------------------------------------------------
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (line 229)
<https://reviews.apache.org/r/40743/#comment169028>

    Change

        boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);

    to

        boolean noCombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                PigConfiguration.PIG_EXEC_NO_COMBINER, "false"));

    We should read this setting from the PigContext, not from the Hadoop
    Configuration. The same change applies to noSecondaryKey and isMultiQuery.


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java (line 64)
<https://reviews.apache.org/r/40743/#comment169035>

    Please don't remove this kind of code:

        if (LOG.isDebugEnabled()) {
            LOG.debug("LocalRearrangeFunction in " + t);
        }

    Developers use it to print messages to the log file; users will not see them.


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (line 195)
<https://reviews.apache.org/r/40743/#comment170589>

    Why add the following code?

        // Ensure output is consistent with the output of KeyValueFunction
        result.append(t.get(0));
        Tuple valueTuple = tf.newTuple();
        for (Object o : ((Tuple) r.result).getAll()) {
            if (!o.equals(key)) {
                valueTuple.append(o);
            }
        }
        result.append(valueTuple);

    I think just returning r is OK.


src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkCombinerOptimizer.java (line 63)
<https://reviews.apache.org/r/40743/#comment169259>

    Rename SparkCombinerOptimizer to CombinerOptimizer. This class is already in
    the spark package, so there is no need to emphasize "Spark" in the class name.


test/org/apache/pig/test/TestCombiner.java (line 263)
<https://reviews.apache.org/r/40743/#comment169036>

    It is nice to add new unit tests, but can we ensure that this one also passes
    in Tez and MR mode?


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (line 29)
<https://reviews.apache.org/r/40743/#comment170582>

    Remove the comment.
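To make the SparkLauncher suggestion above concrete, here is a minimal, self-contained sketch of the pattern. This is not Pig code: java.util.Properties stands in for pc.getProperties(), and the key string is illustrative (the real constant lives in PigConfiguration).

```java
import java.util.Properties;

// Sketch of the suggested pattern: parse boolean flags from the
// PigContext's Properties instead of calling Configuration.getBoolean.
public class PropertyFlagSketch {

    // Hypothetical key; in Pig this is PigConfiguration.PIG_EXEC_NO_COMBINER.
    static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";

    // getProperty with a string default, parsed as in the review suggestion.
    static boolean readFlag(Properties props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.getProperty(key, Boolean.toString(defaultValue)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();

        // Unset property: the default is used.
        boolean noCombiner = readFlag(props, PIG_EXEC_NO_COMBINER, false);
        System.out.println(noCombiner); // prints "false"

        // Set property: the stored string value wins over the default.
        props.setProperty(PIG_EXEC_NO_COMBINER, "true");
        System.out.println(readFlag(props, PIG_EXEC_NO_COMBINER, false)); // prints "true"
    }
}
```

The same helper would cover noSecondaryKey and isMultiQuery, since all three are plain boolean properties.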
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PreCombinerLocalRearrangeConverter.java (line 35)
<https://reviews.apache.org/r/40743/#comment170583>

    PreCombinerLocalRearrangeConverter's code is almost the same as
    LocalRearrangeConverter's code. You could reuse LocalRearrangeConverter.
    If I am missing something, please tell me.


src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkCombinerOptimizer.java (line 87)
<https://reviews.apache.org/r/40743/#comment170585>

    After reading your code, I found that the following should be:

        // Output:
        // foreach (using algebraicOp.Final)
        // -> reduceBy (uses algebraicOp.Intermediate)
        // -> localRearrange
        // -> foreach (using algebraicOp.Initial)


- kelly zhang


On Dec. 9, 2015, 5:49 a.m., Pallavi Rao wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/40743/
> -----------------------------------------------------------
>
> (Updated Dec. 9, 2015, 5:49 a.m.)
>
>
> Review request for pig, Mohit Sabharwal and Xuefu Zhang.
>
>
> Bugs: PIG-4709
>     https://issues.apache.org/jira/browse/PIG-4709
>
>
> Repository: pig-git
>
>
> Description
> -------
>
> Currently, the GROUPBY operator of Pig is mapped to Spark's CoGroup. When the
> grouped data is consumed by subsequent operations to perform algebraic
> operations, this is sub-optimal, as there is a lot of shuffle traffic.
> The Spark plan should be optimized to use reduceBy, where possible, so that a
> combiner is used.
>
> Introduced a combiner optimizer that does the following:
> // Checks for algebraic operations and, if they exist,
> // replaces global rearrange (cogroup) with reduceBy as follows:
> // Input:
> // foreach (using algebraicOp)
> //   -> packager
> //     -> globalRearrange
> //       -> localRearrange
> // Output:
> // foreach (using algebraicOp.Final)
> //   -> reduceBy (uses algebraicOp.Intermediate)
> //     -> foreach (using algebraicOp.Initial)
> //       -> localRearrange
>
>
> Diffs
> -----
>
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java f8c1658
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java aca347d
> src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java a4dbadd
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java 5f74992
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java 9ce0492
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CREATION
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PreCombinerLocalRearrangeConverter.java PRE-CREATION
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java PRE-CREATION
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java PRE-CREATION
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkCombinerOptimizer.java PRE-CREATION
> src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java 6b66ca1
> src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java 546d91e
> test/org/apache/pig/test/TestCombiner.java df44293
>
> Diff: https://reviews.apache.org/r/40743/diff/
>
>
> Testing
> -------
>
> The patch unblocked one UT in TestCombiner. Added another UT in the same
> class. Also did some manual testing.
>
>
> Thanks,
>
> Pallavi Rao
>
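
[Editor's note] The plan rewrite quoted above is safe because an algebraic function decomposes into three phases that can be applied at different points in the plan. A small standalone illustration, using plain Java and COUNT as the algebraic function (the names initial/intermediate/fin are illustrative, not Pig's EvalFunc API):

```java
import java.util.Arrays;
import java.util.List;

// Illustration (not Pig code) of the Initial/Intermediate/Final split:
// Initial maps each record to a partial value, Intermediate merges partials
// on the map side (the combiner / reduceBy step), and Final merges the
// per-partition partials after the shuffle.
public class AlgebraicCountSketch {
    static long initial(Object record) { return 1L; }           // per-record seed
    static long intermediate(long a, long b) { return a + b; }  // combiner merge
    static long fin(List<Long> partials) {                      // final reduce
        return partials.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        // Two "map partitions" of input records.
        List<String> part1 = Arrays.asList("a", "b", "c");
        List<String> part2 = Arrays.asList("d", "e");

        // The combiner runs per partition: fold Initial outputs with Intermediate.
        long p1 = part1.stream().mapToLong(AlgebraicCountSketch::initial)
                .reduce(0L, AlgebraicCountSketch::intermediate);
        long p2 = part2.stream().mapToLong(AlgebraicCountSketch::initial)
                .reduce(0L, AlgebraicCountSketch::intermediate);

        // Only the partial counts cross the shuffle; Final merges them.
        System.out.println(fin(Arrays.asList(p1, p2))); // prints 5
    }
}
```

Because only the partials (one long per partition here) cross the shuffle instead of every grouped record, mapping GROUPBY to reduceBy cuts the shuffle traffic that the CoGroup-based plan incurs.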
