-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32031/#review78021
-----------------------------------------------------------
Thanks for the patch, Praveen! I had a few questions.

Can't we just set "endOfAllInput" to true when we encounter the last tuple in CollectedGroupFunction (instead of passing a special arg to poCollectedGroup.getNextTuple())? i.e.

    poCollectedGroup.getPlans().get(0).endOfAllInput = true


src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
<https://reviews.apache.org/r/32031/#comment126402>

    Per Rohini's comment on PIG-4193, doesn't the change to POCollectedGroup need to go into a separate patch?


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126400>

    Do we necessarily need to coalesce to a single partition in order to count the number of elements in the RDD?

    Also, we later do the processing one partition at a time, using mapPartitions:

        rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true).rdd();

    Isn't the count computed inaccurate? It represents all elements in the RDD, not just the elements in a single partition - and mapPartitions processes one partition at a time.

    Also, are we processing one partition at a time (mapPartitions vs. map) here mostly for efficiency?


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126404>

    Call this "done" instead of "proceed"? The latter seems an unintuitive term for the end of a computation.


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126403>

    Let's call this "count" or just "i"? "current_val" seems a misleading name for a counter.


src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/32031/#comment126401>

    Please use LOG.error("Message " + e, e); instead.
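To make the count concern above concrete: a global rdd.count() does not match what a mapPartitions function sees, because that function only iterates over one partition's elements. A minimal plain-Java sketch (no Spark dependency; partitions are simulated as nested lists, and all names and data here are hypothetical):

```java
import java.util.Arrays;
import java.util.List;

public class PartitionCountSketch {
    // Simulated RDD with three partitions (hypothetical data).
    static final List<List<Integer>> PARTITIONS = Arrays.asList(
            Arrays.asList(1, 2, 3),
            Arrays.asList(4, 5),
            Arrays.asList(6, 7, 8, 9));

    // Global element count, as rdd.count() would report it.
    static long globalCount() {
        return PARTITIONS.stream().mapToLong(List::size).sum();
    }

    // What a mapPartitions-style function actually sees: only the
    // elements of the single partition it is invoked on.
    static long elementsSeenByPartition(int partitionIndex) {
        return PARTITIONS.get(partitionIndex).size();
    }

    public static void main(String[] args) {
        System.out.println("global count = " + globalCount());                 // 9
        System.out.println("partition 0 sees " + elementsSeenByPartition(0));  // 3
    }
}
```

So using the global count as an "end of partition" signal inside the per-partition function would fire at the wrong element whenever the RDD has more than one partition.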
src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java
<https://reviews.apache.org/r/32031/#comment126406>

    Please fix this comment. There is no "mapper" in our case.


src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java
<https://reviews.apache.org/r/32031/#comment126405>

    Let's make POCollectedGroup.getStreamCloseResult() public and call it here instead of duplicating it.


- Mohit Sabharwal


On March 13, 2015, 10:42 a.m., Praveen Rachabattuni wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32031/
> -----------------------------------------------------------
> 
> (Updated March 13, 2015, 10:42 a.m.)
> 
> 
> Review request for pig, liyun zhang and Mohit Sabharwal.
> 
> 
> Bugs: PIG-4193
>     https://issues.apache.org/jira/browse/PIG-4193
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Moved the getNextTuple(boolean proceed) method from POCollectedGroup to
> POCollectedGroupSpark.
> 
> Collected group, when used with MR, performs the group operation on the map
> side after making sure all data for the same key exists in a single map. In
> Spark, this behaviour is achieved by a single map function using the
> POCollectedGroup operator.
> 
> TODO:
> - Avoid using rdd.count() in CollectedGroupConverter.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f2f18e52e083b3e8e90ba02d07f12bcbc9be859 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java ca7a45f33320064e22628b40b34be7b9f7b07c36 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 3d04ba11855c39960e00d6f51b66654d1c70ebad 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POCollectedGroupSpark.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/32031/diff/
> 
> 
> Testing
> -------
> 
> Tested TestCollectedGroup; no new successes or failures.
> 
> 
> Thanks,
> 
> Praveen Rachabattuni
> 
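For context on the map-side grouping the description refers to: collected group assumes all records for a given key are already co-located (and contiguous) within a single partition, so one sequential pass can group them without a shuffle. A plain-Java sketch of that single-pass idea, with hypothetical names and data (this is an illustration of the technique, not the actual POCollectedGroup code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapSideGroupSketch {
    // Groups one simulated partition. The collected-group contract
    // guarantees that every record for a given key lives in this
    // partition, so no shuffle or reduce phase is needed.
    static Map<String, List<Integer>> groupPartition(
            List<Map.Entry<String, Integer>> partition) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> rec : partition) {
            groups.computeIfAbsent(rec.getKey(), k -> new ArrayList<>())
                  .add(rec.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> partition = List.of(
                Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
        System.out.println(groupPartition(partition)); // {a=[1, 2], b=[3]}
    }
}
```

Applying such a function per partition (e.g. via mapPartitions) is exactly the "single map function" strategy the description mentions, which is also why the function needs a reliable end-of-partition signal.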
