[jira] [Commented] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
[ https://issues.apache.org/jira/browse/SPARK-19116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16115176#comment-16115176 ] Shea Parkes commented on SPARK-19116: - Apologies for not responding earlier. I'm struggling to recall all the nuances of my original issue here, but your response does seem to be an acceptable answer. It's not intuitive to me, and we've pretty much gone over to explicit broadcast joining in all of our production work, but if this is as intended, then I'm fine closing this down... > LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file > - > > Key: SPARK-19116 > URL: https://issues.apache.org/jira/browse/SPARK-19116 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.1, 2.0.2 > Environment: Python 3.5.x > Windows 10 >Reporter: Shea Parkes > > We're having some modestly severe issues with broadcast join inference, and > I've been chasing them through the join heuristics in the catalyst engine. > I've made it as far as I can, and I've hit upon something that does not make > any sense to me. > I thought that loading from parquet would be a RelationPlan, which would just > use the sum of default sizeInBytes for each column times the number of rows. > But this trivial example shows that I am not correct: > {code} > import pyspark.sql.functions as F > df_range = session.range(100).select(F.col('id').cast('integer')) > df_range.write.parquet('c:/scratch/hundred_integers.parquet') > df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet') > df_parquet.explain(True) > # Expected sizeInBytes > integer_default_sizeinbytes = 4 > print(df_parquet.count() * integer_default_sizeinbytes) # = 400 > # Inferred sizeInBytes > print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes()) # = 2318 > # For posterity (Didn't really expect this to match anything above) > print(df_range._jdf.logicalPlan().statistics().sizeInBytes()) # = 600 > {code} > And here's the results of explain(True) on df_parquet: > {code} > In [456]: == Parsed Logical Plan == > Relation[id#794] parquet > == Analyzed Logical Plan == > id: int > Relation[id#794] parquet > == Optimized Logical Plan == > Relation[id#794] parquet > == Physical Plan == > *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: > file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], > PushedFilters: [], ReadSchema: struct > {code} > So basically, I'm not understanding well how the size of the parquet file is > being estimated. I don't expect it to be extremely accurate, but empirically > it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold > way too much. (It's not always too high like the example above, it's often > way too low.) > Without deeper understanding, I'm considering a result of 2318 instead of 400 > to be a bug. My apologies if I'm missing something obvious. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
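For readers landing here with the same problem: the explicit broadcast joining mentioned above can be done from PySpark with the {{broadcast}} hint, which sidesteps the sizeInBytes estimate entirely. A minimal sketch, assuming {{large_df}} and {{small_df}} are existing DataFrames joined on an {{'id'}} column:

{code}
from pyspark.sql.functions import broadcast

# Force a broadcast (map-side) join regardless of what the statistics estimate says
joined = large_df.join(broadcast(small_df), 'id')
joined.explain()  # the physical plan should show a broadcast join
{code}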
[jira] [Commented] (SPARK-20683) Make table uncache chaining optional
[ https://issues.apache.org/jira/browse/SPARK-20683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003864#comment-16003864 ] Shea Parkes commented on SPARK-20683: - For anyone that found this issue and just wants to revert to the old behavior in their own fork, the following change in {{sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala}} worked for us: {code} - if (cd.plan.find(_.sameResult(plan)).isDefined) { + if (cd.plan.sameResult(plan)) { {code} > Make table uncache chaining optional > > > Key: SPARK-20683 > URL: https://issues.apache.org/jira/browse/SPARK-20683 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 > Environment: Not particularly environment sensitive. > Encountered/tested on Linux and Windows. >Reporter: Shea Parkes > > A recent change was made in SPARK-19765 that causes table uncaching to chain. > That is, if table B is a child of table A, and they are both cached, now > uncaching table A will automatically uncache table B. > At first I did not understand the need for this, but when reading the unit > tests, I see that it is likely that many people do not keep named references > to the child table (e.g. B). Perhaps B is just made and cached as some part > of data exploration. In that situation, it makes sense for B to > automatically be uncached when you are finished with A. > However, we commonly utilize a different design pattern that is now harmed by > this automatic uncaching. It is common for us to cache table A to then make > two, independent children tables (e.g. B and C). Once those two child tables > are realized and cached, we'd then uncache table A (as it was no longer > needed and could be quite large). After this change now, when we uncache > table A, we suddenly lose our cached status on both table B and C (which is > quite frustrating). All of these tables are often quite large, and we view > what we're doing as mindful memory management. We are maintaining named > references to B and C at all times, so we can always uncache them ourselves > when it makes sense. > Would it be acceptable/feasible to make this table uncache chaining optional? > I would be fine if the default is for the chaining to happen, as long as we > can turn it off via parameters. > If acceptable, I can try to work towards making the required changes. I am > most comfortable in Python (and would want the optional parameter surfaced in > Python), but have found the places required to make this change in Scala > (since I reverted the functionality in a private fork already). Any help > would be greatly appreciated however. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
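A minimal PySpark sketch of the design pattern described in the issue (assuming an active SparkSession {{spark}}); after SPARK-19765, the final {{unpersist()}} also drops the caches of both children:

{code}
df_a = spark.range(10 ** 7).cache()

df_b = df_a.filter('id % 2 = 0').cache()
df_c = df_a.filter('id % 2 = 1').cache()
df_b.count()  # materialize both children while A is still cached
df_c.count()

df_a.unpersist()  # previously only A was dropped; with chained uncaching, B and C lose their cached status too
{code}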
[jira] [Created] (SPARK-20683) Make table uncache chaining optional
Shea Parkes created SPARK-20683: --- Summary: Make table uncache chaining optional Key: SPARK-20683 URL: https://issues.apache.org/jira/browse/SPARK-20683 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1 Environment: Not particularly environment sensitive. Encountered/tested on Linux and Windows. Reporter: Shea Parkes A recent change was made in SPARK-19765 that causes table uncaching to chain. That is, if table B is a child of table A, and they are both cached, now uncaching table A will automatically uncache table B. At first I did not understand the need for this, but when reading the unit tests, I see that it is likely that many people do not keep named references to the child table (e.g. B). Perhaps B is just made and cached as some part of data exploration. In that situation, it makes sense for B to automatically be uncached when you are finished with A. However, we commonly utilize a different design pattern that is now harmed by this automatic uncaching. It is common for us to cache table A to then make two, independent children tables (e.g. B and C). Once those two child tables are realized and cached, we'd then uncache table A (as it was no longer needed and could be quite large). After this change now, when we uncache table A, we suddenly lose our cached status on both table B and C (which is quite frustrating). All of these tables are often quite large, and we view what we're doing as mindful memory management. We are maintaining named references to B and C at all times, so we can always uncache them ourselves when it makes sense. Would it be acceptable/feasible to make this table uncache chaining optional? I would be fine if the default is for the chaining to happen, as long as we can turn it off via parameters. If acceptable, I can try to work towards making the required changes. I am most comfortable in Python (and would want the optional parameter surfaced in Python), but have found the places required to make this change in Scala (since I reverted the functionality in a private fork already). Any help would be greatly appreciated however. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928179#comment-15928179 ] Shea Parkes edited comment on SPARK-12261 at 3/16/17 2:38 PM: -- I simply added the following to the end: {code} for _ in iterator: pass {code} This will run through the rest of iterator (until the StopIteration exception like normal). Depending how you're making pyspark importable, you might need to make this change inside a zipped copy of pyspark as well (e.g. in the binary distributions downloadable from Spark's home page). was (Author: shea.parkes): I simply added the following to the end: {code} for _ in iterator: pass {code} This will run through the rest of iterator (until the StopIteration exception like normal). Depending how you're making pyspark importable, you might need to make this change inside a zipped copy of pyspark as well (e.g. in the binary distributions available). > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
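For context, the two lines above go at the end of the {{takeUpToNumLeft()}} generator inside {{RDD.take()}} in {{python/pyspark/rdd.py}}. A simplified, self-contained paraphrase of that generator with the band-aid applied (the real version closes over {{left}} inside {{take()}}, and the exact code differs slightly by release):

{code}
def make_take_up_to(left):
    def takeUpToNumLeft(iterator):
        iterator = iter(iterator)
        taken = 0
        while taken < left:
            yield next(iterator)  # stop yielding once `left` items have been taken
            taken += 1
        # band-aid: drain the rest of the partition so the JVM side is not left mid-stream
        for _ in iterator:
            pass
    return takeUpToNumLeft

print(list(make_take_up_to(3)(iter(range(10)))))  # [0, 1, 2]
{code}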
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928179#comment-15928179 ] Shea Parkes commented on SPARK-12261: - I simply added the following to the end: for _ in iterator: pass This will run through the rest of iterator (until the StopIteration exception like normal). Depending how you're making pyspark importable, you might need to make this change inside a zipped copy of pyspark as well (e.g. in the binary distributions available). > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API
[ https://issues.apache.org/jira/browse/SPARK-18541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866393#comment-15866393 ] Shea Parkes commented on SPARK-18541: - Thank you very much! > Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management > in pyspark SQL API > > > Key: SPARK-18541 > URL: https://issues.apache.org/jira/browse/SPARK-18541 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 2.0.2 > Environment: all >Reporter: Shea Parkes >Assignee: Shea Parkes >Priority: Minor > Labels: newbie > Fix For: 2.2.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > In the Scala SQL API, you can pass in new metadata when you alias a field. > That functionality is not available in the Python API. Right now, you have > to painfully utilize {{SparkSession.createDataFrame}} to manipulate the > metadata for even a single column. I would propose to add the following > method to {{pyspark.sql.Column}}: > {code} > def aliasWithMetadata(self, name, metadata): > """ > Make a new Column that has the provided alias and metadata. > Metadata will be processed with json.dumps() > """ > _context = pyspark.SparkContext._active_spark_context > _metadata_str = json.dumps(metadata) > _metadata_jvm = > _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str) > _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm) > return Column(_new_java_column) > {code} > I can likely complete this request myself if there is any interest for it. > Just have to dust off my knowledge of doctest and the location of the python > tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
Shea Parkes created SPARK-19116: --- Summary: LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file Key: SPARK-19116 URL: https://issues.apache.org/jira/browse/SPARK-19116 Project: Spark Issue Type: Bug Components: PySpark, SQL Affects Versions: 2.0.2, 2.0.1 Environment: Python 3.5.x Windows 10 Reporter: Shea Parkes We're having some modestly severe issues with broadcast join inference, and I've been chasing them through the join heuristics in the catalyst engine. I've made it as far as I can, and I've hit upon something that does not make any sense to me. I thought that loading from parquet would be a RelationPlan, which would just use the sum of default sizeInBytes for each column times the number of rows. But this trivial example shows that I am not correct: {code} import pyspark.sql.functions as F df_range = session.range(100).select(F.col('id').cast('integer')) df_range.write.parquet('c:/scratch/hundred_integers.parquet') df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet') df_parquet.explain(True) # Expected sizeInBytes integer_default_sizeinbytes = 4 print(df_parquet.count() * integer_default_sizeinbytes) # = 400 # Inferred sizeInBytes print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes()) # = 2318 # For posterity (Didn't really expect this to match anything above) print(df_range._jdf.logicalPlan().statistics().sizeInBytes()) # = 600 {code} And here's the results of explain(True) on df_parquet: {code} In [456]: == Parsed Logical Plan == Relation[id#794] parquet == Analyzed Logical Plan == id: int Relation[id#794] parquet == Optimized Logical Plan == Relation[id#794] parquet == Physical Plan == *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct {code} So basically, I'm not understanding well how the size of the parquet file is being estimated. I don't expect it to be extremely accurate, but empirically it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold way too much. (It's not always too high like the example above, it's often way too low.) Without deeper understanding, I'm considering a result of 2318 instead of 400 to be a bug. My apologies if I'm missing something obvious. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
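Until the estimate itself is better understood, the practical knob around it is {{spark.sql.autoBroadcastJoinThreshold}} (a byte count; {{-1}} disables automatic broadcasting). A small sketch, assuming the {{session}} from the report:

{code}
# Raise the threshold so modest tables still broadcast despite pessimistic estimates...
session.conf.set('spark.sql.autoBroadcastJoinThreshold', 50 * 1024 * 1024)

# ...or disable automatic broadcast inference entirely and rely on explicit broadcast() hints
session.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
{code}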
[jira] [Commented] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API
[ https://issues.apache.org/jira/browse/SPARK-18541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703755#comment-15703755 ] Shea Parkes commented on SPARK-18541: - Yea, I originally did {{aliasWithMetadata}} because I could monkey-patch without conflicts, but upon reflection, just changing the existing {{alias}} method to accept a {{metadata}} keyword argument should work fine. I'll see about getting a pull request up soon. > Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management > in pyspark SQL API > > > Key: SPARK-18541 > URL: https://issues.apache.org/jira/browse/SPARK-18541 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 2.0.2 > Environment: all >Reporter: Shea Parkes >Priority: Minor > Labels: newbie > Original Estimate: 24h > Remaining Estimate: 24h > > In the Scala SQL API, you can pass in new metadata when you alias a field. > That functionality is not available in the Python API. Right now, you have > to painfully utilize {{SparkSession.createDataFrame}} to manipulate the > metadata for even a single column. I would propose to add the following > method to {{pyspark.sql.Column}}: > {code} > def aliasWithMetadata(self, name, metadata): > """ > Make a new Column that has the provided alias and metadata. > Metadata will be processed with json.dumps() > """ > _context = pyspark.SparkContext._active_spark_context > _metadata_str = json.dumps(metadata) > _metadata_jvm = > _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str) > _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm) > return Column(_new_java_column) > {code} > I can likely complete this request myself if there is any interest for it. > Just have to dust off my knowledge of doctest and the location of the python > tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
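A usage sketch of the change described above, assuming a Spark version where {{Column.alias}} accepts a {{metadata}} keyword (2.2.0 per the ticket's fix version) and an active SparkSession {{spark}}:

{code}
df = spark.range(5)
df2 = df.select(df.id.alias('id', metadata={'source': 'range', 'primary_key': True}))
print(df2.schema['id'].metadata)  # shows the metadata that was attached to the field
{code}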
[jira] [Created] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API
Shea Parkes created SPARK-18541: --- Summary: Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API Key: SPARK-18541 URL: https://issues.apache.org/jira/browse/SPARK-18541 Project: Spark Issue Type: Improvement Components: PySpark, SQL Affects Versions: 2.0.2 Environment: all Reporter: Shea Parkes Priority: Minor In the Scala SQL API, you can pass in new metadata when you alias a field. That functionality is not available in the Python API. Right now, you have to painfully utilize {{SparkSession.createDataFrame}} to manipulate the metadata for even a single column. I would propose to add the following method to {{pyspark.sql.Column}}: {code} def aliasWithMetadata(self, name, metadata): """ Make a new Column that has the provided alias and metadata. Metadata will be processed with json.dumps() """ _context = pyspark.SparkContext._active_spark_context _metadata_str = json.dumps(metadata) _metadata_jvm = _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str) _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm) return Column(_new_java_column) {code} I can likely complete this request myself if there is any interest for it. Just have to dust off my knowledge of doctest and the location of the python tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
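For anyone who wants this behavior without waiting on a release, the proposed method can be monkey-patched onto {{Column}} as-is. This is illustrative only and leans on internal {{_jc}}/{{_jvm}} attributes, so it is not a supported API; it assumes an active SparkSession {{spark}}:

{code}
import json

import pyspark
from pyspark.sql import Column

def aliasWithMetadata(self, name, metadata):
    # Make a new Column with the provided alias and metadata (metadata is json.dumps'd)
    _context = pyspark.SparkContext._active_spark_context
    _metadata_jvm = _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(json.dumps(metadata))
    return Column(getattr(self._jc, 'as')(name, _metadata_jvm))

Column.aliasWithMetadata = aliasWithMetadata  # monkey-patch, as discussed in the comments

df = spark.range(5)
df2 = df.select(df.id.aliasWithMetadata('id', {'comment': 'surrogate key'}))
print(df2.schema.fields[0].metadata)
{code}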
[jira] [Commented] (SPARK-2141) Add sc.getPersistentRDDs() to PySpark
[ https://issues.apache.org/jira/browse/SPARK-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15665137#comment-15665137 ] Shea Parkes commented on SPARK-2141: This would have been nice to have today. We wanted to clean up some cached interim RDDs from an MLlib model fit, but there were no references to them in the Python interpreter. To get around this, we had to reach into the _jsc attribute and utilize the java-based getPersistentRDDs to get references to the dang RDDs (just long enough to unpersist them). > Add sc.getPersistentRDDs() to PySpark > - > > Key: SPARK-2141 > URL: https://issues.apache.org/jira/browse/SPARK-2141 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 1.0.0 >Reporter: Nicholas Chammas >Assignee: Kan Zhang > > PySpark does not appear to have {{sc.getPersistentRDDs()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
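A sketch of the workaround described above. It relies on internal underscore attributes and on py4j exposing the returned Java map with a dict-like interface, so treat it as version-dependent rather than a supported API (assumes an active SparkContext {{sc}}):

{code}
# Enumerate everything the JVM still has persisted and drop it
for rdd_id, java_rdd in sc._jsc.getPersistentRDDs().items():
    print(rdd_id, java_rdd.toString())
    java_rdd.unpersist()
{code}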
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629182#comment-15629182 ] Shea Parkes commented on SPARK-12261: - I'm still maintaining the two-line bandaid to {{takeUpToNumLeft()}} in v2.0.x for our purposes (and it's still working). Given that it has helped another person now, would you accept a patch with that bandaid into the mainline code? > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions
[ https://issues.apache.org/jira/browse/SPARK-17998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15590680#comment-15590680 ] Shea Parkes commented on SPARK-17998: - That definitely answers it. I would say the default of 128MB makes things a little uncomfortable when paired with pyspark and the default python worker memory, but now that I understand it we can tune it for our workflow appropriately. Thanks again and I'll close this down. > Reading Parquet files coalesces parts into too few in-memory partitions > --- > > Key: SPARK-17998 > URL: https://issues.apache.org/jira/browse/SPARK-17998 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.0, 2.0.1 > Environment: Spark Standalone Cluster (not "local mode") > Windows 10 and Windows 7 > Python 3.x >Reporter: Shea Parkes > > Reading a parquet ~file into a DataFrame is resulting in far too few > in-memory partitions. In prior versions of Spark, the resulting DataFrame > would have a number of partitions often equal to the number of parts in the > parquet folder. > Here's a minimal reproducible sample: > {quote} > df_first = session.range(start=1, end=1, numPartitions=13) > assert df_first.rdd.getNumPartitions() == 13 > assert session._sc.defaultParallelism == 6 > path_scrap = r"c:\scratch\scrap.parquet" > df_first.write.parquet(path_scrap) > df_second = session.read.parquet(path_scrap) > print(df_second.rdd.getNumPartitions()) > {quote} > The above shows only 7 partitions in the DataFrame that was created by > reading the Parquet back into memory for me. Why is it no longer just the > number of part files in the Parquet folder? (Which is 13 in the example > above.) > I'm filing this as a bug because it has gotten so bad that we can't work with > the underlying RDD without first repartitioning the DataFrame, which is > costly and wasteful. I really doubt this was the intended effect of moving > to Spark 2.0. > I've tried to research where the number of in-memory partitions is > determined, but my Scala skills have proven in-adequate. I'd be happy to dig > further if someone could point me in the right direction... -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
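For anyone else hitting this, the 128MB default mentioned above appears to be {{spark.sql.files.maxPartitionBytes}} (Spark 2.0+). A small sketch of tuning it down for a pyspark-heavy workflow, assuming the {{session}} and {{path_scrap}} from the report:

{code}
# Ask the file scan to target smaller input splits, yielding more in-memory partitions
session.conf.set('spark.sql.files.maxPartitionBytes', 32 * 1024 * 1024)  # 32MB instead of the 128MB default

df_second = session.read.parquet(path_scrap)
print(df_second.rdd.getNumPartitions())
{code}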
[jira] [Closed] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions
[ https://issues.apache.org/jira/browse/SPARK-17998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shea Parkes closed SPARK-17998. --- Resolution: Information Provided > Reading Parquet files coalesces parts into too few in-memory partitions > --- > > Key: SPARK-17998 > URL: https://issues.apache.org/jira/browse/SPARK-17998 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.0.0, 2.0.1 > Environment: Spark Standalone Cluster (not "local mode") > Windows 10 and Windows 7 > Python 3.x >Reporter: Shea Parkes > > Reading a parquet ~file into a DataFrame is resulting in far too few > in-memory partitions. In prior versions of Spark, the resulting DataFrame > would have a number of partitions often equal to the number of parts in the > parquet folder. > Here's a minimal reproducible sample: > {quote} > df_first = session.range(start=1, end=1, numPartitions=13) > assert df_first.rdd.getNumPartitions() == 13 > assert session._sc.defaultParallelism == 6 > path_scrap = r"c:\scratch\scrap.parquet" > df_first.write.parquet(path_scrap) > df_second = session.read.parquet(path_scrap) > print(df_second.rdd.getNumPartitions()) > {quote} > The above shows only 7 partitions in the DataFrame that was created by > reading the Parquet back into memory for me. Why is it no longer just the > number of part files in the Parquet folder? (Which is 13 in the example > above.) > I'm filing this as a bug because it has gotten so bad that we can't work with > the underlying RDD without first repartitioning the DataFrame, which is > costly and wasteful. I really doubt this was the intended effect of moving > to Spark 2.0. > I've tried to research where the number of in-memory partitions is > determined, but my Scala skills have proven in-adequate. I'd be happy to dig > further if someone could point me in the right direction... -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions
Shea Parkes created SPARK-17998: --- Summary: Reading Parquet files coalesces parts into too few in-memory partitions Key: SPARK-17998 URL: https://issues.apache.org/jira/browse/SPARK-17998 Project: Spark Issue Type: Bug Components: PySpark, SQL Affects Versions: 2.0.1, 2.0.0 Environment: Spark Standalone Cluster (not "local mode") Windows 10 and Windows 7 Python 3.x Reporter: Shea Parkes Reading a parquet ~file into a DataFrame is resulting in far too few in-memory partitions. In prior versions of Spark, the resulting DataFrame would have a number of partitions often equal to the number of parts in the parquet folder. Here's a minimal reproducible sample: {quote} df_first = session.range(start=1, end=1, numPartitions=13) assert df_first.rdd.getNumPartitions() == 13 assert session._sc.defaultParallelism == 6 path_scrap = r"c:\scratch\scrap.parquet" df_first.write.parquet(path_scrap) df_second = session.read.parquet(path_scrap) print(df_second.rdd.getNumPartitions()) {quote} The above shows only 7 partitions in the DataFrame that was created by reading the Parquet back into memory for me. Why is it no longer just the number of part files in the Parquet folder? (Which is 13 in the example above.) I'm filing this as a bug because it has gotten so bad that we can't work with the underlying RDD without first repartitioning the DataFrame, which is costly and wasteful. I really doubt this was the intended effect of moving to Spark 2.0. I've tried to research where the number of in-memory partitions is determined, but my Scala skills have proven in-adequate. I'd be happy to dig further if someone could point me in the right direction... -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
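The repartitioning workaround mentioned above, for completeness; it restores the desired parallelism at the cost of a shuffle:

{code}
df_second = session.read.parquet(path_scrap).repartition(13)
print(df_second.rdd.getNumPartitions())  # 13 again, but only because of the explicit shuffle
{code}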
[jira] [Updated] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents
[ https://issues.apache.org/jira/browse/SPARK-17218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shea Parkes updated SPARK-17218: Description: Caching a DataFrame with >200 columns causes the contents to be ~nulled. This is quite a painful bug for us and caused us to place all sorts of bandaid bypasses in our production work recently. Minimally reproducible example: {code} from pyspark.sql import SQLContext import tempfile sqlContext = SQLContext(sc) path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet' list_df_varnames = [] list_df_values = [] for i in range(210): list_df_varnames.append('var'+str(i)) list_df_values.append(str(i)) test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames) test_df.show() # Still looks okay print(test_df.collect()) # Still looks okay test_df.cache() # When everything goes awry test_df.show() # All values have been ~nulled print(test_df.collect()) # Still looks okay # Serialize and read back from parquet now test_df.write.parquet(path_fail_parquet) loaded_df = sqlContext.read.parquet(path_fail_parquet) loaded_df.show() # All values have been ~nulled print(loaded_df.collect()) # All values have been ~nulled {code} As shown in the example above, the underlying RDD seems to survive the caching, but as soon as we serialize to parquet the data corruption becomes complete. This is occurring on Windows 10 with Python 3.5.x. We're running a Spark Standalone cluster. Everything works fine with <200 columns/fields. We have Kyro serialization turned on at the moment, but the same error manifested when we turned it off. I will try to get this tested on Spark 2.0.0 in the near future, but I generally steer clear of x.0.0 releases as best I can. I tried to search for another issue related to this and came up with nothing. My apologies if I missed it; there doesn't seem to be a good combination of keywords to describe this glitch. Happy to provide more details. was: Caching a DataFrame with >200 columns causes the contents to be ~nulled. This is quite a painful bug for us and caused us to place all sorts of bandaid bypasses in our production work recently. Minimally reproducible example: {code:python} from pyspark.sql import SQLContext import tempfile sqlContext = SQLContext(sc) path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet' list_df_varnames = [] list_df_values = [] for i in range(210): list_df_varnames.append('var'+str(i)) list_df_values.append(str(i)) test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames) test_df.show() # Still looks okay print(test_df.collect()) # Still looks okay test_df.cache() # When everything goes awry test_df.show() # All values have been ~nulled print(test_df.collect()) # Still looks okay # Serialize and read back from parquet now test_df.write.parquet(path_fail_parquet) loaded_df = sqlContext.read.parquet(path_fail_parquet) loaded_df.show() # All values have been ~nulled print(loaded_df.collect()) # All values have been ~nulled {code} As shown in the example above, the underlying RDD seems to survive the caching, but as soon as we serialize to parquet the data corruption becomes complete. This is occurring on Windows 10 with Python 3.5.x. We're running a Spark Standalone cluster. Everything works fine with <200 columns/fields. We have Kyro serialization turned on at the moment, but the same error manifested when we turned it off. I will try to get this tested on Spark 2.0.0 in the near future, but I generally steer clear of x.0.0 releases as best I can. 
I tried to search for another issue related to this and came up with nothing. My apologies if I missed it; there doesn't seem to be a good combination of keywords to describe this glitch. Happy to provide more details. > Caching a DataFrame with >200 columns ~nulls the contents > - > > Key: SPARK-17218 > URL: https://issues.apache.org/jira/browse/SPARK-17218 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.2 > Environment: Microsoft Windows 10 > Python v3.5.x > Standalone Spark Cluster >Reporter: Shea Parkes > > Caching a DataFrame with >200 columns causes the contents to be ~nulled. > This is quite a painful bug for us and caused us to place all sorts of > bandaid bypasses in our production work recently. > Minimally reproducible example: > {code} > from pyspark.sql import SQLContext > import tempfile > sqlContext = SQLContext(sc) > path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet' > list_df_varnames = [] > list_df_values = [] > for i in range(210): > list_df_varnames.append('var'+str(i)) > list_df_values.append(str(i)) > test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames) > test_df.show() # St
[jira] [Created] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents
Shea Parkes created SPARK-17218: --- Summary: Caching a DataFrame with >200 columns ~nulls the contents Key: SPARK-17218 URL: https://issues.apache.org/jira/browse/SPARK-17218 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 1.6.2 Environment: Microsoft Windows 10 Python v3.5.x Reporter: Shea Parkes Caching a DataFrame with >200 columns causes the contents to be ~nulled. This is quite a painful bug for us and caused us to place all sorts of bandaid bypasses in our production work recently. Minimally reproducible example: {code:python} from pyspark.sql import SQLContext import tempfile sqlContext = SQLContext(sc) path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet' list_df_varnames = [] list_df_values = [] for i in range(210): list_df_varnames.append('var'+str(i)) list_df_values.append(str(i)) test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames) test_df.show() # Still looks okay print(test_df.collect()) # Still looks okay test_df.cache() # When everything goes awry test_df.show() # All values have been ~nulled print(test_df.collect()) # Still looks okay # Serialize and read back from parquet now test_df.write.parquet(path_fail_parquet) loaded_df = sqlContext.read.parquet(path_fail_parquet) loaded_df.show() # All values have been ~nulled print(loaded_df.collect()) # All values have been ~nulled {code} As shown in the example above, the underlying RDD seems to survive the caching, but as soon as we serialize to parquet the data corruption becomes complete. This is occurring on Windows 10 with Python 3.5.x. We're running a Spark Standalone cluster. Everything works fine with <200 columns/fields. We have Kyro serialization turned on at the moment, but the same error manifested when we turned it off. I will try to get this tested on Spark 2.0.0 in the near future, but I generally steer clear of x.0.0 releases as best I can. I tried to search for another issue related to this and came up with nothing. My apologies if I missed it; there doesn't seem to be a good combination of keywords to describe this glitch. Happy to provide more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents
[ https://issues.apache.org/jira/browse/SPARK-17218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shea Parkes updated SPARK-17218: Environment: Microsoft Windows 10 Python v3.5.x Standalone Spark Cluster was: Microsoft Windows 10 Python v3.5.x > Caching a DataFrame with >200 columns ~nulls the contents > - > > Key: SPARK-17218 > URL: https://issues.apache.org/jira/browse/SPARK-17218 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.2 > Environment: Microsoft Windows 10 > Python v3.5.x > Standalone Spark Cluster >Reporter: Shea Parkes > > Caching a DataFrame with >200 columns causes the contents to be ~nulled. > This is quite a painful bug for us and caused us to place all sorts of > bandaid bypasses in our production work recently. > Minimally reproducible example: > {code:python} > from pyspark.sql import SQLContext > import tempfile > sqlContext = SQLContext(sc) > path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet' > list_df_varnames = [] > list_df_values = [] > for i in range(210): > list_df_varnames.append('var'+str(i)) > list_df_values.append(str(i)) > test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames) > test_df.show() # Still looks okay > print(test_df.collect()) # Still looks okay > test_df.cache() # When everything goes awry > test_df.show() # All values have been ~nulled > print(test_df.collect()) # Still looks okay > # Serialize and read back from parquet now > test_df.write.parquet(path_fail_parquet) > loaded_df = sqlContext.read.parquet(path_fail_parquet) > loaded_df.show() # All values have been ~nulled > print(loaded_df.collect()) # All values have been ~nulled > {code} > As shown in the example above, the underlying RDD seems to survive the > caching, but as soon as we serialize to parquet the data corruption becomes > complete. > This is occurring on Windows 10 with Python 3.5.x. We're running a Spark > Standalone cluster. Everything works fine with <200 columns/fields. We have > Kyro serialization turned on at the moment, but the same error manifested > when we turned it off. > I will try to get this tested on Spark 2.0.0 in the near future, but I > generally steer clear of x.0.0 releases as best I can. > I tried to search for another issue related to this and came up with nothing. > My apologies if I missed it; there doesn't seem to be a good combination of > keywords to describe this glitch. > Happy to provide more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394508#comment-15394508 ] Shea Parkes commented on SPARK-12261: - I still can't get this bug to reproduce reliably locally, but I can confirm that my proposed bandaid of exhausting the iterator at the end of {{takeUpToNumLeft()}} made the error go away entirely in manual testing. Would the Spark maintainers be willing to accept such a band-aid into the main codebase, or is this something I'd just need to maintain in our own Spark distributions? (I already maintain a couple other modifications related to Windows mess.) A more root cause fix would require more Scala knowledge than I have. I'd likely propose to define a new constant (e.g. {{SpecialLengths.PYTHON_WORKER_EXITING}}) and put logic into Scala that would make it immediately stop sending new data over the socket... > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391731#comment-15391731 ] Shea Parkes commented on SPARK-12261: - Also, I added extensive logging to {{worker.py}} in my environment to figure out much of this. In my experience, {{worker.py}} never crashes, it just {{sys.exit(-1)}} itself. > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391728#comment-15391728 ] Shea Parkes commented on SPARK-12261: - Alright, I've been spending time off and on for a week on this. I think I better understand what's going on, but don't yet really have it nailed down. I'm going to try and write down what I understand is going on here to get my thoughts in order. I've been focusing on {{branch-1.6}} since that's what we have in production. When calling {{RDD.take()}} in {{rdd.py}}, it does not push the request down to a ~scala implementation. Instead, it defines a closure/generator ({{takeUpToNumLeft()}}) and pushes that into a {{RDD.mapPartitions()}}. {{takeUpToNumLeft()}} does **not** exhaust the iterator that it is given; it yields only as many items as requested and then exits. Next is the interplay between {{worker.py}} and {{PythonRDD.scala}}. These two files setup bi-directional streams to each other and communicate some gnarly state back and forth. The important part is what happens in {{worker.py}} when the provided generator (i.e. {{takeUpToNumLeft()}} does not exhaust the stream of data being provided by {{PythonRDD.scala}}. When this happens, {{worker.py}} sends a {{SpecialLengths.END_OF_DATA_SECTION}}, followed by any accumulators, and then sends a **second** {{SpecialLengths.END_OF_DATA_SECTION}} and then kills itself. I'm much better at Python than Scala, so I'm now trying to understand what happens when {{PythonRDD.scala}} receives that second {{SpecialLengths.END_OF_DATA_SECTION}}. In particular, I don't see anywhere that {{PythonRDD.scala}} would treat the second {{SpecialLengths.END_OF_DATA_SECTION}} any different. This means that {{PythonRDD.scala}} would go on to try and read the accumulator information from the stream again, but {{worker.py}} would have already exited, so it doesn't find anything. {{PythonRDD.scala}} actually fails when trying to **send** data as I understand it though, which is where my understanding of Scala is a little rough. If I'm assuming the code in {{PythonRDD.scala}} works similar to a Python generator, then I'm assuming it's acting something like a co-routine and isn't stopping when it gets the second {{SpecialLengths.END_OF_DATA_SECTION}}. This still doesn't explain why this works 95% of the time for us (i.e. we only get intermittent failures). Also another track to chase is that Windows is treated differently in {{PythonWorkerFactory.scala}}, so I'm occasionally trying to figure out if {{worker.py}} being ran as a subprocess makes any difference... So I'm going to try and spend more time with understanding what triggers more data emission from {{PythonRDD.scala}} and why it keeps emitting after a second {{SpecialLengths.END_OF_DATA_SECTION}}. A simple band-aid would be to alter {{takeUpToNumLeft()}} in {{rdd.py}} to exhaust the iterator provided, but I'm not sure that's a root cause fix. Was it intentional to allow {{RDD.mapPartitions()}} to accept generators that did not exhaust their streams? 
> pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write e
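To make the "generator that does not exhaust its stream" point concrete, here is a small runnable illustration (assuming an active SparkContext {{sc}}); per the analysis above, {{take()}}'s {{takeUpToNumLeft()}} behaves like {{first_two()}} below, returning before the partition iterator is fully consumed:

{code}
def first_two(iterator):
    for i, item in enumerate(iterator):
        if i == 2:
            return  # stop early; the rest of the partition stream is never read
        yield item

rdd = sc.parallelize(range(100), 4)
print(rdd.mapPartitions(first_two).collect())  # at most two items from each of the four partitions
{code}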
[jira] [Commented] (SPARK-12261) pyspark crash for large dataset
[ https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380846#comment-15380846 ] Shea Parkes commented on SPARK-12261: - I believe I'm hitting the same bug. I'm also running on Windows in Standalone Cluster mode. Unfortunately, the error is non-deterministic (i.e. I've had no luck in creating an always reproducible scenario). I promise I have plenty of RAM, and I'm not working with *that* big of data. I too can try and capture better logging. As Chris hinted to though, which logs are you looking for? I can grab logs from the master, workers, and application, but it doesn't appear obvious where to grab logs from the python workers that are spawned. I'm happy to read documentation (and code) to try and chase down appropriate logs, but I haven't found any helpful directions yet. > pyspark crash for large dataset > --- > > Key: SPARK-12261 > URL: https://issues.apache.org/jira/browse/SPARK-12261 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.5.2 > Environment: windows >Reporter: zihao > > I tried to import a local text(over 100mb) file via textFile in pyspark, when > i ran data.take(), it failed and gave error messages including: > 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; > aborting job > Traceback (most recent call last): > File "E:/spark_python/test3.py", line 9, in > lines.take(5) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, > in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line > 916, in runJob > port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, > partitions) > File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in > __call__ > answer, self.gateway_client, self.target_id, self.name) > File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line > 36, in deco > return f(*a, **kw) > File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in > get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 > (TID 0, localhost): java.net.SocketException: Connection reset by peer: > socket write error > Then i ran the same code for a small text file, this time .take() worked fine. > How can i solve this problem? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType
[ https://issues.apache.org/jira/browse/SPARK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231491#comment-15231491 ] Shea Parkes commented on SPARK-13842: - Pull request is available (https://github.com/apache/spark/pull/12251). I did go ahead and make the {{names}} and {{_needSerializeAnyField}} attributes lazy while I was at it. I'll try to ping you guys appropriately on there. > Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType > -- > > Key: SPARK-13842 > URL: https://issues.apache.org/jira/browse/SPARK-13842 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 1.6.1 >Reporter: Shea Parkes >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to > {{pyspark.sql.types.StructType}}. Here are some simplistic suggestions: > {code} > def __iter__(self): > """Iterate the fields upon request.""" > return iter(self.fields) > def __getitem__(self, key): > """Return the corresponding StructField""" > _fields_dict = dict(zip(self.names, self.fields)) > try: > return _fields_dict[key] > except KeyError: > raise KeyError('No field named {}'.format(key)) > {code} > I realize the latter might be a touch more controversial since there could be > name collisions. Still, I doubt there are that many in practice and it would > be quite nice to work with. > Privately, I have more extensive metadata extraction methods overlaid on this > class, but I imagine the rest of what I have done might go too far for the > common user. If this request gains traction though, I'll share those other > layers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
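For anyone curious what making {{names}} lazy could look like, here is a rough illustration only; the class below is hypothetical and this is not the code from the pull request.
{code}
# Illustrative sketch of a lazily computed, cached attribute (not the actual patch).
class StructTypeSketch(object):
    def __init__(self, fields):
        self.fields = list(fields)
        self._cached_names = None

    @property
    def names(self):
        # Build the name list only when first requested, then reuse it.
        if self._cached_names is None:
            self._cached_names = [field.name for field in self.fields]
        return self._cached_names
{code}
The appeal of this pattern is that callers who never touch {{names}} pay nothing for it, while repeated access stays cheap.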
[jira] [Commented] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType
[ https://issues.apache.org/jira/browse/SPARK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231201#comment-15231201 ] Shea Parkes commented on SPARK-13842: - I'm willing to give it a first pass. Need to go dig up what Python testing framework you guys are using, but that shouldn't be too hard. Unless anyone objects, I'd like to move StructType.names and StructType._needSerializeAnyField to properties at the same time. Should be a seamless refactor and cut down on the likelihood of future errors. Might even get to it tonight. Thanks! > Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType > -- > > Key: SPARK-13842 > URL: https://issues.apache.org/jira/browse/SPARK-13842 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 1.6.1 >Reporter: Shea Parkes >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to > {{pyspark.sql.types.StructType}}. Here are some simplistic suggestions: > {code} > def __iter__(self): > """Iterate the fields upon request.""" > return iter(self.fields) > def __getitem__(self, key): > """Return the corresponding StructField""" > _fields_dict = dict(zip(self.names, self.fields)) > try: > return _fields_dict[key] > except KeyError: > raise KeyError('No field named {}'.format(key)) > {code} > I realize the latter might be a touch more controversial since there could be > name collisions. Still, I doubt there are that many in practice and it would > be quite nice to work with. > Privately, I have more extensive metadata extraction methods overlaid on this > class, but I imagine the rest of what I have done might go too far for the > common user. If this request gains traction though, I'll share those other > layers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
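For reference, a simple test of the proposed behavior might look roughly like the sketch below. It is only an illustration of intent: it assumes the new \_\_iter\_\_/\_\_getitem\_\_ methods exist and is not tied to wherever such tests would actually live in the Spark test suite.
{code}
import unittest
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class StructTypeAccessTest(unittest.TestCase):
    def setUp(self):
        self.schema = StructType([
            StructField('name', StringType(), True),
            StructField('age', IntegerType(), True),
        ])

    def test_iter_yields_fields(self):
        # __iter__ should walk the StructFields in order.
        self.assertEqual([field.name for field in self.schema], ['name', 'age'])

    def test_getitem_by_name(self):
        # __getitem__ should look a field up by its name.
        self.assertEqual(self.schema['age'].dataType, IntegerType())

    def test_getitem_missing_name_raises(self):
        with self.assertRaises(KeyError):
            self.schema['no_such_field']

if __name__ == '__main__':
    unittest.main()
{code}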
[jira] [Created] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType
Shea Parkes created SPARK-13842: --- Summary: Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType Key: SPARK-13842 URL: https://issues.apache.org/jira/browse/SPARK-13842 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 1.6.1 Reporter: Shea Parkes Priority: Minor It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to {{pyspark.sql.types.StructType}}. Here are some simplistic suggestions: {code} def __iter__(self): """Iterate the fields upon request.""" return iter(self.fields) def __getitem__(self, key): """Return the corresponding StructField""" _fields_dict = dict(zip(self.names, self.fields)) try: return _fields_dict[key] except KeyError: raise KeyError('No field named {}'.format(key)) {code} I realize the latter might be a touch more controversial since there could be name collisions. Still, I doubt there are that many in practice and it would be quite nice to work with. Privately, I have more extensive metadata extraction methods overlaid on this class, but I imagine the rest of what I have done might go too far for the common user. If this request gains traction though, I'll share those other layers. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
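To make the intent concrete, here is a short usage sketch assuming the two methods proposed above were added; the schema and field names are just examples.
{code}
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
])

# __iter__: walk the fields directly instead of going through schema.fields
for field in schema:
    print(field.name, field.dataType)

# __getitem__: look a field up by name instead of scanning the list by hand
age_field = schema['age']
print(age_field.nullable)
{code}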
[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
[ https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941953#comment-14941953 ] Shea Parkes commented on SPARK-10847: - My apologies, I just read your patch and see you made it work even with Pythonic Nulls. You rule sir; thanks a bunch. > Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure > > > Key: SPARK-10847 > URL: https://issues.apache.org/jira/browse/SPARK-10847 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 1.5.0 > Environment: Windows 7 > java version "1.8.0_60" (64bit) > Python 3.4.x > Standalone cluster mode (not local[n]; a full local cluster) >Reporter: Shea Parkes >Priority: Minor > > If the optional metadata passed to `pyspark.sql.types.StructField` includes a > pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a > very cryptic/unhelpful error. > Here is a minimal reproducible example: > {code:none} > # Assumes sc exists > import pyspark.sql.types as types > sqlContext = SQLContext(sc) > literal_metadata = types.StructType([ > types.StructField( > 'name', > types.StringType(), > nullable=True, > metadata={'comment': 'From accounting system.'} > ), > types.StructField( > 'age', > types.IntegerType(), > nullable=True, > metadata={'comment': None} > ), > ]) > literal_rdd = sc.parallelize([ > ['Bob', 34], > ['Dan', 42], > ]) > print(literal_rdd.take(2)) > failed_dataframe = sqlContext.createDataFrame( > literal_rdd, > literal_metadata, > ) > {code} > This produces the following ~stacktrace: > {noformat} > Traceback (most recent call last): > File "", line 1, in > File "", line 28, in > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", > line 408, in createDataFrame > jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", > line 538, in __call__ > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", > line 36, in deco > return f(*a, **kw) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling > o757.applySchemaToPythonRDD. > : java.lang.RuntimeException: Do not support type class scala.Tuple2. 
> at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160) > at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127) > at > org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148) > at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96) > at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961) > at > org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) > at java.lang.reflect.Method.invoke(Unknown Source) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) > at py4j.Gateway.invoke(Gateway.java:259) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Unknown Source) > {noformat} > I believe the most important line of the traceback is this one: >
[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
[ https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941952#comment-14941952 ] Shea Parkes commented on SPARK-10847: - I appreciate your assistance! I think your proposal is an improvement, but I think it would be better if the failure were triggered upon the creation of the StructType object - that's where the error actually occurred. The distance between the definition of the metadata and the import was much larger in my project; I think your new error message would still have me looking for NULL values in my data (instead of my metadata). That's likely due to my unfamiliarity with Scala, but I chased as far down the pyspark code as I could go and didn't figure it out without trial and error. I realize this might mean traversing an arbitrary dictionary in the StructType initialization looking for unallowed types, which might be unacceptable. It would still be much more in line with the "Crash Early, Crash Often" philosophy if it were possible to bomb at the creation of the metadata. Thanks again for the assistance! (A rough sketch of that kind of eager validation follows the quoted traceback below.) > Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure > > > Key: SPARK-10847 > URL: https://issues.apache.org/jira/browse/SPARK-10847 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 1.5.0 > Environment: Windows 7 > java version "1.8.0_60" (64bit) > Python 3.4.x > Standalone cluster mode (not local[n]; a full local cluster) >Reporter: Shea Parkes >Priority: Minor > > If the optional metadata passed to `pyspark.sql.types.StructField` includes a > pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a > very cryptic/unhelpful error. > Here is a minimal reproducible example: > {code:none} > # Assumes sc exists > import pyspark.sql.types as types > sqlContext = SQLContext(sc) > literal_metadata = types.StructType([ > types.StructField( > 'name', > types.StringType(), > nullable=True, > metadata={'comment': 'From accounting system.'} > ), > types.StructField( > 'age', > types.IntegerType(), > nullable=True, > metadata={'comment': None} > ), > ]) > literal_rdd = sc.parallelize([ > ['Bob', 34], > ['Dan', 42], > ]) > print(literal_rdd.take(2)) > failed_dataframe = sqlContext.createDataFrame( > literal_rdd, > literal_metadata, > ) > {code} > This produces the following ~stacktrace: > {noformat} > Traceback (most recent call last): > File "", line 1, in > File "", line 28, in > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", > line 408, in createDataFrame > jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", > line 538, in __call__ > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", > line 36, in deco > return f(*a, **kw) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling > o757.applySchemaToPythonRDD. > : java.lang.RuntimeException: Do not support type class scala.Tuple2.
> at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160) > at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127) > at > org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148) > at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96) > at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961) > at > org.apache.spark.
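To illustrate the "crash early" suggestion in the comment above, here is a rough sketch of the kind of eager check that could run when the schema is defined. This is not actual Spark code; the helper name and the exact set of allowed value types are assumptions on my part.
{code}
# Sketch only: validate metadata values at schema-definition time so a bad value
# surfaces immediately instead of deep inside applySchemaToPythonRDD.
def _validate_metadata_value(value, path='metadata'):
    if isinstance(value, dict):
        for key, nested in value.items():
            _validate_metadata_value(nested, '{}[{!r}]'.format(path, key))
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            _validate_metadata_value(item, '{}[{}]'.format(path, index))
    elif not isinstance(value, (str, int, float, bool)):
        raise TypeError(
            'Unsupported metadata value {!r} at {}; expected strings, numbers, '
            'booleans, or nested dicts/lists of those.'.format(value, path))

# Called from something like StructField.__init__, this would raise right here:
# _validate_metadata_value({'comment': None})
{code}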
[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
[ https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933924#comment-14933924 ] Shea Parkes commented on SPARK-10847: - This issue caused me to learn enough about Scala only to learn that the exception still wasn't helpful once I even knew what a scala.Tuple2 was. I'm not planning on doing any further work on this, so to the extent you were waiting to avoid duplication of efforts with me, feel free to go ahead and knock it out. I'm not entirely familiar with the contribution guidelines, but I'm sure you can work them out. In case it wasn't clear above, the line that triggers the error is: {code:none} metadata={'comment': None} {code} Thanks for the interest! > Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure > > > Key: SPARK-10847 > URL: https://issues.apache.org/jira/browse/SPARK-10847 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 1.5.0 > Environment: Windows 7 > java version "1.8.0_60" (64bit) > Python 3.4.x > Standalone cluster mode (not local[n]; a full local cluster) >Reporter: Shea Parkes >Priority: Minor > > If the optional metadata passed to `pyspark.sql.types.StructField` includes a > pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a > very cryptic/unhelpful error. > Here is a minimal reproducible example: > {code:none} > # Assumes sc exists > import pyspark.sql.types as types > sqlContext = SQLContext(sc) > literal_metadata = types.StructType([ > types.StructField( > 'name', > types.StringType(), > nullable=True, > metadata={'comment': 'From accounting system.'} > ), > types.StructField( > 'age', > types.IntegerType(), > nullable=True, > metadata={'comment': None} > ), > ]) > literal_rdd = sc.parallelize([ > ['Bob', 34], > ['Dan', 42], > ]) > print(literal_rdd.take(2)) > failed_dataframe = sqlContext.createDataFrame( > literal_rdd, > literal_metadata, > ) > {code} > This produces the following ~stacktrace: > {noformat} > Traceback (most recent call last): > File "", line 1, in > File "", line 28, in > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", > line 408, in createDataFrame > jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", > line 538, in __call__ > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", > line 36, in deco > return f(*a, **kw) > File > "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", > line 300, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling > o757.applySchemaToPythonRDD. > : java.lang.RuntimeException: Do not support type class scala.Tuple2. 
> at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160) > at > org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127) > at > org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148) > at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96) > at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961) > at > org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) > at java.lang.reflect.Method.invoke(Unknown Source) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) > at py4j.reflection.Refle
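For anyone hitting this before a fix lands, a possible user-side workaround is simply to keep {{None}} out of the metadata dict before building the field; a hedged sketch follows (the extra 'source' key is just an example I added, not from the original report).
{code}
import pyspark.sql.types as types

raw_metadata = {'comment': None, 'source': 'accounting system'}
# Drop None-valued entries so nothing unsupported reaches the JVM-side metadata parser.
clean_metadata = {k: v for k, v in raw_metadata.items() if v is not None}

age_field = types.StructField(
    'age',
    types.IntegerType(),
    nullable=True,
    metadata=clean_metadata,
)
{code}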
[jira] [Created] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
Shea Parkes created SPARK-10847: --- Summary: Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure Key: SPARK-10847 URL: https://issues.apache.org/jira/browse/SPARK-10847 Project: Spark Issue Type: Bug Components: PySpark, SQL Affects Versions: 1.5.0 Environment: Windows 7 java version "1.8.0_60" (64bit) Python 3.4.x Standalone cluster mode (not local[n]; a full local cluster) Reporter: Shea Parkes Priority: Minor If the optional metadata passed to `pyspark.sql.types.StructField` includes a pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a very cryptic/unhelpful error. Here is a minimal reproducible example: {code:none} # Assumes sc exists import pyspark.sql.types as types sqlContext = SQLContext(sc) literal_metadata = types.StructType([ types.StructField( 'name', types.StringType(), nullable=True, metadata={'comment': 'From accounting system.'} ), types.StructField( 'age', types.IntegerType(), nullable=True, metadata={'comment': None} ), ]) literal_rdd = sc.parallelize([ ['Bob', 34], ['Dan', 42], ]) print(literal_rdd.take(2)) failed_dataframe = sqlContext.createDataFrame( literal_rdd, literal_metadata, ) {code} This produces the following ~stacktrace: {noformat} Traceback (most recent call last): File "", line 1, in File "", line 28, in File "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py", line 408, in createDataFrame jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) File "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__ File "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco return f(*a, **kw) File "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o757.applySchemaToPythonRDD. : java.lang.RuntimeException: Do not support type class scala.Tuple2. 
at org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160) at org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127) at org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173) at org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) at org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148) at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96) at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961) at org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970) at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Unknown Source) {noformat} I believe the most important line of the traceback is this one: {noformat} py4j.protocol.Py4JJavaError: An error occurred while calling o757.applySchemaToPythonRDD. : java.lang.RuntimeException: Do not support type class scala.Tuple2. {noformat} But it wasn't enough for me to figure out the problem; I had to steadily simplify my program until I could identify what caused the problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@sp