[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349524#comment-15349524 ]

Sean Owen commented on SPARK-15861:
---

Does this resolve the problem / answer the question?

> pyspark mapPartitions with none generator functions / functors
> --
>
> Key: SPARK-15861
> URL: https://issues.apache.org/jira/browse/SPARK-15861
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.6.1
> Reporter: Greg Bowyer
> Priority: Minor
>
> Hi all, it appears that the method `rdd.mapPartitions` does odd things if it
> is fed a normal subroutine.
> For instance, lets say we have the following
> {code}
> rows = range(25)
> rows = [rows[i:i+5] for i in range(0, len(rows), 5)]
> rdd = sc.parallelize(rows, 2)
> def to_np(data):
>     return np.array(list(data))
> rdd.mapPartitions(to_np).collect()
> ...
> [array([0, 1, 2, 3, 4]),
>  array([5, 6, 7, 8, 9]),
>  array([10, 11, 12, 13, 14]),
>  array([15, 16, 17, 18, 19]),
>  array([20, 21, 22, 23, 24])]
> rdd.mapPartitions(to_np, preservePartitioning=True).collect()
> ...
> [array([0, 1, 2, 3, 4]),
>  array([5, 6, 7, 8, 9]),
>  array([10, 11, 12, 13, 14]),
>  array([15, 16, 17, 18, 19]),
>  array([20, 21, 22, 23, 24])]
> {code}
> This basically makes the provided function that did return act like the end
> user called {code}rdd.map{code}
> I think that maybe a check should be put in to call
> {code}inspect.isgeneratorfunction{code}
> ?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
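The issue description quoted above proposes guarding `mapPartitions` with `inspect.isgeneratorfunction`. As a rough sketch of what such a check might look like, the snippet below uses a hypothetical helper, `check_partition_func`, which is not part of PySpark; the two sample functions are also assumptions made for illustration.

```python
import inspect
import warnings


def check_partition_func(func):
    """Hypothetical helper (not a PySpark API): warn if func is not a generator function."""
    # inspect.isgeneratorfunction is True only for functions whose body contains `yield`.
    if not inspect.isgeneratorfunction(func):
        warnings.warn(
            "%s is not a generator function; whatever it returns will be "
            "iterated element by element" % getattr(func, "__name__", repr(func))
        )
        return False
    return True


def returns_list(data):
    # A plain function that returns a valid iterable; the check still flags it.
    return [sum(data)]


def yields_sum(data):
    # A generator function: yields exactly one item per partition.
    yield sum(data)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    plain_ok = check_partition_func(returns_list)
    generator_ok = check_partition_func(yields_sum)

print(plain_ok, generator_ok, len(caught))
```

Note that the check also flags `returns_list`, even though returning a list is a legitimate way to use `mapPartitions`. A check along these lines would therefore probably have to warn rather than raise.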
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331975#comment-15331975 ]

Bryan Cutler commented on SPARK-15861:
--

If you change your function to this
{noformat}
def to_np(data):
    return [np.array(list(data))]
{noformat}
I think you would get what you expect, but this is probably not a good way to go about it. I feel like you should be aggregating your lists into numpy arrays instead, but someone else might know better.
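The effect of wrapping the result in a list can be tried without a Spark cluster. The sketch below is only an approximation: `simulate_map_partitions` is a hypothetical pure-Python stand-in for `rdd.mapPartitions(func).collect()`, nested lists stand in for numpy arrays, and the split into two partitions (two rows, then three) is assumed from the expected output posted elsewhere in this thread.

```python
from itertools import chain


def simulate_map_partitions(partitions, func):
    # Hypothetical stand-in for rdd.mapPartitions(func).collect():
    # call func once per partition with an iterator, then flatten what it returns.
    return list(chain.from_iterable(func(iter(part)) for part in partitions))


rows = [list(range(i, i + 5)) for i in range(0, 25, 5)]
partitions = [rows[:2], rows[2:]]  # assumed split into 2 partitions


def to_block_list(data):
    # Wrap the whole partition in a one-element list, as suggested above.
    return [list(data)]


def to_block_gen(data):
    # Equivalent generator form: yield the whole partition as a single item.
    yield list(data)


wrapped = simulate_map_partitions(partitions, to_block_list)
yielded = simulate_map_partitions(partitions, to_block_gen)
print(len(wrapped), len(yielded))
```

In this simulation both variants produce one item per partition, because the outer container (the list or the generator) is what gets iterated, not the block inside it.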
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328417#comment-15328417 ]

Bryan Cutler commented on SPARK-15861:
--

{{mapPartitions}} will expect the function to return a sequence; that's what you are referring to, right?
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328380#comment-15328380 ]

Greg Bowyer commented on SPARK-15861:
-

... Hmm, from my end users' testing, it does not seem to fail if the map function does not return a valid sequence.
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328245#comment-15328245 ]

Bryan Cutler commented on SPARK-15861:
--

[~gbow...@fastmail.co.uk] {{mapPartitions}} expects a function that takes an iterator as input and returns an iterable sequence, and your function in the example is actually providing this. I think what is going on here is that your function maps the iterator to a numpy array, which internally will be something like
{noformat}array([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]){noformat}
for the first partition. Then {{collect}} iterates over that sequence and returns each element, which is also a numpy array, so you get
{noformat}array([0, 1, 2, 3, 4]), array([5, 6, 7, 8, 9]){noformat}
for the first 2 elements, and so on.

I believe this is working as it is supposed to, and in general, {{mapPartitions}} will not usually give the same result as {{map}}; it will fail if the function does not return a valid sequence. The documentation could perhaps be a little clearer in that regard.
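The explanation in the comment above can be traced step by step in plain Python. This is only a sketch under stated assumptions: nested lists stand in for the 2-D numpy arrays, the explicit loop stands in for what `collect` does with each partition's return value, and the split into two partitions (two rows, then three) is assumed rather than taken from Spark.

```python
rows = [list(range(i, i + 5)) for i in range(0, 25, 5)]
partitions = [rows[:2], rows[2:]]  # assumed split into 2 partitions


def to_rows(data):
    # Analogue of to_np: returns ONE nested container for the whole partition.
    return list(data)


collected = []
for part in partitions:
    returned = to_rows(iter(part))  # one 2-D container per partition
    for element in returned:        # iterating a 2-D container yields its rows
        collected.append(element)

print(len(collected))
```

Because each returned container is itself iterated, the five rows come back individually, which matches what a plain `map` over the rows would produce and explains the output in the report.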
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325007#comment-15325007 ]

Sean Owen commented on SPARK-15861:
---

Got it. That does look odd. I doubt the explanation is that "mapPartitions works like map in this case", but I also don't know enough Python to know what would make the difference. What's an example of something that does work as expected? Your snippet is cut off at the end of your patch description.
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324917#comment-15324917 ]

Greg Bowyer commented on SPARK-15861:
-

Minor patch for usability here (it's not a great patch): https://github.com/GregBowyer/spark/pull/1
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324865#comment-15324865 ]

Greg Bowyer commented on SPARK-15861:
-

So the documentation and use case really suggest that the output would be like so
{code}
[
  array([[0, 1, 2, 3, 4],
         [5, 6, 7, 8, 9]]),
  array([[10, 11, 12, 13, 14],
         [15, 16, 17, 18, 19],
         [20, 21, 22, 23, 24]])
]
{code}
where my original partitions are preserved.
[jira] [Commented] (SPARK-15861) pyspark mapPartitions with none generator functions / functors
[ https://issues.apache.org/jira/browse/SPARK-15861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324131#comment-15324131 ]

Sean Owen commented on SPARK-15861:
---

I may be missing the punchline, but what is the issue here? The result looks like what I'd expect.

> pyspark mapPartitions with none generator functions / functors
> --
>
> Key: SPARK-15861
> URL: https://issues.apache.org/jira/browse/SPARK-15861
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.6.1
> Reporter: Greg Bowyer
> Priority: Minor
>
> Hi all, it appears that the method `rdd.mapPartitions` does odd things if it
> is fed a normal subroutine.
> For instance, lets say we have the following
> {code}
> rows = range(25)
> rows = [rows[i:i+5] for i in range(0, len(rows), 5)]
> rdd = sc.parallelize(rows)
> def to_np(data):
>     return np.array(list(data))
> rdd.mapPartitions(to_np).collect()
> ...
> [array([0, 1, 2, 3, 4]),
>  array([5, 6, 7, 8, 9]),
>  array([10, 11, 12, 13, 14]),
>  array([15, 16, 17, 18, 19]),
>  array([20, 21, 22, 23, 24])]
> rdd.mapPartitions(to_np, preservePartitioning=True).collect()
> ...
> [array([0, 1, 2, 3, 4]),
>  array([5, 6, 7, 8, 9]),
>  array([10, 11, 12, 13, 14]),
>  array([15, 16, 17, 18, 19]),
>  array([20, 21, 22, 23, 24])]
> {code}
> This basically makes the provided function that did return act like the end
> user called {code}rdd.map{code}
> I think that maybe a check should be put in to call
> {code}inspect.isgeneratorfunction{code}
> ?