[jira] [Commented] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file

2017-08-04 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16115176#comment-16115176
 ] 

Shea Parkes commented on SPARK-19116:
-

Apologies for not responding earlier.  I'm struggling to recall all the nuances 
of my original issue here, but your response does seem to be an acceptable 
answer.

It's not intuitive to me, and we've pretty much gone over to explicit broadcast 
joining in all of our production work, but if this is working as intended, then 
I'm fine with closing this down...
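
For anyone else landing here, this is roughly what we mean by explicit broadcast 
joining (a sketch only; the paths, the join key, and the table names are made up 
for illustration):

{code}
import pyspark.sql.functions as F

# Hypothetical small dimension table and large fact table; 'session' is the
# SparkSession used elsewhere in this issue.
df_small = session.read.parquet('c:/scratch/small_dim.parquet')
df_large = session.read.parquet('c:/scratch/large_fact.parquet')

# The broadcast hint sidesteps the sizeInBytes estimate entirely.
df_joined = df_large.join(F.broadcast(df_small), on='id', how='inner')
{code}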

> LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file
> -
>
> Key: SPARK-19116
> URL: https://issues.apache.org/jira/browse/SPARK-19116
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.1, 2.0.2
> Environment: Python 3.5.x
> Windows 10
>Reporter: Shea Parkes
>
> We're having some modestly severe issues with broadcast join inference, and 
> I've been chasing them through the join heuristics in the catalyst engine.  
> I've made it as far as I can, and I've hit upon something that does not make 
> any sense to me.
> I thought that loading from parquet would be a RelationPlan, which would just 
> use the sum of default sizeInBytes for each column times the number of rows.  
> But this trivial example shows that I am not correct:
> {code}
> import pyspark.sql.functions as F
> df_range = session.range(100).select(F.col('id').cast('integer'))
> df_range.write.parquet('c:/scratch/hundred_integers.parquet')
> df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet')
> df_parquet.explain(True)
> # Expected sizeInBytes
> integer_default_sizeinbytes = 4
> print(df_parquet.count() * integer_default_sizeinbytes)  # = 400
> # Inferred sizeInBytes
> print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes())  # = 2318
> # For posterity (Didn't really expect this to match anything above)
> print(df_range._jdf.logicalPlan().statistics().sizeInBytes())  # = 600
> {code}
> And here's the results of explain(True) on df_parquet:
> {code}
> In [456]: == Parsed Logical Plan ==
> Relation[id#794] parquet
> == Analyzed Logical Plan ==
> id: int
> Relation[id#794] parquet
> == Optimized Logical Plan ==
> Relation[id#794] parquet
> == Physical Plan ==
> *BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: 
> file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}
> So basically, I don't understand how the size of the parquet file is 
> being estimated.  I don't expect it to be extremely accurate, but empirically 
> it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold 
> way too much.  (It's not always too high like the example above, it's often 
> way too low.)
> Without deeper understanding, I'm considering a result of 2318 instead of 400 
> to be a bug.  My apologies if I'm missing something obvious.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20683) Make table uncache chaining optional

2017-05-09 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003864#comment-16003864
 ] 

Shea Parkes commented on SPARK-20683:
-

For anyone that found this issue and just wants to revert to the old behavior 
in their own fork, the following change in 
{{sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala}} 
worked for us:

{code}
-  if (cd.plan.find(_.sameResult(plan)).isDefined) {
+  if (cd.plan.sameResult(plan)) {
{code}
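
For context, here is a sketch of the design pattern described in the issue below 
(the path and column names are made up for illustration):

{code}
# Cache a large parent table, derive two independent children from it,
# materialize and cache both, then drop the parent.
df_a = session.read.parquet('/data/table_a.parquet').cache()

df_b = df_a.filter(df_a['kind'] == 1).cache()
df_c = df_a.filter(df_a['kind'] == 2).cache()
df_b.count()  # materialize and cache B
df_c.count()  # materialize and cache C

# A is no longer needed; with uncache chaining this now also drops B and C.
df_a.unpersist()
{code}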

> Make table uncache chaining optional
> 
>
> Key: SPARK-20683
> URL: https://issues.apache.org/jira/browse/SPARK-20683
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
> Environment: Not particularly environment sensitive.  
> Encountered/tested on Linux and Windows.
>Reporter: Shea Parkes
>
> A recent change was made in SPARK-19765 that causes table uncaching to chain. 
>  That is, if table B is a child of table A, and they are both cached, now 
> uncaching table A will automatically uncache table B.
> At first I did not understand the need for this, but when reading the unit 
> tests, I see that it is likely that many people do not keep named references 
> to the child table (e.g. B).  Perhaps B is just made and cached as some part 
> of data exploration.  In that situation, it makes sense for B to 
> automatically be uncached when you are finished with A.
> However, we commonly utilize a different design pattern that is now harmed by 
> this automatic uncaching.  It is common for us to cache table A to then make 
> two independent child tables (e.g. B and C).  Once those two child tables 
> are realized and cached, we'd then uncache table A (as it was no longer 
> needed and could be quite large).  After this change, when we uncache 
> table A, we suddenly lose our cached status on both table B and C (which is 
> quite frustrating).  All of these tables are often quite large, and we view 
> what we're doing as mindful memory management.  We are maintaining named 
> references to B and C at all times, so we can always uncache them ourselves 
> when it makes sense.
> Would it be acceptable/feasible to make this table uncache chaining optional? 
>  I would be fine if the default is for the chaining to happen, as long as we 
> can turn it off via parameters.
> If acceptable, I can try to work towards making the required changes.  I am 
> most comfortable in Python (and would want the optional parameter surfaced in 
> Python), but have found the places required to make this change in Scala 
> (since I reverted the functionality in a private fork already).  Any help 
> would be greatly appreciated however.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20683) Make table uncache chaining optional

2017-05-09 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-20683:
---

 Summary: Make table uncache chaining optional
 Key: SPARK-20683
 URL: https://issues.apache.org/jira/browse/SPARK-20683
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.1
 Environment: Not particularly environment sensitive.  
Encountered/tested on Linux and Windows.
Reporter: Shea Parkes


A recent change was made in SPARK-19765 that causes table uncaching to chain.  
That is, if table B is a child of table A, and they are both cached, now 
uncaching table A will automatically uncache table B.

At first I did not understand the need for this, but when reading the unit 
tests, I see that it is likely that many people do not keep named references to 
the child table (e.g. B).  Perhaps B is just made and cached as some part of 
data exploration.  In that situation, it makes sense for B to automatically be 
uncached when you are finished with A.

However, we commonly utilize a different design pattern that is now harmed by 
this automatic uncaching.  It is common for us to cache table A to then make 
two independent child tables (e.g. B and C).  Once those two child tables 
are realized and cached, we'd then uncache table A (as it was no longer needed 
and could be quite large).  After this change, when we uncache table A, we 
suddenly lose our cached status on both table B and C (which is quite 
frustrating).  All of these tables are often quite large, and we view what 
we're doing as mindful memory management.  We are maintaining named references 
to B and C at all times, so we can always uncache them ourselves when it makes 
sense.

Would it be acceptable/feasible to make this table uncache chaining optional?  
I would be fine if the default is for the chaining to happen, as long as we can 
turn it off via parameters.

If acceptable, I can try to work towards making the required changes.  I am 
most comfortable in Python (and would want the optional parameter surfaced in 
Python), but have found the places required to make this change in Scala (since 
I reverted the functionality in a private fork already).  Any help would be 
greatly appreciated however.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12261) pyspark crash for large dataset

2017-03-16 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928179#comment-15928179
 ] 

Shea Parkes edited comment on SPARK-12261 at 3/16/17 2:38 PM:
--

I simply added the following to the end:

{code}
for _ in iterator:
    pass
{code}

This will run through the rest of the iterator (until it raises StopIteration 
as normal).

Depending how you're making pyspark importable, you might need to make this 
change inside a zipped copy of pyspark as well (e.g. in the binary 
distributions downloadable from Spark's home page).
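
For reference, a paraphrased sketch (not the literal Spark source) of where those 
two lines land inside the {{takeUpToNumLeft()}} generator that {{RDD.take()}} 
defines in {{rdd.py}}; {{left}} is a closure variable from {{take()}}:

{code}
def takeUpToNumLeft(iterator):
    iterator = iter(iterator)
    taken = 0
    while taken < left:
        try:
            yield next(iterator)
        except StopIteration:
            return
        taken += 1
    # The bandaid: drain the rest of the partition so the worker does not
    # exit while the JVM is still writing into the socket.
    for _ in iterator:
        pass
{code}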


was (Author: shea.parkes):
I simply added the following to the end:

{code}
for _ in iterator:
    pass
{code}

This will run through the rest of the iterator (until it raises StopIteration 
as normal).

Depending how you're making pyspark importable, you might need to make this 
change inside a zipped copy of pyspark as well (e.g. in the binary 
distributions available).

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12261) pyspark crash for large dataset

2017-03-16 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928179#comment-15928179
 ] 

Shea Parkes edited comment on SPARK-12261 at 3/16/17 2:38 PM:
--

I simply added the following to the end:

{code}
for _ in iterator:
    pass
{code}

This will run through the rest of the iterator (until it raises StopIteration 
as normal).

Depending how you're making pyspark importable, you might need to make this 
change inside a zipped copy of pyspark as well (e.g. in the binary 
distributions available).


was (Author: shea.parkes):
I simply added the following to the end:

for _ in iterator:
    pass

This will run through the rest of the iterator (until it raises StopIteration 
as normal).

Depending how you're making pyspark importable, you might need to make this 
change inside a zipped copy of pyspark as well (e.g. in the binary 
distributions available).

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2017-03-16 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928179#comment-15928179
 ] 

Shea Parkes commented on SPARK-12261:
-

I simply added the following to the end:

for _ in iterator:
    pass

This will run through the rest of the iterator (until it raises StopIteration 
as normal).

Depending how you're making pyspark importable, you might need to make this 
change inside a zipped copy of pyspark as well (e.g. in the binary 
distributions available).

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API

2017-02-14 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866393#comment-15866393
 ] 

Shea Parkes commented on SPARK-18541:
-

Thank you very much!

> Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management 
> in pyspark SQL API
> 
>
> Key: SPARK-18541
> URL: https://issues.apache.org/jira/browse/SPARK-18541
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 2.0.2
> Environment: all
>Reporter: Shea Parkes
>Assignee: Shea Parkes
>Priority: Minor
>  Labels: newbie
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Scala SQL API, you can pass in new metadata when you alias a field.  
> That functionality is not available in the Python API.   Right now, you have 
> to painfully utilize {{SparkSession.createDataFrame}} to manipulate the 
> metadata for even a single column.  I would propose to add the following 
> method to {{pyspark.sql.Column}}:
> {code}
> def aliasWithMetadata(self, name, metadata):
>     """
>     Make a new Column that has the provided alias and metadata.
>     Metadata will be processed with json.dumps()
>     """
>     _context = pyspark.SparkContext._active_spark_context
>     _metadata_str = json.dumps(metadata)
>     _metadata_jvm = _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str)
>     _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm)
>     return Column(_new_java_column)
> {code}
> I can likely complete this request myself if there is any interest for it.  
> Just have to dust off my knowledge of doctest and the location of the python 
> tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19116) LogicalPlan.statistics.sizeInBytes wrong for trivial parquet file

2017-01-07 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-19116:
---

 Summary: LogicalPlan.statistics.sizeInBytes wrong for trivial 
parquet file
 Key: SPARK-19116
 URL: https://issues.apache.org/jira/browse/SPARK-19116
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 2.0.2, 2.0.1
 Environment: Python 3.5.x
Windows 10
Reporter: Shea Parkes


We're having some modestly severe issues with broadcast join inference, and 
I've been chasing them through the join heuristics in the catalyst engine.  
I've made it as far as I can, and I've hit upon something that does not make 
any sense to me.

I thought that loading from parquet would be a RelationPlan, which would just 
use the sum of default sizeInBytes for each column times the number of rows.  
But this trivial example shows that I am not correct:

{code}
import pyspark.sql.functions as F

df_range = session.range(100).select(F.col('id').cast('integer'))
df_range.write.parquet('c:/scratch/hundred_integers.parquet')

df_parquet = session.read.parquet('c:/scratch/hundred_integers.parquet')
df_parquet.explain(True)

# Expected sizeInBytes
integer_default_sizeinbytes = 4
print(df_parquet.count() * integer_default_sizeinbytes)  # = 400

# Inferred sizeInBytes
print(df_parquet._jdf.logicalPlan().statistics().sizeInBytes())  # = 2318

# For posterity (Didn't really expect this to match anything above)
print(df_range._jdf.logicalPlan().statistics().sizeInBytes())  # = 600
{code}

And here's the results of explain(True) on df_parquet:
{code}
In [456]: == Parsed Logical Plan ==
Relation[id#794] parquet

== Analyzed Logical Plan ==
id: int
Relation[id#794] parquet

== Optimized Logical Plan ==
Relation[id#794] parquet

== Physical Plan ==
*BatchedScan parquet [id#794] Format: ParquetFormat, InputPaths: 
file:/c:/scratch/hundred_integers.parquet, PartitionFilters: [], PushedFilters: 
[], ReadSchema: struct
{code}

So basically, I don't understand how the size of the parquet file is 
being estimated.  I don't expect it to be extremely accurate, but empirically 
it's so inaccurate that we're having to mess with autoBroadcastJoinThreshold 
way too much.  (It's not always too high like the example above, it's often way 
too low.)

Without deeper understanding, I'm considering a result of 2318 instead of 400 
to be a bug.  My apologies if I'm missing something obvious.
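
For anyone hitting the same mismatch, the threshold that consumes this estimate 
can at least be inspected and adjusted per session while the estimation question 
is open (a sketch using the same {{session}} as above):

{code}
# The estimate above is compared against this threshold (default is 10 MB).
print(session.conf.get('spark.sql.autoBroadcastJoinThreshold'))

# Raise it, or set it to -1 to disable size-based broadcast inference entirely.
session.conf.set('spark.sql.autoBroadcastJoinThreshold', 50 * 1024 * 1024)
{code}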



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API

2016-11-28 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703755#comment-15703755
 ] 

Shea Parkes commented on SPARK-18541:
-

Yea, I originally did {{aliasWithMetadata}} because I could monkey-patch 
without conflicts, but upon reflection, just changing the existing {{alias}} 
method to accept a {{metadata}} keyword argument should work fine.  I'll see 
about getting a pull request up soon.
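
For anyone finding this later, usage would look roughly like this once {{alias}} 
accepts the keyword (a hedged sketch, assuming the change lands with a 
{{metadata}} keyword argument; {{df}} stands in for any DataFrame with a 
{{score}} column):

{code}
import pyspark.sql.functions as F

df2 = df.select(
    F.col('score').alias('score', metadata={'comment': 'model score between 0 and 1'})
)
print(df2.schema.fields[0].metadata)
{code}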

> Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management 
> in pyspark SQL API
> 
>
> Key: SPARK-18541
> URL: https://issues.apache.org/jira/browse/SPARK-18541
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 2.0.2
> Environment: all
>Reporter: Shea Parkes
>Priority: Minor
>  Labels: newbie
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> In the Scala SQL API, you can pass in new metadata when you alias a field.  
> That functionality is not available in the Python API.   Right now, you have 
> to painfully utilize {{SparkSession.createDataFrame}} to manipulate the 
> metadata for even a single column.  I would propose to add the following 
> method to {{pyspark.sql.Column}}:
> {code}
> def aliasWithMetadata(self, name, metadata):
>     """
>     Make a new Column that has the provided alias and metadata.
>     Metadata will be processed with json.dumps()
>     """
>     _context = pyspark.SparkContext._active_spark_context
>     _metadata_str = json.dumps(metadata)
>     _metadata_jvm = _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str)
>     _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm)
>     return Column(_new_java_column)
> {code}
> I can likely complete this request myself if there is any interest for it.  
> Just have to dust off my knowledge of doctest and the location of the python 
> tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18541) Add pyspark.sql.Column.aliasWithMetadata to allow dynamic metadata management in pyspark SQL API

2016-11-22 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-18541:
---

 Summary: Add pyspark.sql.Column.aliasWithMetadata to allow dynamic 
metadata management in pyspark SQL API
 Key: SPARK-18541
 URL: https://issues.apache.org/jira/browse/SPARK-18541
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 2.0.2
 Environment: all
Reporter: Shea Parkes
Priority: Minor


In the Scala SQL API, you can pass in new metadata when you alias a field.  
That functionality is not available in the Python API.   Right now, you have to 
painfully utilize {{SparkSession.createDataFrame}} to manipulate the metadata 
for even a single column.  I would propose to add the following method to 
{{pyspark.sql.Column}}:

{code}
def aliasWithMetadata(self, name, metadata):
    """
    Make a new Column that has the provided alias and metadata.
    Metadata will be processed with json.dumps()
    """
    _context = pyspark.SparkContext._active_spark_context
    _metadata_str = json.dumps(metadata)
    _metadata_jvm = _context._jvm.org.apache.spark.sql.types.Metadata.fromJson(_metadata_str)
    _new_java_column = getattr(self._jc, 'as')(name, _metadata_jvm)
    return Column(_new_java_column)
{code}

I can likely complete this request myself if there is any interest for it.  
Just have to dust off my knowledge of doctest and the location of the python 
tests.
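
Usage would look something like this (hypothetical, since the method does not 
exist yet; {{df}} stands in for any DataFrame with a {{score}} column):

{code}
new_col = df['score'].aliasWithMetadata('score', {'comment': 'model score between 0 and 1'})
df2 = df.select(new_col)
print(df2.schema.fields[0].metadata)
{code}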



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2141) Add sc.getPersistentRDDs() to PySpark

2016-11-14 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15665137#comment-15665137
 ] 

Shea Parkes commented on SPARK-2141:


This would have been nice to have today.  We wanted to clean up some cached 
interim RDDs from an mllib model fit, but there were no references to them in 
the python interpreter.  To get around this, we had to reach into the _jsc 
attribute and utilize the java-based getPersistentRDDs to get references to the 
dang RDDs (just long enough to unpersist them).
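
Roughly what that workaround looked like (a sketch only; it leans on py4j to 
walk the java.util.Map returned by the JVM-side {{getPersistentRDDs()}}, and 
{{sc}} is the live SparkContext):

{code}
# Reach through the JavaSparkContext handle since PySpark has no wrapper.
java_rdd_map = sc._jsc.getPersistentRDDs()  # java.util.Map[Int, JavaRDD]
for java_rdd in java_rdd_map.values():
    java_rdd.unpersist()
{code}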

> Add sc.getPersistentRDDs() to PySpark
> -
>
> Key: SPARK-2141
> URL: https://issues.apache.org/jira/browse/SPARK-2141
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 1.0.0
>Reporter: Nicholas Chammas
>Assignee: Kan Zhang
>
> PySpark does not appear to have {{sc.getPersistentRDDs()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2016-11-02 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15629182#comment-15629182
 ] 

Shea Parkes commented on SPARK-12261:
-

I'm still maintaining the two-line bandaid to {{takeUpToNumLeft()}} in v2.0.x 
for our purposes (and it's still working).  Given that it has helped another 
person now, would you accept a patch with that bandaid into the mainline code?

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions

2016-10-19 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15590680#comment-15590680
 ] 

Shea Parkes commented on SPARK-17998:
-

That definitely answers it.  I would say the default of 128MB makes things a 
little uncomfortable when paired with pyspark and the default python worker 
memory, but now that I understand it, we can tune it for our workflow 
appropriately.
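
For anyone else landing here, the knob in question appears to be 
{{spark.sql.files.maxPartitionBytes}} (hedged; that is my reading of the 128MB 
default mentioned above), which can be tuned per session:

{code}
# Default is 128 MB; shrinking it yields more, smaller read partitions.
session.conf.set('spark.sql.files.maxPartitionBytes', 32 * 1024 * 1024)
df_second = session.read.parquet(r"c:\scratch\scrap.parquet")
print(df_second.rdd.getNumPartitions())
{code}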

Thanks again and I'll close this down.

> Reading Parquet files coalesces parts into too few in-memory partitions
> ---
>
> Key: SPARK-17998
> URL: https://issues.apache.org/jira/browse/SPARK-17998
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.0, 2.0.1
> Environment: Spark Standalone Cluster (not "local mode")
> Windows 10 and Windows 7
> Python 3.x
>Reporter: Shea Parkes
>
> Reading a parquet ~file into a DataFrame is resulting in far too few 
> in-memory partitions.  In prior versions of Spark, the resulting DataFrame 
> would have a number of partitions often equal to the number of parts in the 
> parquet folder.
> Here's a minimal reproducible sample:
> {quote}
> df_first = session.range(start=1, end=1, numPartitions=13)
> assert df_first.rdd.getNumPartitions() == 13
> assert session._sc.defaultParallelism == 6
> path_scrap = r"c:\scratch\scrap.parquet"
> df_first.write.parquet(path_scrap)
> df_second = session.read.parquet(path_scrap)
> print(df_second.rdd.getNumPartitions())
> {quote}
> The above shows only 7 partitions in the DataFrame that was created by 
> reading the Parquet back into memory for me.  Why is it no longer just the 
> number of part files in the Parquet folder?  (Which is 13 in the example 
> above.)
> I'm filing this as a bug because it has gotten so bad that we can't work with 
> the underlying RDD without first repartitioning the DataFrame, which is 
> costly and wasteful.  I really doubt this was the intended effect of moving 
> to Spark 2.0.
> I've tried to research where the number of in-memory partitions is 
> determined, but my Scala skills have proven inadequate.  I'd be happy to dig 
> further if someone could point me in the right direction...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions

2016-10-19 Thread Shea Parkes (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shea Parkes closed SPARK-17998.
---
Resolution: Information Provided

> Reading Parquet files coalesces parts into too few in-memory partitions
> ---
>
> Key: SPARK-17998
> URL: https://issues.apache.org/jira/browse/SPARK-17998
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.0, 2.0.1
> Environment: Spark Standalone Cluster (not "local mode")
> Windows 10 and Windows 7
> Python 3.x
>Reporter: Shea Parkes
>
> Reading a parquet ~file into a DataFrame is resulting in far too few 
> in-memory partitions.  In prior versions of Spark, the resulting DataFrame 
> would have a number of partitions often equal to the number of parts in the 
> parquet folder.
> Here's a minimal reproducible sample:
> {quote}
> df_first = session.range(start=1, end=1, numPartitions=13)
> assert df_first.rdd.getNumPartitions() == 13
> assert session._sc.defaultParallelism == 6
> path_scrap = r"c:\scratch\scrap.parquet"
> df_first.write.parquet(path_scrap)
> df_second = session.read.parquet(path_scrap)
> print(df_second.rdd.getNumPartitions())
> {quote}
> The above shows only 7 partitions in the DataFrame that was created by 
> reading the Parquet back into memory for me.  Why is it no longer just the 
> number of part files in the Parquet folder?  (Which is 13 in the example 
> above.)
> I'm filing this as a bug because it has gotten so bad that we can't work with 
> the underlying RDD without first repartitioning the DataFrame, which is 
> costly and wasteful.  I really doubt this was the intended effect of moving 
> to Spark 2.0.
> I've tried to research where the number of in-memory partitions is 
> determined, but my Scala skills have proven inadequate.  I'd be happy to dig 
> further if someone could point me in the right direction...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17998) Reading Parquet files coalesces parts into too few in-memory partitions

2016-10-18 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-17998:
---

 Summary: Reading Parquet files coalesces parts into too few 
in-memory partitions
 Key: SPARK-17998
 URL: https://issues.apache.org/jira/browse/SPARK-17998
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 2.0.1, 2.0.0
 Environment: Spark Standalone Cluster (not "local mode")
Windows 10 and Windows 7
Python 3.x
Reporter: Shea Parkes


Reading a parquet ~file into a DataFrame is resulting in far too few in-memory 
partitions.  In prior versions of Spark, the resulting DataFrame would have a 
number of partitions often equal to the number of parts in the parquet folder.

Here's a minimal reproducible sample:

{quote}
df_first = session.range(start=1, end=1, numPartitions=13)
assert df_first.rdd.getNumPartitions() == 13
assert session._sc.defaultParallelism == 6

path_scrap = r"c:\scratch\scrap.parquet"
df_first.write.parquet(path_scrap)
df_second = session.read.parquet(path_scrap)

print(df_second.rdd.getNumPartitions())
{quote}

The above shows only 7 partitions in the DataFrame that was created by reading 
the Parquet back into memory for me.  Why is it no longer just the number of 
part files in the Parquet folder?  (Which is 13 in the example above.)

I'm filing this as a bug because it has gotten so bad that we can't work with 
the underlying RDD without first repartitioning the DataFrame, which is costly 
and wasteful.  I really doubt this was the intended effect of moving to Spark 
2.0.

I've tried to research where the number of in-memory partitions is determined, 
but my Scala skills have proven inadequate.  I'd be happy to dig further if 
someone could point me in the right direction...
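
For reference, the repartitioning workaround mentioned above looks like this; 
the target count is whatever the downstream job actually needs:

{code}
df_second = session.read.parquet(path_scrap).repartition(13)
print(df_second.rdd.getNumPartitions())  # back to 13, at the cost of a shuffle
{code}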



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents

2016-08-24 Thread Shea Parkes (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shea Parkes updated SPARK-17218:

Description: 
Caching a DataFrame with >200 columns causes the contents to be ~nulled.  This 
is quite a painful bug for us and caused us to place all sorts of bandaid 
bypasses in our production work recently.

Minimally reproducible example:

{code}
from pyspark.sql import SQLContext
import tempfile

sqlContext = SQLContext(sc)
path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet'

list_df_varnames = []
list_df_values = []
for i in range(210):
    list_df_varnames.append('var'+str(i))
    list_df_values.append(str(i))

test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames)
test_df.show() # Still looks okay
print(test_df.collect()) # Still looks okay

test_df.cache() # When everything goes awry
test_df.show() # All values have been ~nulled
print(test_df.collect()) # Still looks okay

# Serialize and read back from parquet now
test_df.write.parquet(path_fail_parquet)
loaded_df = sqlContext.read.parquet(path_fail_parquet)

loaded_df.show() # All values have been ~nulled
print(loaded_df.collect()) # All values have been ~nulled
{code}

As shown in the example above, the underlying RDD seems to survive the caching, 
but as soon as we serialize to parquet the data corruption becomes complete.

This is occurring on Windows 10 with Python 3.5.x.  We're running a Spark 
Standalone cluster.  Everything works fine with <200 columns/fields.  We have 
Kryo serialization turned on at the moment, but the same error manifested when 
we turned it off.

I will try to get this tested on Spark 2.0.0 in the near future, but I 
generally steer clear of x.0.0 releases as best I can.

I tried to search for another issue related to this and came up with nothing.  
My apologies if I missed it; there doesn't seem to be a good combination of 
keywords to describe this glitch.

Happy to provide more details.

  was:
Caching a DataFrame with >200 columns causes the contents to be ~nulled.  This 
is quite a painful bug for us and caused us to place all sorts of bandaid 
bypasses in our production work recently.

Minimally reproducible example:

{code:python}
from pyspark.sql import SQLContext
import tempfile

sqlContext = SQLContext(sc)
path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet'

list_df_varnames = []
list_df_values = []
for i in range(210):
    list_df_varnames.append('var'+str(i))
    list_df_values.append(str(i))

test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames)
test_df.show() # Still looks okay
print(test_df.collect()) # Still looks okay

test_df.cache() # When everything goes awry
test_df.show() # All values have been ~nulled
print(test_df.collect()) # Still looks okay

# Serialize and read back from parquet now
test_df.write.parquet(path_fail_parquet)
loaded_df = sqlContext.read.parquet(path_fail_parquet)

loaded_df.show() # All values have been ~nulled
print(loaded_df.collect()) # All values have been ~nulled
{code}

As shown in the example above, the underlying RDD seems to survive the caching, 
but as soon as we serialize to parquet the data corruption becomes complete.

This is occurring on Windows 10 with Python 3.5.x.  We're running a Spark 
Standalone cluster.  Everything works fine with <200 columns/fields.  We have 
Kryo serialization turned on at the moment, but the same error manifested when 
we turned it off.

I will try to get this tested on Spark 2.0.0 in the near future, but I 
generally steer clear of x.0.0 releases as best I can.

I tried to search for another issue related to this and came up with nothing.  
My apologies if I missed it; there doesn't seem to be a good combination of 
keywords to describe this glitch.

Happy to provide more details.


> Caching a DataFrame with >200 columns ~nulls the contents
> -
>
> Key: SPARK-17218
> URL: https://issues.apache.org/jira/browse/SPARK-17218
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.6.2
> Environment: Microsoft Windows 10
> Python v3.5.x
> Standalone Spark Cluster
>Reporter: Shea Parkes
>
> Caching a DataFrame with >200 columns causes the contents to be ~nulled.  
> This is quite a painful bug for us and caused us to place all sorts of 
> bandaid bypasses in our production work recently.
> Minimally reproducible example:
> {code}
> from pyspark.sql import SQLContext
> import tempfile
> sqlContext = SQLContext(sc)
> path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet'
> list_df_varnames = []
> list_df_values = []
> for i in range(210):
>     list_df_varnames.append('var'+str(i))
>     list_df_values.append(str(i))
> test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames)
> test_df.show() # St

[jira] [Created] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents

2016-08-24 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-17218:
---

 Summary: Caching a DataFrame with >200 columns ~nulls the contents
 Key: SPARK-17218
 URL: https://issues.apache.org/jira/browse/SPARK-17218
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 1.6.2
 Environment: Microsoft Windows 10
Python v3.5.x
Reporter: Shea Parkes


Caching a DataFrame with >200 columns causes the contents to be ~nulled.  This 
is quite a painful bug for us and caused us to place all sorts of bandaid 
bypasses in our production work recently.

Minimally reproducible example:

{code:python}
from pyspark.sql import SQLContext
import tempfile

sqlContext = SQLContext(sc)
path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet'

list_df_varnames = []
list_df_values = []
for i in range(210):
    list_df_varnames.append('var'+str(i))
    list_df_values.append(str(i))

test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames)
test_df.show() # Still looks okay
print(test_df.collect()) # Still looks okay

test_df.cache() # When everything goes awry
test_df.show() # All values have been ~nulled
print(test_df.collect()) # Still looks okay

# Serialize and read back from parquet now
test_df.write.parquet(path_fail_parquet)
loaded_df = sqlContext.read.parquet(path_fail_parquet)

loaded_df.show() # All values have been ~nulled
print(loaded_df.collect()) # All values have been ~nulled
{code}

As shown in the example above, the underlying RDD seems to survive the caching, 
but as soon as we serialize to parquet the data corruption becomes complete.

This is occurring on Windows 10 with Python 3.5.x.  We're running a Spark 
Standalone cluster.  Everything works fine with <200 columns/fields.  We have 
Kryo serialization turned on at the moment, but the same error manifested when 
we turned it off.

I will try to get this tested on Spark 2.0.0 in the near future, but I 
generally steer clear of x.0.0 releases as best I can.

I tried to search for another issue related to this and came up with nothing.  
My apologies if I missed it; there doesn't seem to be a good combination of 
keywords to describe this glitch.

Happy to provide more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17218) Caching a DataFrame with >200 columns ~nulls the contents

2016-08-24 Thread Shea Parkes (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shea Parkes updated SPARK-17218:

Environment: 
Microsoft Windows 10
Python v3.5.x
Standalone Spark Cluster

  was:
Microsoft Windows 10
Python v3.5.x


> Caching a DataFrame with >200 columns ~nulls the contents
> -
>
> Key: SPARK-17218
> URL: https://issues.apache.org/jira/browse/SPARK-17218
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.6.2
> Environment: Microsoft Windows 10
> Python v3.5.x
> Standalone Spark Cluster
>Reporter: Shea Parkes
>
> Caching a DataFrame with >200 columns causes the contents to be ~nulled.  
> This is quite a painful bug for us and caused us to place all sorts of 
> bandaid bypasses in our production work recently.
> Minimally reproducible example:
> {code:python}
> from pyspark.sql import SQLContext
> import tempfile
> sqlContext = SQLContext(sc)
> path_fail_parquet = tempfile.mkdtemp() + '/fail_parquet.parquet'
> list_df_varnames = []
> list_df_values = []
> for i in range(210):
>     list_df_varnames.append('var'+str(i))
>     list_df_values.append(str(i))
> test_df = sqlContext.createDataFrame([list_df_values], list_df_varnames)
> test_df.show() # Still looks okay
> print(test_df.collect()) # Still looks okay
> test_df.cache() # When everything goes awry
> test_df.show() # All values have been ~nulled
> print(test_df.collect()) # Still looks okay
> # Serialize and read back from parquet now
> test_df.write.parquet(path_fail_parquet)
> loaded_df = sqlContext.read.parquet(path_fail_parquet)
> loaded_df.show() # All values have been ~nulled
> print(loaded_df.collect()) # All values have been ~nulled
> {code}
> As shown in the example above, the underlying RDD seems to survive the 
> caching, but as soon as we serialize to parquet the data corruption becomes 
> complete.
> This is occurring on Windows 10 with Python 3.5.x.  We're running a Spark 
> Standalone cluster.  Everything works fine with <200 columns/fields.  We have 
> Kryo serialization turned on at the moment, but the same error manifested 
> when we turned it off.
> I will try to get this tested on Spark 2.0.0 in the near future, but I 
> generally steer clear of x.0.0 releases as best I can.
> I tried to search for another issue related to this and came up with nothing. 
>  My apologies if I missed it; there doesn't seem to be a good combination of 
> keywords to describe this glitch.
> Happy to provide more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2016-07-26 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394508#comment-15394508
 ] 

Shea Parkes commented on SPARK-12261:
-

I still can't get this bug to reproduce reliably locally, but I can confirm 
that my proposed bandaid of exhausting the iterator at the end of 
{{takeUpToNumLeft()}} made the error go away entirely in manual testing.  Would 
the Spark maintainers be willing to accept such a band-aid into the main 
codebase, or is this something I'd just need to maintain in our own Spark 
distributions? (I already maintain a couple other modifications related to 
Windows mess.)

A more root cause fix would require more Scala knowledge than I have.  I'd 
likely propose to define a new constant (e.g. 
{{SpecialLengths.PYTHON_WORKER_EXITING}}) and put logic into Scala that would 
make it immediately stop sending new data over the socket...

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2016-07-25 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391731#comment-15391731
 ] 

Shea Parkes commented on SPARK-12261:
-

Also, I added extensive logging to {{worker.py}} in my environment to figure 
out much of this.  In my experience, {{worker.py}} never crashes; it just calls 
{{sys.exit(-1)}} on itself.

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2016-07-25 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391728#comment-15391728
 ] 

Shea Parkes commented on SPARK-12261:
-

Alright, I've been spending time off and on for a week on this.  I think I 
better understand what's going on, but don't yet really have it nailed down.  
I'm going to try and write down what I understand is going on here to get my 
thoughts in order.  I've been focusing on {{branch-1.6}} since that's what we 
have in production.

When {{RDD.take()}} is called in {{rdd.py}}, it does not push the request down 
to a ~scala implementation.  Instead, it defines a closure/generator 
({{takeUpToNumLeft()}}) and pushes that into a {{RDD.mapPartitions()}} call.  
{{takeUpToNumLeft()}} does **not** exhaust the iterator that it is given; it 
yields only as many items as requested and then exits.

Next is the interplay between {{worker.py}} and {{PythonRDD.scala}}.  These two 
files set up bi-directional streams to each other and communicate some gnarly 
state back and forth.  The important part is what happens in {{worker.py}} when 
the provided generator (i.e. {{takeUpToNumLeft()}}) does not exhaust the stream 
of data being provided by {{PythonRDD.scala}}.  When this happens, 
{{worker.py}} sends a {{SpecialLengths.END_OF_DATA_SECTION}}, followed by any 
accumulators, then sends a **second** {{SpecialLengths.END_OF_DATA_SECTION}}, 
and kills itself.
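
To make that sequence concrete, here is a paraphrased sketch (not the literal 
{{worker.py}} source; {{flush_accumulators}} is a stand-in for the real 
accumulator write):

{code}
import sys
from pyspark.serializers import SpecialLengths, write_int

def shutdown_after_partial_read(outfile, flush_accumulators):
    """Paraphrase of the worker.py shutdown path described above."""
    # First END_OF_DATA_SECTION: no more task output follows.
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    # Accumulator updates go out next.
    flush_accumulators(outfile)
    # Second END_OF_DATA_SECTION, then the worker kills itself, even though
    # the JVM may still be mid-stream on the other socket.
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    outfile.flush()
    sys.exit(-1)
{code}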

I'm much better at Python than Scala, so I'm now trying to understand what 
happens when {{PythonRDD.scala}} receives that second 
{{SpecialLengths.END_OF_DATA_SECTION}}.  In particular, I don't see anywhere 
that {{PythonRDD.scala}} would treat the second 
{{SpecialLengths.END_OF_DATA_SECTION}} any differently.  This means that 
{{PythonRDD.scala}} would go on to try and read the accumulator information 
from the stream again, but {{worker.py}} would have already exited, so it 
doesn't find anything.

As I understand it, though, {{PythonRDD.scala}} actually fails when trying to 
**send** data, which is where my understanding of Scala is a little rough.  If 
the code in {{PythonRDD.scala}} works similarly to a Python generator, then I 
assume it's acting something like a coroutine and isn't stopping when it gets 
the second {{SpecialLengths.END_OF_DATA_SECTION}}.

This still doesn't explain why this works 95% of the time for us (i.e. we only 
get intermittent failures).

Another track to chase is that Windows is treated differently in 
{{PythonWorkerFactory.scala}}, so I'm occasionally trying to figure out whether 
{{worker.py}} being run as a subprocess makes any difference...

So I'm going to spend more time understanding what triggers more data emission 
from {{PythonRDD.scala}} and why it keeps emitting after a second 
{{SpecialLengths.END_OF_DATA_SECTION}}.

A simple band-aid would be to alter {{takeUpToNumLeft()}} in {{rdd.py}} to 
exhaust the iterator provided, but I'm not sure that's a root cause fix.  Was 
it intentional to allow {{RDD.mapPartitions()}} to accept generators that did 
not exhaust their streams?

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write e

[jira] [Commented] (SPARK-12261) pyspark crash for large dataset

2016-07-16 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380846#comment-15380846
 ] 

Shea Parkes commented on SPARK-12261:
-

I believe I'm hitting the same bug.  I'm also running on Windows in Standalone 
Cluster mode.  Unfortunately, the error is non-deterministic (i.e. I've had no 
luck creating a consistently reproducible scenario).

I promise I have plenty of RAM, and I'm not working with data *that* big.

I too can try to capture better logging.  As Chris hinted at, though, which 
logs are you looking for?  I can grab logs from the master, workers, and 
application, but it isn't obvious where to grab logs from the Python workers 
that are spawned.

I'm happy to read documentation (and code) to try to chase down the 
appropriate logs, but I haven't found any helpful directions yet.

> pyspark crash for large dataset
> ---
>
> Key: SPARK-12261
> URL: https://issues.apache.org/jira/browse/SPARK-12261
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.5.2
> Environment: windows
>Reporter: zihao
>
> I tried to import a local text(over 100mb) file via textFile in pyspark, when 
> i ran data.take(), it failed and gave error messages including:
> 15/12/10 17:17:43 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; 
> aborting job
> Traceback (most recent call last):
>   File "E:/spark_python/test3.py", line 9, in 
> lines.take(5)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1299, 
> in take
> res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\context.py", line 
> 916, in runJob
> port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
>   File "C:\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 813, in 
> __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "D:\spark\spark-1.5.2-bin-hadoop2.6\python\pyspark\sql\utils.py", line 
> 36, in deco
> return f(*a, **kw)
>   File "C:\Anaconda2\lib\site-packages\py4j\protocol.py", line 308, in 
> get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
> (TID 0, localhost): java.net.SocketException: Connection reset by peer: 
> socket write error
> Then i ran the same code for a small text file, this time .take() worked fine.
> How can i solve this problem?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType

2016-04-07 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231491#comment-15231491
 ] 

Shea Parkes commented on SPARK-13842:
-

The pull request is available (https://github.com/apache/spark/pull/12251).  I 
did go ahead and make the {{names}} and {{_needSerializeAnyField}} attributes 
lazy while I was at it.  I'll try to ping you guys appropriately over there.

> Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType
> --
>
> Key: SPARK-13842
> URL: https://issues.apache.org/jira/browse/SPARK-13842
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 1.6.1
>Reporter: Shea Parkes
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to 
> {{pyspark.sql.types.StructType}}.  Here are some simplistic suggestions:
> {code}
> def __iter__(self):
> """Iterate the fields upon request."""
> return iter(self.fields)
> def __getitem__(self, key):
> """Return the corresponding StructField"""
> _fields_dict = dict(zip(self.names, self.fields))
> try:
> return _fields_dict[key]
> except KeyError:
> raise KeyError('No field named {}'.format(key))
> {code}
> I realize the latter might be a touch more controversial since there could be 
> name collisions.  Still, I doubt there are that many in practice and it would 
> be quite nice to work with.
> Privately, I have more extensive metadata extraction methods overlaid on this 
> class, but I imagine the rest of what I have done might go too far for the 
> common user.  If this request gains traction though, I'll share those other 
> layers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType

2016-04-07 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231201#comment-15231201
 ] 

Shea Parkes commented on SPARK-13842:
-

I'm willing to give it a first pass.  I need to go dig up which Python testing 
framework you guys are using, but that shouldn't be too hard.

Unless anyone objects, I'd like to move StructType.names and 
StructType._needSerializeAnyField to properties at the same time.  Should be a 
seamless refactor and cut down on the likelihood of future errors.
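Roughly what I mean is something like the following (a toy sketch with stand-in 
names, not the actual {{StructType}} code):

{code}
from collections import namedtuple

Field = namedtuple('Field', ['name', 'dataType'])


class StructTypeSketch(object):
    """Toy stand-in for StructType, just to illustrate the property idea."""

    def __init__(self, fields):
        self.fields = fields

    @property
    def names(self):
        # Derived on access, so it can never drift out of sync with fields
        return [f.name for f in self.fields]


schema = StructTypeSketch([Field('name', 'string'), Field('age', 'int')])
schema.fields.append(Field('score', 'double'))
print(schema.names)  # ['name', 'age', 'score'] -- stays in sync
{code}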

Might even get to it tonight.

Thanks!

> Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType
> --
>
> Key: SPARK-13842
> URL: https://issues.apache.org/jira/browse/SPARK-13842
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 1.6.1
>Reporter: Shea Parkes
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to 
> {{pyspark.sql.types.StructType}}.  Here are some simplistic suggestions:
> {code}
> def __iter__(self):
> """Iterate the fields upon request."""
> return iter(self.fields)
> def __getitem__(self, key):
> """Return the corresponding StructField"""
> _fields_dict = dict(zip(self.names, self.fields))
> try:
> return _fields_dict[key]
> except KeyError:
> raise KeyError('No field named {}'.format(key))
> {code}
> I realize the latter might be a touch more controversial since there could be 
> name collisions.  Still, I doubt there are that many in practice and it would 
> be quite nice to work with.
> Privately, I have more extensive metadata extraction methods overlaid on this 
> class, but I imagine the rest of what I have done might go too far for the 
> common user.  If this request gains traction though, I'll share those other 
> layers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13842) Consider __iter__ and __getitem__ methods for pyspark.sql.types.StructType

2016-03-12 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-13842:
---

 Summary: Consider __iter__ and __getitem__ methods for 
pyspark.sql.types.StructType
 Key: SPARK-13842
 URL: https://issues.apache.org/jira/browse/SPARK-13842
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 1.6.1
Reporter: Shea Parkes
Priority: Minor


It would be nice to consider adding \_\_iter\_\_ and \_\_getitem\_\_ to 
{{pyspark.sql.types.StructType}}.  Here are some simplistic suggestions:

{code}
def __iter__(self):
"""Iterate the fields upon request."""
return iter(self.fields)

def __getitem__(self, key):
"""Return the corresponding StructField"""
_fields_dict = dict(zip(self.names, self.fields))
try:
return _fields_dict[key]
except KeyError:
raise KeyError('No field named {}'.format(key))
{code}

I realize the latter might be a touch more controversial since there could be 
name collisions.  Still, I doubt there are that many in practice and it would 
be quite nice to work with.
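With those two methods in place, usage would look something like the following 
(this assumes the methods above get added; the schema itself is just an 
illustration):

{code}
import pyspark.sql.types as types

schema = types.StructType([
    types.StructField('name', types.StringType(), nullable=True),
    types.StructField('age', types.IntegerType(), nullable=True),
])

# __iter__ would let you loop straight over the fields
for field in schema:
    print(field.name, field.dataType)

# __getitem__ would let you grab a field by name
print(schema['age'].dataType)
{code}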

Privately, I have more extensive metadata extraction methods overlaid on this 
class, but I imagine the rest of what I have done might go too far for the 
common user.  If this request gains traction though, I'll share those other 
layers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure

2015-10-02 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941953#comment-14941953
 ] 

Shea Parkes commented on SPARK-10847:
-

My apologies; I just read your patch and see you made it work even with 
Pythonic Nulls.  You rule, sir; thanks a bunch.

> Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
> 
>
> Key: SPARK-10847
> URL: https://issues.apache.org/jira/browse/SPARK-10847
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 1.5.0
> Environment: Windows 7
> java version "1.8.0_60" (64bit)
> Python 3.4.x
> Standalone cluster mode (not local[n]; a full local cluster)
>Reporter: Shea Parkes
>Priority: Minor
>
> If the optional metadata passed to `pyspark.sql.types.StructField` includes a 
> pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a 
> very cryptic/unhelpful error.
> Here is a minimal reproducible example:
> {code:none}
> # Assumes sc exists
> import pyspark.sql.types as types
> sqlContext = SQLContext(sc)
> literal_metadata = types.StructType([
> types.StructField(
> 'name',
> types.StringType(),
> nullable=True,
> metadata={'comment': 'From accounting system.'}
> ),
> types.StructField(
> 'age',
> types.IntegerType(),
> nullable=True,
> metadata={'comment': None}
> ),
> ])
> literal_rdd = sc.parallelize([
> ['Bob', 34],
> ['Dan', 42],
> ])
> print(literal_rdd.take(2))
> failed_dataframe = sqlContext.createDataFrame(
> literal_rdd,
> literal_metadata,
> )
> {code}
> This produces the following ~stacktrace:
> {noformat}
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "", line 28, in 
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py",
>  line 408, in createDataFrame
> jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py",
>  line 538, in __call__
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py",
>  line 36, in deco
> return f(*a, **kw)
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o757.applySchemaToPythonRDD.
> : java.lang.RuntimeException: Do not support type class scala.Tuple2.
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160)
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127)
>   at 
> org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148)
>   at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96)
>   at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961)
>   at 
> org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970)
>   at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>   at java.lang.reflect.Method.invoke(Unknown Source)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>   at py4j.Gateway.invoke(Gateway.java:259)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:207)
>   at java.lang.Thread.run(Unknown Source)
> {noformat}
> I believe the most important line of the traceback is this one:
>

[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure

2015-10-02 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941952#comment-14941952
 ] 

Shea Parkes commented on SPARK-10847:
-

I appreciate your assistance!  I think your proposal is an improvement, but I 
think it would be better if the failure were triggered upon creation of the 
StructType object - that's where the error actually occurs.

The distance between the definition of the metadata and the import was much 
larger in my project; I think your new error message would still have me 
looking for NULL values in my data (instead of my metadata).  That's likely 
partly due to my unfamiliarity with Scala, but I chased as far down the pyspark 
code as I could and still couldn't figure it out without trial and error.

I realize this might mean traversing an arbitrary dictionary during StructType 
initialization looking for disallowed types, which might be unacceptable.  It 
would still be much more in line with the "Crash Early, Crash Often" philosophy 
if it were possible to bomb at the creation of the metadata.
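Something like the following is what I have in mind, though I realize walking 
the whole metadata dictionary up front may be more checking than is wanted (the 
names and the allowed-type list here are illustrative, not actual pyspark 
internals):

{code}
def _check_metadata_value(value, path='metadata'):
    # Recursively reject values that the JVM side appears unable to handle
    if value is None:
        raise ValueError('%s: None is not supported in StructField metadata' % path)
    if isinstance(value, (bool, int, float, str)):
        return
    if isinstance(value, dict):
        for key, nested in value.items():
            _check_metadata_value(nested, '%s[%r]' % (path, key))
        return
    if isinstance(value, (list, tuple)):
        for index, nested in enumerate(value):
            _check_metadata_value(nested, '%s[%d]' % (path, index))
        return
    raise ValueError('%s: unsupported metadata type %s' % (path, type(value)))


# Would raise at StructField creation time instead of deep inside py4j:
# _check_metadata_value({'comment': None})
{code}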

Thanks again for the assistance!

> Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
> 
>
> Key: SPARK-10847
> URL: https://issues.apache.org/jira/browse/SPARK-10847
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 1.5.0
> Environment: Windows 7
> java version "1.8.0_60" (64bit)
> Python 3.4.x
> Standalone cluster mode (not local[n]; a full local cluster)
>Reporter: Shea Parkes
>Priority: Minor
>
> If the optional metadata passed to `pyspark.sql.types.StructField` includes a 
> pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a 
> very cryptic/unhelpful error.
> Here is a minimal reproducible example:
> {code:none}
> # Assumes sc exists
> import pyspark.sql.types as types
> sqlContext = SQLContext(sc)
> literal_metadata = types.StructType([
> types.StructField(
> 'name',
> types.StringType(),
> nullable=True,
> metadata={'comment': 'From accounting system.'}
> ),
> types.StructField(
> 'age',
> types.IntegerType(),
> nullable=True,
> metadata={'comment': None}
> ),
> ])
> literal_rdd = sc.parallelize([
> ['Bob', 34],
> ['Dan', 42],
> ])
> print(literal_rdd.take(2))
> failed_dataframe = sqlContext.createDataFrame(
> literal_rdd,
> literal_metadata,
> )
> {code}
> This produces the following ~stacktrace:
> {noformat}
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "", line 28, in 
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py",
>  line 408, in createDataFrame
> jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py",
>  line 538, in __call__
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py",
>  line 36, in deco
> return f(*a, **kw)
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o757.applySchemaToPythonRDD.
> : java.lang.RuntimeException: Do not support type class scala.Tuple2.
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160)
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127)
>   at 
> org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148)
>   at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96)
>   at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961)
>   at 
> org.apache.spark.

[jira] [Commented] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure

2015-09-28 Thread Shea Parkes (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933924#comment-14933924
 ] 

Shea Parkes commented on SPARK-10847:
-

This issue prompted me to learn enough Scala to know what a scala.Tuple2 is, 
only to find that the exception still wasn't helpful.

I'm not planning to do any further work on this, so if you were waiting to 
avoid duplicating effort with me, feel free to go ahead and knock it out.  I'm 
not entirely familiar with the contribution guidelines, but I'm sure you can 
work them out.

In case it wasn't clear above, the line that triggers the error is:
{code:none}
metadata={'comment': None}
{code}

Thanks for the interest!

> Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
> 
>
> Key: SPARK-10847
> URL: https://issues.apache.org/jira/browse/SPARK-10847
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 1.5.0
> Environment: Windows 7
> java version "1.8.0_60" (64bit)
> Python 3.4.x
> Standalone cluster mode (not local[n]; a full local cluster)
>Reporter: Shea Parkes
>Priority: Minor
>
> If the optional metadata passed to `pyspark.sql.types.StructField` includes a 
> pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a 
> very cryptic/unhelpful error.
> Here is a minimal reproducible example:
> {code:none}
> # Assumes sc exists
> import pyspark.sql.types as types
> sqlContext = SQLContext(sc)
> literal_metadata = types.StructType([
> types.StructField(
> 'name',
> types.StringType(),
> nullable=True,
> metadata={'comment': 'From accounting system.'}
> ),
> types.StructField(
> 'age',
> types.IntegerType(),
> nullable=True,
> metadata={'comment': None}
> ),
> ])
> literal_rdd = sc.parallelize([
> ['Bob', 34],
> ['Dan', 42],
> ])
> print(literal_rdd.take(2))
> failed_dataframe = sqlContext.createDataFrame(
> literal_rdd,
> literal_metadata,
> )
> {code}
> This produces the following ~stacktrace:
> {noformat}
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "", line 28, in 
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py",
>  line 408, in createDataFrame
> jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py",
>  line 538, in __call__
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py",
>  line 36, in deco
> return f(*a, **kw)
>   File 
> "S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o757.applySchemaToPythonRDD.
> : java.lang.RuntimeException: Do not support type class scala.Tuple2.
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160)
>   at 
> org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127)
>   at 
> org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148)
>   at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96)
>   at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961)
>   at 
> org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970)
>   at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>   at java.lang.reflect.Method.invoke(Unknown Source)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.Refle

[jira] [Created] (SPARK-10847) Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure

2015-09-27 Thread Shea Parkes (JIRA)
Shea Parkes created SPARK-10847:
---

 Summary: Pyspark - DataFrame - Optional Metadata with `None` 
triggers cryptic failure
 Key: SPARK-10847
 URL: https://issues.apache.org/jira/browse/SPARK-10847
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 1.5.0
 Environment: Windows 7
java version "1.8.0_60" (64bit)
Python 3.4.x

Standalone cluster mode (not local[n]; a full local cluster)
Reporter: Shea Parkes
Priority: Minor


If the optional metadata passed to `pyspark.sql.types.StructField` includes a 
pythonic `None`, the `pyspark.SparkContext.createDataFrame` will fail with a 
very cryptic/unhelpful error.

Here is a minimal reproducible example:
{code:none}
# Assumes sc exists
import pyspark.sql.types as types
sqlContext = SQLContext(sc)


literal_metadata = types.StructType([
types.StructField(
'name',
types.StringType(),
nullable=True,
metadata={'comment': 'From accounting system.'}
),
types.StructField(
'age',
types.IntegerType(),
nullable=True,
metadata={'comment': None}
),
])

literal_rdd = sc.parallelize([
['Bob', 34],
['Dan', 42],
])
print(literal_rdd.take(2))

failed_dataframe = sqlContext.createDataFrame(
literal_rdd,
literal_metadata,
)
{code}

This produces the following ~stacktrace:
{noformat}
Traceback (most recent call last):
  File "", line 1, in 
  File "", line 28, in 
  File 
"S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\context.py",
 line 408, in createDataFrame
jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
  File 
"S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py",
 line 538, in __call__
  File 
"S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py",
 line 36, in deco
return f(*a, **kw)
  File 
"S:\ZQL\Software\Hotware\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py",
 line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o757.applySchemaToPythonRDD.
: java.lang.RuntimeException: Do not support type class scala.Tuple2.
at 
org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:160)
at 
org.apache.spark.sql.types.Metadata$$anonfun$fromJObject$1.apply(Metadata.scala:127)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.types.Metadata$.fromJObject(Metadata.scala:127)
at 
org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:173)
at 
org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
at 
org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:148)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:148)
at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:96)
at org.apache.spark.sql.SQLContext.parseDataType(SQLContext.scala:961)
at 
org.apache.spark.sql.SQLContext.applySchemaToPythonRDD(SQLContext.scala:970)
at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)
{noformat}

I believe the most important line of the traceback is this one:
{noformat}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o757.applySchemaToPythonRDD.
: java.lang.RuntimeException: Do not support type class scala.Tuple2.
{noformat}

But it wasn't enough for me to figure out the problem; I had to steadily 
simplify my program until I could identify what caused the problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@sp