[jira] [Commented] (SPARK-18274) Memory leak in PySpark StringIndexer

2016-11-10 Thread Joseph K. Bradley (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656431#comment-15656431 ]

Joseph K. Bradley commented on SPARK-18274:
---

Adding one more TODO for this task:
The current fix for this is to put a {{__del__}} method in JavaWrapper that 
releases the Java object.  But that exposes another bug: {{copy}} should be 
implemented in JavaParams, not JavaModel.  Otherwise JavaEvaluator (which 
inherits from JavaParams) can be copied to produce multiple Python instances 
(which should be treated independently) that all link to the same Java 
object; changing one instance will then change the others.
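
For reference, a minimal sketch of the kind of {{__del__}} hook under 
discussion, modeled on the existing destructor in 
pyspark.mllib.common.JavaModelWrapper; the class body below is illustrative, 
not the committed fix:

{code}
from pyspark import SparkContext


class JavaWrapper(object):
    """Illustrative wrapper holding a reference to a JVM-side object."""

    def __init__(self, java_obj=None):
        self._java_obj = java_obj

    def __del__(self):
        # Ask py4j to drop its reference to the JVM object when the Python
        # wrapper is garbage collected, so the Java GC can reclaim it.
        sc = SparkContext._active_spark_context
        if sc is not None and self._java_obj is not None:
            sc._gateway.detach(self._java_obj)
{code}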

> Memory leak in PySpark StringIndexer
> 
>
> Key: SPARK-18274
> URL: https://issues.apache.org/jira/browse/SPARK-18274
> Project: Spark
>  Issue Type: Bug
>  Components: ML, PySpark
>Affects Versions: 1.5.2, 1.6.3, 2.0.1, 2.0.2, 2.1.0
>Reporter: Jonas Amrich
>Priority: Critical
>
> StringIndexerModel won't get collected by the GC in Java even when deleted in 
> Python. It can be reproduced by this code, which fails after a couple of 
> iterations (around 7 if you set driver memory to 600MB): 
> {code}
> import random, string
> from pyspark.ml.feature import StringIndexer
>
> l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), )
>      for _ in range(int(7e5))]  # 700k random strings of 10 characters
> df = spark.createDataFrame(l, ['string'])
>
> for i in range(50):
>     indexer = StringIndexer(inputCol='string', outputCol='index')
>     indexer.fit(df)
> {code}
> An explicit call to the Python GC fixes the issue - the following code runs fine:
> {code}
> import gc
>
> for i in range(50):
>     indexer = StringIndexer(inputCol='string', outputCol='index')
>     indexer.fit(df)
>     gc.collect()
> {code}
> The issue is similar to SPARK-6194 and can probably be fixed by calling a JVM 
> detach in the model's destructor. This is implemented in 
> pyspark.mllib.common.JavaModelWrapper but missing in 
> pyspark.ml.wrapper.JavaWrapper. Other models in the ml package may also be 
> affected by this memory leak. 






[jira] [Assigned] (SPARK-18411) Add Argument Types and Test Cases for String Functions

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18411:


Assignee: Apache Spark  (was: Xiao Li)

> Add Argument Types and Test Cases for String Functions
> --
>
> Key: SPARK-18411
> URL: https://issues.apache.org/jira/browse/SPARK-18411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>Assignee: Apache Spark
>
> Add argument types and test cases into the extended descriptions of string 
> functions. 






[jira] [Assigned] (SPARK-18411) Add Argument Types and Test Cases for String Functions

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18411:


Assignee: Xiao Li  (was: Apache Spark)

> Add Argument Types and Test Cases for String Functions
> --
>
> Key: SPARK-18411
> URL: https://issues.apache.org/jira/browse/SPARK-18411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>
> Add argument types and test cases into the extended descriptions of string 
> functions. 






[jira] [Commented] (SPARK-18411) Add Argument Types and Test Cases for String Functions

2016-11-10 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656407#comment-15656407 ]

Apache Spark commented on SPARK-18411:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/15850

> Add Argument Types and Test Cases for String Functions
> --
>
> Key: SPARK-18411
> URL: https://issues.apache.org/jira/browse/SPARK-18411
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>Assignee: Xiao Li
>
> Add argument types and test cases into the extended descriptions of string 
> functions. 






[jira] [Created] (SPARK-18411) Add Argument Types and Test Cases for String Functions

2016-11-10 Thread Xiao Li (JIRA)
Xiao Li created SPARK-18411:
---

 Summary: Add Argument Types and Test Cases for String Functions
 Key: SPARK-18411
 URL: https://issues.apache.org/jira/browse/SPARK-18411
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Xiao Li
Assignee: Xiao Li


Add argument types and test cases into the extended descriptions of string 
functions. 






[jira] [Assigned] (SPARK-18410) Add structured kafka example

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18410:


Assignee: Apache Spark

> Add structured kafka example
> 
>
> Key: SPARK-18410
> URL: https://issues.apache.org/jira/browse/SPARK-18410
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Genmao Yu
>Assignee: Apache Spark
>Priority: Minor
>







[jira] [Assigned] (SPARK-18410) Add structured kafka example

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18410:


Assignee: (was: Apache Spark)

> Add structured kafka example
> 
>
> Key: SPARK-18410
> URL: https://issues.apache.org/jira/browse/SPARK-18410
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Genmao Yu
>Priority: Minor
>







[jira] [Commented] (SPARK-18410) Add structured kafka example

2016-11-10 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656387#comment-15656387 ]

Apache Spark commented on SPARK-18410:
--

User 'uncleGen' has created a pull request for this issue:
https://github.com/apache/spark/pull/15849

> Add structured kafka example
> 
>
> Key: SPARK-18410
> URL: https://issues.apache.org/jira/browse/SPARK-18410
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Genmao Yu
>Priority: Minor
>







[jira] [Updated] (SPARK-18274) Memory leak in PySpark StringIndexer

2016-11-10 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-18274:
--
Affects Version/s: 2.1.0
                   2.0.2
                   1.6.3
                   1.5.2
 Target Version/s: 2.0.3, 2.1.0
         Priority: Critical  (was: Major)
      Component/s: ML

> Memory leak in PySpark StringIndexer
> 
>
> Key: SPARK-18274
> URL: https://issues.apache.org/jira/browse/SPARK-18274
> Project: Spark
>  Issue Type: Bug
>  Components: ML, PySpark
>Affects Versions: 1.5.2, 1.6.3, 2.0.1, 2.0.2, 2.1.0
>Reporter: Jonas Amrich
>Priority: Critical
>
> StringIndexerModel won't get collected by the GC in Java even when deleted in 
> Python. It can be reproduced by this code, which fails after a couple of 
> iterations (around 7 if you set driver memory to 600MB): 
> {code}
> import random, string
> from pyspark.ml.feature import StringIndexer
>
> l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), )
>      for _ in range(int(7e5))]  # 700k random strings of 10 characters
> df = spark.createDataFrame(l, ['string'])
>
> for i in range(50):
>     indexer = StringIndexer(inputCol='string', outputCol='index')
>     indexer.fit(df)
> {code}
> An explicit call to the Python GC fixes the issue - the following code runs fine:
> {code}
> import gc
>
> for i in range(50):
>     indexer = StringIndexer(inputCol='string', outputCol='index')
>     indexer.fit(df)
>     gc.collect()
> {code}
> The issue is similar to SPARK-6194 and can probably be fixed by calling a JVM 
> detach in the model's destructor. This is implemented in 
> pyspark.mllib.common.JavaModelWrapper but missing in 
> pyspark.ml.wrapper.JavaWrapper. Other models in the ml package may also be 
> affected by this memory leak. 






[jira] [Created] (SPARK-18410) Add structured kafka example

2016-11-10 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-18410:
-

 Summary: Add structured kafka example
 Key: SPARK-18410
 URL: https://issues.apache.org/jira/browse/SPARK-18410
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.0.2
Reporter: Genmao Yu
Priority: Minor









[jira] [Commented] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata" causing DataFrame reads from the directory to fail

2016-11-10 Thread Lantao Jin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656317#comment-15656317 ]

Lantao Jin commented on SPARK-18227:


[~marmbrus], oh, yeah. That's the root cause, thanks.

> Parquet file stream sink creates a hidden directory "_spark_metadata" causing 
> DataFrame reads from the directory to fail
> ---
>
> Key: SPARK-18227
> URL: https://issues.apache.org/jira/browse/SPARK-18227
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Lantao Jin
>
> When we set an output directory as a streaming sink with parquet format in 
> structured streaming, all output parquet files are written to this directory 
> as the streaming job runs. However, it also creates a hidden directory called 
> "_spark_metadata" in the output directory. If we then load the parquet files 
> from the output directory with "load", it throws a RuntimeException and the 
> task fails.
> {code:java}
> val stream = modifiedData.writeStream.format("parquet")
>   .option("checkpointLocation", "/path/ck/")
>   .start("/path/out/")
>
> val df1 = spark.read.format("parquet").load("/path/out/*")
> {code}
> {panel}
> 16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
> at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
> at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
> at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> {panel}
> That's because the ParquetFileReader tries to read the metadata file as 
> Parquet. 
> I thought the cleanest way to fix it would be to move the metadata directory 
> to another path, but judging from DataSource.scala, there is no path 
> information available other than the output directory to store it in. So 
> maybe skipping hidden files and paths would be a better way. But from the 
> stack trace above, it failed in initialize() in 
> SpecificParquetRecordReaderBase, which means the metadata files in the hidden 
> directory had already been traversed by the upper invocation (FileScanRDD), 
> and there no format information is available for deciding to skip a hidden 
> directory (or check permissions).
> So, what is the best way to fix it?
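
A reader-side workaround, pending a fix in the sink, is to glob only the data 
files so the hidden directory is never handed to the Parquet footer reader. A 
sketch against the paths above (untested; not a committed fix):

{code}
# Match only the part files; the hidden _spark_metadata directory is
# skipped by the glob, so the footer reader never touches it.
df1 = spark.read.format('parquet').load('/path/out/part-*')
{code}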






[jira] [Closed] (SPARK-18227) Parquet file stream sink creates a hidden directory "_spark_metadata" causing DataFrame reads from the directory to fail

2016-11-10 Thread Lantao Jin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lantao Jin closed SPARK-18227.
--

> Parquet file stream sink creates a hidden directory "_spark_metadata" causing 
> DataFrame reads from the directory to fail
> ---
>
> Key: SPARK-18227
> URL: https://issues.apache.org/jira/browse/SPARK-18227
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.1
>Reporter: Lantao Jin
>
> When we set an output directory as a streaming sink with parquet format in 
> structured streaming, all output parquet files are written to this directory 
> as the streaming job runs. However, it also creates a hidden directory called 
> "_spark_metadata" in the output directory. If we then load the parquet files 
> from the output directory with "load", it throws a RuntimeException and the 
> task fails.
> {code:java}
> val stream = modifiedData.writeStream.format("parquet")
>   .option("checkpointLocation", "/path/ck/")
>   .start("/path/out/")
>
> val df1 = spark.read.format("parquet").load("/path/out/*")
> {code}
> {panel}
> 16/11/02 03:49:40 WARN TaskSetManager: Lost task 1.0 in stage 110.0 (TID 3131, cupid044.stratus.phx.ebay.com): java.lang.RuntimeException: hdfs:///path/out/_spark_metadata/0 is not a Parquet file (too small)
> at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412)
> at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
> at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:107)
> at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> {panel}
> That's because the ParquetFileReader tries to read the metadata file as 
> Parquet. 
> I thought the cleanest way to fix it would be to move the metadata directory 
> to another path, but judging from DataSource.scala, there is no path 
> information available other than the output directory to store it in. So 
> maybe skipping hidden files and paths would be a better way. But from the 
> stack trace above, it failed in initialize() in 
> SpecificParquetRecordReaderBase, which means the metadata files in the hidden 
> directory had already been traversed by the upper invocation (FileScanRDD), 
> and there no format information is available for deciding to skip a hidden 
> directory (or check permissions).
> So, what is the best way to fix it?






[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656254#comment-15656254 ]

Nicholas Chammas commented on SPARK-18367:
--

Ah, sounds like the correct explanation to me. So in my repro script, we should 
expect the shuffle to generate up to 8 * 200 = 1,600 files. If we add that to 
the ~500 files which Spark seems to open before the join, this puts us over 
2,000 open files.

Thank you for this valuable tip. I will experiment with 
{{spark.shuffle.sort.bypassMergeThreshold}} tomorrow and confirm here that it 
solves my problem.
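
A minimal sketch of setting that threshold when building the session, using 
the suggested value of 10 (illustrative; untested here):

{code}
import pyspark

# Lowering the threshold below the shuffle partition count steers Spark
# away from the bypass-merge writer, which opens one file per reduce
# partition per map task.
spark = (
    pyspark.sql.SparkSession.builder
    .config('spark.shuffle.sort.bypassMergeThreshold', 10)
    .getOrCreate()
)
{code}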

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>Assignee: Reynold Xin
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Reopened] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin reopened SPARK-18367:
-

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Assigned] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin reassigned SPARK-18367:
---

Assignee: Reynold Xin

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>Assignee: Reynold Xin
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Closed] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin closed SPARK-18367.
---
Resolution: Not A Problem

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>Assignee: Reynold Xin
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Closed] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin closed SPARK-18367.
---
Resolution: Won't Fix

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656234#comment-15656234 ]

Reynold Xin commented on SPARK-18367:
-

Actually, try setting "spark.shuffle.sort.bypassMergeThreshold" to a smaller 
number (say 10). The max number of files Spark will create, without you 
explicitly changing the number of partitions, is bypassMergeThreshold * N, 
where N is the number of partitions pre-shuffle. The default 
bypassMergeThreshold is 200.




> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Comment Edited] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656234#comment-15656234 ]

Reynold Xin edited comment on SPARK-18367 at 11/11/16 5:46 AM:
---

Actually, try setting "spark.shuffle.sort.bypassMergeThreshold" to a smaller 
number (say 10). The max number of files Spark will create, without you 
explicitly changing the number of partitions, is bypassMergeThreshold * N, 
where N is the number of partitions pre-shuffle. The default 
bypassMergeThreshold is 200.

The reason you didn't see it with a small number of distinct keys is that 
there is a shortcut that avoids creating a file when no record matches that 
hash bucket.

This is working as expected.



was (Author: rxin):
Actually, try setting "spark.shuffle.sort.bypassMergeThreshold" to a smaller 
number (say 10). The max number of files Spark will create, without you 
explicitly changing the number of partitions, is bypassMergeThreshold * N, 
where N is the number of partitions pre-shuffle. The default 
bypassMergeThreshold is 200.




> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656230#comment-15656230 ]

Nicholas Chammas commented on SPARK-18367:
--

How are you monitoring the number of open files? I can share the script I am 
using if you're interested.
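
For context, a hypothetical stand-in for that kind of script, polling lsof for 
the process's descriptor count (the pid below is a placeholder, not from the 
original report):

{code}
import subprocess
import time


def count_open_files(pid):
    # Count lsof output lines for the process, minus the header row.
    out = subprocess.check_output(['lsof', '-p', str(pid)],
                                  universal_newlines=True)
    return max(len(out.splitlines()) - 1, 0)


driver_pid = 12345  # placeholder: pid of the Spark driver JVM
while True:
    print(count_open_files(driver_pid))
    time.sleep(1)
{code}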

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656228#comment-15656228 ]

Nicholas Chammas commented on SPARK-18367:
--

Tomorrow I'll try running this on a Linux VM. Maybe this is specific to macOS.

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656224#comment-15656224 ]

Reynold Xin commented on SPARK-18367:
-

I just tried your repro and didn't see a large number of open files.

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a#0L, 200)
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(a#3L, 200)
>          +- *Filter isnotnull(a#3L)
>             +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>    :- *Sort [a#0L ASC NULLS FIRST], false, 0
>    :  +- Coalesce 1
>    :     +- *Filter isnotnull(a#0L)
>    :        +- Scan ExistingRDD[a#0L]
>    +- *Sort [a#4L ASC NULLS FIRST], false, 0
>       +- Coalesce 1
>          +- *Filter isnotnull(a#4L)
>             +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?






[jira] [Comment Edited] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656199#comment-15656199 ]

Nicholas Chammas edited comment on SPARK-18367 at 11/11/16 5:30 AM:


Looks to be 8 partitions before the exchange.

Here's a tweaked repro script to show the number of partitions before and after 
the join:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(500000)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df.show(1)
{code}

Output:

{code}
== Physical Plan ==
Scan ExistingRDD[a#0L]
partitions: 8
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#3L, 200)
         +- *Filter isnotnull(a#3L)
            +- Scan ExistingRDD[a#3L]
partitions: 200
16/11/11 00:27:32 WARN TaskSetManager: Stage 0 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
16/11/11 00:27:33 WARN TaskSetManager: Stage 1 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
+---+   
|  a|
+---+
| 26|
+---+
only showing top 1 row
{code}



was (Author: nchammas):
Tweaked repro script to show partitions before and after the join:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(500000)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
    df.explain()
    print('partitions:', df.rdd.getNumPartitions())

    df.show(1)
{code}

Output:

{code}
== Physical Plan ==
Scan ExistingRDD[a#0L]
partitions: 8
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#3L, 200)
         +- *Filter isnotnull(a#3L)
            +- Scan ExistingRDD[a#3L]
partitions: 200
16/11/11 00:27:32 WARN TaskSetManager: Stage 0 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
16/11/11 00:27:33 WARN TaskSetManager: Stage 1 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
+---+   
|  a|
+---+
| 26|
+---+
only showing top 1 row
{code}


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
>
> if __name__ == '__main__':
>     spark = pyspark.sql.SparkSession.builder.getOrCreate()
>     df = spark.createDataFrame([
>         Row(a=n)
>         for n in range(500000)
>     ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
>     df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
>     print('partitions:', df.rdd.getNumPartitions())
>     df.explain()
>     df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here 

[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656199#comment-15656199 ]

Nicholas Chammas commented on SPARK-18367:
--

Tweaked repro script to show partitions before and after the join:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
Row(a=n)
for n in range(500000)
])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
df.explain()
print('partitions:', df.rdd.getNumPartitions())

df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
df.explain()
print('partitions:', df.rdd.getNumPartitions())

df.show(1)
{code}

Output:

{code}
== Physical Plan ==
Scan ExistingRDD[a#0L]
partitions: 8
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
partitions: 200
16/11/11 00:27:32 WARN TaskSetManager: Stage 0 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
16/11/11 00:27:33 WARN TaskSetManager: Stage 1 contains a task of very large 
size (510 KB). The maximum recommended task size is 100 KB.
+---+   
|  a|
+---+
| 26|
+---+
only showing top 1 row
{code}
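
For anyone following along: the 200 in {{Exchange hashpartitioning(a#0L, 200)}} 
is simply the default of {{spark.sql.shuffle.partitions}}. A quick sketch to 
confirm that (uses the public Spark 2.x conf API; assumes {{spark}} and the 
pre-join {{df}} from the repro above):

{code}
# '200' is the built-in default for shuffle partitions
print(spark.conf.get('spark.sql.shuffle.partitions'))

# lowering it changes the post-join partition count accordingly
spark.conf.set('spark.sql.shuffle.partitions', '8')
joined = df.join(df, on='a')
print('partitions:', joined.rdd.getNumPartitions())  # now 8, not 200
{code}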


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656187#comment-15656187
 ] 

Reynold Xin commented on SPARK-18367:
-

What's the number of partitions before the exchange? Each partition should 
create only one file. What you are observing doesn't make sense here.


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656066#comment-15656066
 ] 

Nicholas Chammas commented on SPARK-18367:
--

I noticed that if I generate a DataFrame with fewer distinct keys but a similar 
or even greater number of rows, Spark holds fewer files open during the join.

For example:

{code}
df = spark.createDataFrame([
Row(a=n)
for n in range(1000)
])
# gives us ~1M rows
for _ in range(5):
df = df.union(df)
{code}

Plugging that into the repro script from the ticket description, I see that 
Spark opens <1K files, compared to >2K files when there are 500K distinct keys.

I suppose this may all just be a natural consequence of how joins are 
implemented in Spark. Perhaps I need to start thinking about how to manage the 
number of distinct keys when I join DataFrames? That would suck, but if this is 
a design limitation then what can you do...
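
One knob that might be worth checking here (an untested hypothesis on my part, 
not a confirmed diagnosis): below {{spark.shuffle.sort.bypassMergeThreshold}} 
(default 200), the shuffle writer opens one file per reduce partition per map 
task, so 200 shuffle partitions can multiply into thousands of concurrent 
handles. A sketch to test it:

{code}
import pyspark

# force the non-bypass sort-based shuffle writer and re-run the repro;
# if the open-file count drops, the bypass writer is the multiplier
spark = (pyspark.sql.SparkSession.builder
         .config('spark.shuffle.sort.bypassMergeThreshold', '1')
         .getOrCreate())
{code}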

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18353) spark.rpc.askTimeout default value is not 120s

2016-11-10 Thread Jason Pan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656029#comment-15656029
 ] 

Jason Pan commented on SPARK-18353:
---

Yes, the pull request at least makes the value settable explicitly.

Thanks.



> spark.rpc.askTimeout default value is not 120s
> --
>
> Key: SPARK-18353
> URL: https://issues.apache.org/jira/browse/SPARK-18353
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.0.1
> Environment: Linux zzz 3.10.0-327.el7.x86_64 #1 SMP Thu Oct 29 
> 17:29:29 EDT 2015 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Jason Pan
>Priority: Critical
>
> In http://spark.apache.org/docs/latest/configuration.html:
> spark.rpc.askTimeout | 120s | Duration for an RPC ask operation to wait 
> before timing out
> The default value is 120s, as documented.
> However, when I run "spark-submit" in standalone cluster mode,
> the cmd is:
> Launch Command: "/opt/jdk1.8.0_102/bin/java" "-cp" 
> "/opt/spark-2.0.1-bin-hadoop2.7/conf/:/opt/spark-2.0.1-bin-hadoop2.7/jars/*" 
> "-Xmx1024M" "-Dspark.eventLog.enabled=true" 
> "-Dspark.master=spark://9.111.159.127:7101" "-Dspark.driver.supervise=false" 
> "-Dspark.app.name=org.apache.spark.examples.SparkPi" 
> "-Dspark.submit.deployMode=cluster" 
> "-Dspark.jars=file:/opt/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar"
>  "-Dspark.history.ui.port=18087" "-Dspark.rpc.askTimeout=10" 
> "-Dspark.history.fs.logDirectory=file:/opt/tmp/spark-event" 
> "-Dspark.eventLog.dir=file:///opt/tmp/spark-event" 
> "org.apache.spark.deploy.worker.DriverWrapper" 
> "spark://Worker@9.111.159.127:7103" 
> "/opt/spark-2.0.1-bin-hadoop2.7/work/driver-20161109031939-0002/spark-examples-1.6.1-hadoop2.6.0.jar"
>  "org.apache.spark.examples.SparkPi" "1000"
> -Dspark.rpc.askTimeout=10
> The value is 10, which does not match the documentation.
> Note: when I submit via the REST URL, this issue does not occur.
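
For reference, a minimal sketch (mine, not from the ticket) of pinning the 
timeout explicitly from the application side in Spark 2.x, so the effective 
value does not depend on whatever {{-D}} flags the launcher injects:

{code}
import pyspark

# set spark.rpc.askTimeout when building the session; bare numbers are
# interpreted as seconds, so use an explicit unit
spark = (pyspark.sql.SparkSession.builder
         .config('spark.rpc.askTimeout', '120s')
         .getOrCreate())
print(spark.sparkContext.getConf().get('spark.rpc.askTimeout'))
{code}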



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656019#comment-15656019
 ] 

Nicholas Chammas edited comment on SPARK-18367 at 11/11/16 3:44 AM:


Here is the output of {{lsof}} on all the pids owned by Spark as it holds 2K+ 
files open at once. (attached to ticket)

It appears that the explosion is coming mostly from files like this:

{code}
/private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-4cab1943-7482-4dcd-8374-5209f21e1ddc/1c/temp_shuffle_6f0c4324-d2be-47ad-9f18-2cba2ed9cdd1
{code}


was (Author: nchammas):
Here is the output of {{lsof}} on all the pids owned by Spark as it holds 2K+ 
files open at once.

It appears that the explosion is coming mostly from files like this:

{code}
/private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-4cab1943-7482-4dcd-8374-5209f21e1ddc/1c/temp_shuffle_6f0c4324-d2be-47ad-9f18-2cba2ed9cdd1
{code}

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Attachment: spark-lsof.txt

Here is the output of {{lsof}} on all the pids owned by Spark as it holds 2K+ 
files open at once.

It appears that the explosion is coming mostly from files like this:

{code}
/private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-4cab1943-7482-4dcd-8374-5209f21e1ddc/1c/temp_shuffle_6f0c4324-d2be-47ad-9f18-2cba2ed9cdd1
{code}
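
In case it helps others reproduce the tally, a rough sketch of how the count 
can be taken while the repro runs (assumes macOS/Linux with {{pgrep}} and 
{{lsof}} on the PATH; not part of Spark):

{code}
import subprocess

# count open temp_shuffle handles per local JVM
pids = subprocess.check_output(['pgrep', '-f', 'java']).split()
for pid in pids:
    try:
        out = subprocess.check_output(['lsof', '-p', pid.decode()])
    except subprocess.CalledProcessError:
        continue  # process exited between pgrep and lsof
    n = sum(1 for line in out.splitlines() if b'temp_shuffle' in line)
    print(pid.decode(), n)
{code}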

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: spark-lsof.txt
>
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18409) LSH approxNearestNeighbors should use approxQuantile instead of sort

2016-11-10 Thread Joseph K. Bradley (JIRA)
Joseph K. Bradley created SPARK-18409:
-

 Summary: LSH approxNearestNeighbors should use approxQuantile 
instead of sort
 Key: SPARK-18409
 URL: https://issues.apache.org/jira/browse/SPARK-18409
 Project: Spark
  Issue Type: Improvement
  Components: ML
Reporter: Joseph K. Bradley


LSHModel.approxNearestNeighbors sorts the full dataset on the hashDistance in 
order to find a threshold.  It should use approxQuantile instead.
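
For a sense of the change, a rough PySpark sketch of the idea (the real fix 
would live in the Scala implementation; {{approxQuantile}} is the public Spark 
2.x API, and {{df}}, {{hashDistance}}, and {{k}} here are illustrative):

{code}
# instead of sorting the full dataset, bound the threshold with an
# approximate quantile: the k-th smallest value is roughly the k/n quantile
n = df.count()
threshold = df.approxQuantile('hashDistance', [float(k) / n], 0.001)[0]
candidates = df.filter(df['hashDistance'] <= threshold)
{code}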



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4851) "Uninitialized staticmethod object" error in PySpark

2016-11-10 Thread Ott Toomet (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655738#comment-15655738
 ] 

Ott Toomet edited comment on SPARK-4851 at 11/11/16 1:15 AM:
-

In which version is this fixed?  I still get "uninitialized staticmethod 
object" when running the example on 2.1.0 snapshot (compiled today, Nov 10th)


was (Author: otoomet):
In which version is this fixed?  I still get "uninitialized staticmethod 
object" when running on 2.1.0 snapshot (compiled today, Nov 10th)

> "Uninitialized staticmethod object" error in PySpark
> 
>
> Key: SPARK-4851
> URL: https://issues.apache.org/jira/browse/SPARK-4851
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.0.2, 1.2.0, 1.3.0
>Reporter: Nadav Grossug
>Priority: Minor
>
> *Reproduction:*
> {code}
> class A:
> @staticmethod
> def foo(self, x):
> return x
> sc.parallelize([1]).map(lambda x: A.foo(x)).count()
> {code}
> This gives
> {code}
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 
> (TID 3, localhost): org.apache.spark.api.python.PythonException: Traceback 
> (most recent call last):
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/worker.py", line 107, 
> in main
> process()
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/worker.py", line 98, 
> in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 247, in 
> func
> return f(iterator)
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 818, in 
> 
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 818, in 
> 
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "", line 1, in 
> RuntimeError: uninitialized staticmethod object
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:136)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:173)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:95)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745){code}
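
A workaround sketch, in case it helps anyone hitting this (my suggestion, not 
from the ticket): plain module-level functions pickle cleanly, so hoisting the 
logic out of the class sidesteps the staticmethod descriptor entirely.

{code}
def foo(x):
    # a plain function avoids pickling the staticmethod object
    return x

sc.parallelize([1]).map(foo).count()
{code}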



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4851) "Uninitialized staticmethod object" error in PySpark

2016-11-10 Thread Ott Toomet (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655738#comment-15655738
 ] 

Ott Toomet commented on SPARK-4851:
---

In which version is this fixed?  I still get "uninitialized staticmethod 
object" when running on 2.1.0 snapshot (compiled today, Nov 10th)

> "Uninitialized staticmethod object" error in PySpark
> 
>
> Key: SPARK-4851
> URL: https://issues.apache.org/jira/browse/SPARK-4851
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.0.2, 1.2.0, 1.3.0
>Reporter: Nadav Grossug
>Priority: Minor
>
> *Reproduction:*
> {code}
> class A:
> @staticmethod
> def foo(self, x):
> return x
> sc.parallelize([1]).map(lambda x: A.foo(x)).count()
> {code}
> This gives
> {code}
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 
> (TID 3, localhost): org.apache.spark.api.python.PythonException: Traceback 
> (most recent call last):
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/worker.py", line 107, 
> in main
> process()
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/worker.py", line 98, 
> in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 2070, 
> in pipeline_func
> return func(split, prev_func(split, iterator))
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 247, in 
> func
> return f(iterator)
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 818, in 
> 
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/Users/joshrosen/Documents/Spark/python/pyspark/rdd.py", line 818, in 
> 
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "", line 1, in 
> RuntimeError: uninitialized staticmethod object
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:136)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:173)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:95)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745){code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18401) SparkR random forest should support output original label

2016-11-10 Thread Yanbo Liang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanbo Liang resolved SPARK-18401.
-
   Resolution: Fixed
 Assignee: Yanbo Liang
Fix Version/s: 2.1.0

> SparkR random forest should support output original label
> -
>
> Key: SPARK-18401
> URL: https://issues.apache.org/jira/browse/SPARK-18401
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SparkR
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
> Fix For: 2.1.0
>
>
> SparkR {{spark.randomForest}} classification prediction should output 
> original label rather than the indexed label. This issue is very similar with 
> SPARK-18291.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16757) Set up caller context to HDFS and Yarn

2016-11-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-16757:
--
Component/s: YARN

> Set up caller context to HDFS and Yarn
> --
>
> Key: SPARK-16757
> URL: https://issues.apache.org/jira/browse/SPARK-16757
> Project: Spark
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: Weiqing Yang
>Assignee: Weiqing Yang
> Fix For: 2.1.0
>
>
> In this jira, Spark will invoke hadoop caller context api to set up its 
> caller context to HDFS.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17108) BIGINT and INT comparison failure in spark sql

2016-11-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-17108:
--
Component/s: SQL

> BIGINT and INT comparison failure in spark sql
> --
>
> Key: SPARK-17108
> URL: https://issues.apache.org/jira/browse/SPARK-17108
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Sai Krishna Kishore Beathanabhotla
>Assignee: Weiqing Yang
> Fix For: 2.1.0
>
>
> I have a Hive table with the following definition:
> {noformat}
> create table testforerror (
> my_column MAP<bigint, array<string>>
> );
> {noformat}
> The table has the following records
> {noformat}
> hive> select * from testforerror;
> OK
> {11001:["0034111000a4WaAAA2"]}
> {11001:["0034111000orWiWAAU"]}
> {11001:["","0034111000VgrHdAAJ"]}
> {11001:["003411cS4rDAAS"]}
> {12001:["0037110001a7ofsAAA"]}
> Time taken: 0.067 seconds, Fetched: 5 row(s)
> {noformat}
> I have a query which filters records by a key of my_column. The query is 
> as follows:
> {noformat}
> select * from testforerror where my_column[11001] is not null;
> {noformat}
> This query is executing fine from hive/beeline shell and producing the 
> following records:
> {noformat}
> hive> select * from testforerror where my_column[11001] is not null;
> OK
> {11001:["0034111000a4WaAAA2"]}
> {11001:["0034111000orWiWAAU"]}
> {11001:["","0034111000VgrHdAAJ"]}
> {11001:["003411cS4rDAAS"]}
> Time taken: 2.224 seconds, Fetched: 4 row(s)
> {noformat}
> However, I get an error when executing it via the Spark sqlContext. The 
> following is the error message:
> {noformat}
> scala> val errorquery = "select * from testforerror where my_column[11001] is 
> not null"
> errorquery: String = select * from testforerror where my_column[11001] is not 
> null
> scala> sqlContext.sql(errorquery).show()
> org.apache.spark.sql.AnalysisException: cannot resolve 'my_column[11001]' due 
> to data type mismatch: argument 2 requires bigint type, however, '11001' is 
> of int type.; line 1 pos 43
> at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
> at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> {noformat}
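
A possible workaround sketch (untested against this exact table): cast the 
literal so its type matches the map's bigint key, which should satisfy the 
analyzer.

{code}
# make the lookup key bigint to match MAP<bigint, array<string>>
errorquery = ("select * from testforerror "
              "where my_column[cast(11001 as bigint)] is not null")
sqlContext.sql(errorquery).show()
{code}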



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17471) Add compressed method for Matrix class

2016-11-10 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-17471:

Shepherd: DB Tsai

> Add compressed method for Matrix class
> --
>
> Key: SPARK-17471
> URL: https://issues.apache.org/jira/browse/SPARK-17471
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Seth Hendrickson
>Assignee: Seth Hendrickson
>
> Vectors in Spark have a {{compressed}} method which selects either sparse or 
> dense representation by minimizing storage requirements. Matrices should also 
> have this method, which is now explicitly needed in {{LogisticRegression}} 
> since we have implemented multiclass regression.
> The compressed method should also give the option to store row major or 
> column major, and if nothing is specified should select the lower storage 
> representation (for sparse).
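
For intuition, a back-of-the-envelope sketch of the kind of rule 
{{compressed}} applies for vectors, which a matrix version would generalize 
(the constant is my recollection of the MLlib heuristic, so treat it as 
illustrative):

{code}
def prefers_sparse(size, nnz):
    # dense costs ~8 bytes per entry; sparse ~12 bytes per non-zero
    # (value + index), so sparse wins roughly when 1.5 * (nnz + 1) < size
    return 1.5 * (nnz + 1.0) < size
{code}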



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17471) Add compressed method for Matrix class

2016-11-10 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai reassigned SPARK-17471:
---

Assignee: DB Tsai

> Add compressed method for Matrix class
> --
>
> Key: SPARK-17471
> URL: https://issues.apache.org/jira/browse/SPARK-17471
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Seth Hendrickson
>Assignee: DB Tsai
>
> Vectors in Spark have a {{compressed}} method which selects either sparse or 
> dense representation by minimizing storage requirements. Matrices should also 
> have this method, which is now explicitly needed in {{LogisticRegression}} 
> since we have implemented multiclass regression.
> The compressed method should also give the option to store row major or 
> column major, and if nothing is specified should select the lower storage 
> representation (for sparse).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17471) Add compressed method for Matrix class

2016-11-10 Thread DB Tsai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DB Tsai updated SPARK-17471:

Assignee: Seth Hendrickson  (was: DB Tsai)

> Add compressed method for Matrix class
> --
>
> Key: SPARK-17471
> URL: https://issues.apache.org/jira/browse/SPARK-17471
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Seth Hendrickson
>Assignee: Seth Hendrickson
>
> Vectors in Spark have a {{compressed}} method which selects either sparse or 
> dense representation by minimizing storage requirements. Matrices should also 
> have this method, which is now explicitly needed in {{LogisticRegression}} 
> since we have implemented multiclass regression.
> The compressed method should also give the option to store row major or 
> column major, and if nothing is specified should select the lower storage 
> representation (for sparse).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18185) Should fix INSERT OVERWRITE TABLE of Datasource tables with dynamic partitions

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-18185.
-
   Resolution: Fixed
 Assignee: Eric Liang
Fix Version/s: 2.1.0

> Should fix INSERT OVERWRITE TABLE of Datasource tables with dynamic partitions
> --
>
> Key: SPARK-18185
> URL: https://issues.apache.org/jira/browse/SPARK-18185
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Eric Liang
>Assignee: Eric Liang
> Fix For: 2.1.0
>
>
> As of current 2.1, INSERT OVERWRITE with dynamic partitions against a 
> Datasource table will overwrite the entire table instead of only the updated 
> partitions as in Hive. It also doesn't respect custom partition locations.
> We should delete only the proper partitions, scan the metastore for affected 
> partitions with custom locations, and ensure that deletes/writes go to the 
> right locations for those as well.
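
For concreteness, the shape of the statement in question (illustrative table 
and column names; Spark 2.x SQL syntax):

{code}
# dynamic-partition overwrite: only the partitions present in the SELECT
# output should be replaced, which is the Hive behavior this ticket restores
spark.sql("""
    INSERT OVERWRITE TABLE events PARTITION (dt)
    SELECT id, payload, dt FROM staging_events
""")
{code}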



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10628) Add support for arbitrary RandomRDD generation to PySparkAPI

2016-11-10 Thread Sandeep Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655691#comment-15655691
 ] 

Sandeep Singh commented on SPARK-10628:
---

[~holdenk] I'm interested in working on this, but have a question.
We need to provide an interface like `RandomDataGenerator`, or one into which 
we can pass a lambda (not sure how best to state this). The question is: how 
do we pass these through to the RandomRDD on the Scala side?
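
For context, a sketch of the Python surface that exists today (fixed, named 
distributions; assumes a running {{sc}}); the open question is the hook for 
arbitrary generators:

{code}
from pyspark.mllib.random import RandomRDDs

# what SPARK-2724 added: specific distributions with fixed parameters
u = RandomRDDs.uniformRDD(sc, size=1000, numPartitions=4, seed=42)
g = RandomRDDs.normalRDD(sc, size=1000, numPartitions=4, seed=42)
print(u.mean(), g.stdev())
{code}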

> Add support for arbitrary RandomRDD generation to PySparkAPI
> 
>
> Key: SPARK-10628
> URL: https://issues.apache.org/jira/browse/SPARK-10628
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib, PySpark
>Reporter: holdenk
>Priority: Minor
>
> SPARK-2724 added support for specific RandomRDDs; this issue is to add 
> support for arbitrary RandomRDD generation, for feature parity with Scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9487) Use the same num. worker threads in Scala/Python unit tests

2016-11-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655682#comment-15655682
 ] 

Apache Spark commented on SPARK-9487:
-

User 'skanjila' has created a pull request for this issue:
https://github.com/apache/spark/pull/15848

> Use the same num. worker threads in Scala/Python unit tests
> ---
>
> Key: SPARK-9487
> URL: https://issues.apache.org/jira/browse/SPARK-9487
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core, SQL, Tests
>Affects Versions: 1.5.0
>Reporter: Xiangrui Meng
>  Labels: starter
> Attachments: ContextCleanerSuiteResults, HeartbeatReceiverSuiteResults
>
>
> In Python we use `local[4]` for unit tests, while in Scala/Java we use 
> `local[2]` and `local` for some unit tests in SQL, MLLib, and other 
> components. If the operation depends on partition IDs, e.g., random number 
> generator, this will lead to different result in Python and Scala/Java. It 
> would be nice to use the same number in all unit tests.
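
A small illustration of the failure mode being described (names are 
illustrative; assumes no other SparkContext is running): the partition an 
element lands in, and therefore any partition-ID-seeded RNG stream, changes 
with the worker-thread count.

{code}
import pyspark

sc = pyspark.SparkContext('local[4]', 'partition-demo')
# under local[2] the same data would split into 2 partitions instead of 4,
# so any computation keyed on partition IDs produces different results
pairs = sc.parallelize(range(8)).mapPartitionsWithIndex(
    lambda idx, it: [(idx, list(it))]).collect()
print(pairs)
{code}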



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18387) Test that expressions can be serialized

2016-11-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655644#comment-15655644
 ] 

Apache Spark commented on SPARK-18387:
--

User 'rdblue' has created a pull request for this issue:
https://github.com/apache/spark/pull/15847

> Test that expressions can be serialized
> ---
>
> Key: SPARK-18387
> URL: https://issues.apache.org/jira/browse/SPARK-18387
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Priority: Blocker
>
> SPARK-18368 fixes regexp_replace when it is serialized. One of the reviews 
> requested updating the tests so that all expressions that are tested using 
> checkEvaluation are first serialized. That caused several new [test 
> failures], so this issue is to add serialization to the tests ([pick commit 
> 10805059|https://github.com/apache/spark/commit/10805059]) and fix the bugs 
> serialization exposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18387) Test that expressions can be serialized

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18387:


Assignee: Apache Spark

> Test that expressions can be serialized
> ---
>
> Key: SPARK-18387
> URL: https://issues.apache.org/jira/browse/SPARK-18387
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Assignee: Apache Spark
>Priority: Blocker
>
> SPARK-18368 fixes regexp_replace when it is serialized. One of the reviews 
> requested updating the tests so that all expressions that are tested using 
> checkEvaluation are first serialized. That caused several new [test 
> failures], so this issue is to add serialization to the tests ([pick commit 
> 10805059|https://github.com/apache/spark/commit/10805059]) and fix the bugs 
> serialization exposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18387) Test that expressions can be serialized

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18387:


Assignee: (was: Apache Spark)

> Test that expressions can be serialized
> ---
>
> Key: SPARK-18387
> URL: https://issues.apache.org/jira/browse/SPARK-18387
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Priority: Blocker
>
> SPARK-18368 fixes regexp_replace when it is serialized. One of the reviews 
> requested updating the tests so that all expressions that are tested using 
> checkEvaluation are first serialized. That caused several new [test 
> failures], so this issue is to add serialization to the tests ([pick commit 
> 10805059|https://github.com/apache/spark/commit/10805059]) and fix the bugs 
> serialization exposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18368) Regular expression replace throws NullPointerException when serialized

2016-11-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655641#comment-15655641
 ] 

Apache Spark commented on SPARK-18368:
--

User 'rdblue' has created a pull request for this issue:
https://github.com/apache/spark/pull/15847

> Regular expression replace throws NullPointerException when serialized
> --
>
> Key: SPARK-18368
> URL: https://issues.apache.org/jira/browse/SPARK-18368
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Assignee: Ryan Blue
> Fix For: 2.0.3, 2.1.0
>
>
> This query fails with a [NullPointerException on line 
> 247|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L247]:
> {code}
> SELECT POSEXPLODE(SPLIT(REGEXP_REPLACE(ranks, '[\\[ \\]]', ''), ',')) AS 
> (rank, col0) FROM table;
> {code}
> The problem is that POSEXPLODE is causing the REGEXP_REPLACE to be serialized 
> after it is instantiated. The null value is a transient StringBuffer that 
> should hold the result. The fix is to make the result value lazy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Herman van Hovell (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell updated SPARK-18388:
--
Priority: Major  (was: Critical)

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Usecase: I am generating weekly aggregates of every column of data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
> if col in skipCols:
> continue
> cols.append(mean(col).over(w).alias("mean_7_"+col))
> cols.append(count(col).over(w).alias("count_7_"+col))
> cols.append(sum(col).over(w).alias("sum_7_"+col))
> cols.append(min(col).over(w).alias("min_7_"+col))
> cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655568#comment-15655568
 ] 

Herman van Hovell commented on SPARK-18388:
---

This is caused by the {{col("dt").cast("timestamp").cast("long")}} in the 
Window clause. This creates a new column for each window function, forcing a 
sort and a project (to remove the column) for each of the ~700 created window 
functions. We should look into deduplicating these expressions in the analyzer 
({{ExtractWindowExpressions}} rule).

It is quite easy to work around this:
{code}
from pyspark.sql.window import Window
from pyspark.sql.functions import *

timeSeries = sqlContext.read.option("header", 
"true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")

# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400

w = (Window()
   .partitionBy("id")
   .orderBy(col("ms").asc())
   .rangeBetween(-days(6), 0))

cols = ["id", "dt"]
skipCols = ["id", "dt"]

for cl in timeSeries.columns:
if cl in skipCols:
continue
cols.append(mean(cl).over(w).alias("mean_7_"+cl))
cols.append(count(cl).over(w).alias("count_7_"+cl))
cols.append(sum(cl).over(w).alias("sum_7_"+cl))
cols.append(min(cl).over(w).alias("min_7_"+cl))
cols.append(max(cl).over(w).alias("max_7_"+cl))

df = timeSeries.withColumn("ms", 
col("dt").cast("timestamp").cast("long")).select(cols)
df.orderBy('id', 
'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
{code}

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
>Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Usecase: I am generating weekly aggregates of every column of data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
> if col in skipCols:
> continue
> cols.append(mean(col).over(w).alias("mean_7_"+col))
> cols.append(count(col).over(w).alias("count_7_"+col))
> cols.append(sum(col).over(w).alias("sum_7_"+col))
> cols.append(min(col).over(w).alias("min_7_"+col))
> cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8884) 1-sample Anderson-Darling Goodness-of-Fit test

2016-11-10 Thread Timothy Hunter (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655563#comment-15655563
 ] 

Timothy Hunter commented on SPARK-8884:
---

[~srowen] this ticket should still be open I believe? [~yuhaoyan] has an open 
PR for it.

> 1-sample Anderson-Darling Goodness-of-Fit test
> --
>
> Key: SPARK-8884
> URL: https://issues.apache.org/jira/browse/SPARK-8884
> Project: Spark
>  Issue Type: New Feature
>  Components: MLlib
>Reporter: Jose Cambronero
>
> We have implemented a 1-sample Anderson-Darling goodness-of-fit test to add 
> to the current hypothesis testing functionality. The current implementation 
> supports various distributions (normal, exponential, gumbel, logistic, and 
> weibull). However, users must provide distribution parameters for all except 
> normal/exponential (in which case they are estimated from the data). In 
> contrast to other tests, such as the Kolmogorov Smirnov test, we only support 
> specific distributions as the critical values depend on the distribution 
> being tested. 
> The distributed implementation of AD takes advantage of the fact that we can 
> calculate a portion of the statistic within each partition of a sorted data 
> set, independent of the global order of those observations. We can then carry 
> some additional information that allows us to adjust the final amounts once 
> we have collected 1 result per partition.
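
For intuition, a rough PySpark sketch of why the statistic parallelizes once 
global ranks are known (standard-normal null; an illustration of the idea, not 
the PR's implementation; assumes a running {{sc}}):

{code}
import math
import random

def normal_cdf(x):
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

data = sc.parallelize([random.gauss(0.0, 1.0) for _ in range(1000)])
n = data.count()
# after sorting, each element's term depends only on its value and its
# global rank, so partitions can sum their terms independently and a
# single reduce combines them
indexed = data.sortBy(lambda x: x).zipWithIndex()  # (value, 0-based rank)
s = indexed.map(lambda vi:
                (2 * vi[1] + 1) * math.log(normal_cdf(vi[0])) +
                (2 * (n - vi[1]) - 1) * math.log(1.0 - normal_cdf(vi[0]))).sum()
A2 = -n - s / n
print(A2)
{code}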



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655492#comment-15655492
 ] 

Reynold Xin commented on SPARK-18367:
-

Can you show the list of open files? That would help with debugging.


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(500000)
> ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18408) API Improvements for LSH

2016-11-10 Thread Yun Ni (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Ni updated SPARK-18408:
---
Description: 
As the first improvements to current LSH Implementations, we are planning to do 
the followings:
 - Change output schema to {{Array of Vector}} instead of {{Vectors}}
 - Use {{numHashTables}} as the dimension of {{Array}} and {{numHashFunctions}} 
as the dimension of {{Vector}}
 - Rename {{RandomProjection}} to {{BucketedRandomProjectionLSH}}, {{MinHash}} 
to {{MinHashLSH}}
 - Make randUnitVectors/randCoefficients private

  was:
As the first improvements to the current LSH implementations, we are planning to 
do the following:
 - Change output schema to {{Array of Vector}} instead of {{Vectors}}
 - Use `numHashTables` as the dimension of {{Array}} and {{numHashFunctions}} 
as the dimension of {{Vector}}
 - Rename {{RandomProjection}} to {{BucketedRandomProjectionLSH}}, {{MinHash}} 
to {{MinHashLSH}}
 - Make randUnitVectors/randCoefficients private


> API Improvements for LSH
> 
>
> Key: SPARK-18408
> URL: https://issues.apache.org/jira/browse/SPARK-18408
> Project: Spark
>  Issue Type: Improvement
>Reporter: Yun Ni
>
> As the first improvements to the current LSH implementations, we are planning 
> to do the following:
>  - Change output schema to {{Array of Vector}} instead of {{Vectors}}
>  - Use {{numHashTables}} as the dimension of {{Array}} and 
> {{numHashFunctions}} as the dimension of {{Vector}}
>  - Rename {{RandomProjection}} to {{BucketedRandomProjectionLSH}}, 
> {{MinHash}} to {{MinHashLSH}}
>  - Make randUnitVectors/randCoefficients private



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18408) API Improvements for LSH

2016-11-10 Thread Yun Ni (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Ni updated SPARK-18408:
---
Description: 
As the first improvements to the current LSH implementations, we are planning to 
do the following:
 - Change output schema to {{Array of Vector}} instead of {{Vectors}}
 - Use `numHashTables` as the dimension of {{Array}} and {{numHashFunctions}} 
as the dimension of {{Vector}}
 - Rename {{RandomProjection}} to {{BucketedRandomProjectionLSH}}, {{MinHash}} 
to {{MinHashLSH}}
 - Make randUnitVectors/randCoefficients private

  was:
As the first improvements to the current LSH implementations, we are planning to 
do the following:
 - Change output schema to `Array of Vector` instead of `Vectors`
 - Use `numHashTables` as the dimension of `Array` and `numHashFunctions` as 
the dimension of `Vector`
 - Rename `RandomProjection` to `BucketedRandomProjectionLSH`, `MinHash` to 
`MinHashLSH`
 - Make randUnitVectors/randCoefficients private


> API Improvements for LSH
> 
>
> Key: SPARK-18408
> URL: https://issues.apache.org/jira/browse/SPARK-18408
> Project: Spark
>  Issue Type: Improvement
>Reporter: Yun Ni
>
> As the first improvements to the current LSH implementations, we are planning 
> to do the following:
>  - Change output schema to {{Array of Vector}} instead of {{Vectors}}
>  - Use `numHashTables` as the dimension of {{Array}} and {{numHashFunctions}} 
> as the dimension of {{Vector}}
>  - Rename {{RandomProjection}} to {{BucketedRandomProjectionLSH}}, 
> {{MinHash}} to {{MinHashLSH}}
>  - Make randUnitVectors/randCoefficients private



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18387) Test that expressions can be serialized

2016-11-10 Thread Ryan Blue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655454#comment-15655454
 ] 

Ryan Blue commented on SPARK-18387:
---

Yeah, I'm working on it. Thanks!

-- 
Ryan Blue


> Test that expressions can be serialized
> ---
>
> Key: SPARK-18387
> URL: https://issues.apache.org/jira/browse/SPARK-18387
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Priority: Blocker
>
> SPARK-18368 fixes regexp_replace when it is serialized. One of the reviews 
> requested updating the tests so that all expressions that are tested using 
> checkEvaluation are first serialized. That caused several new [test 
> failures], so this issue is to add serialization to the tests ([pick commit 
> 10805059|https://github.com/apache/spark/commit/10805059]) and fix the bugs 
> serialization exposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18408) API Improvements for LSH

2016-11-10 Thread Yun Ni (JIRA)
Yun Ni created SPARK-18408:
--

 Summary: API Improvements for LSH
 Key: SPARK-18408
 URL: https://issues.apache.org/jira/browse/SPARK-18408
 Project: Spark
  Issue Type: Improvement
Reporter: Yun Ni


As the first improvements to the current LSH implementations, we are planning to 
do the following:
 - Change output schema to `Array of Vector` instead of `Vectors`
 - Use `numHashTables` as the dimension of `Array` and `numHashFunctions` as 
the dimension of `Vector`
 - Rename `RandomProjection` to `BucketedRandomProjectionLSH`, `MinHash` to 
`MinHashLSH`
 - Make randUnitVectors/randCoefficients private
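
For illustration, a sketch of how usage could look once the renames land; the
class and parameter names below follow the list above and remain assumptions
until the change merges:

{code}
# Assumes the proposed names (BucketedRandomProjectionLSH, numHashTables)
# and that the Python wrapper mirrors the Scala API.
from pyspark.sql import SparkSession
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(0, Vectors.dense([1.0, 1.0])), (1, Vectors.dense([-1.0, -1.0]))],
    ['id', 'features'])

# Per the schema change above, 'hashes' would be an Array of numHashTables
# Vectors, each Vector holding numHashFunctions hash values.
lsh = BucketedRandomProjectionLSH(inputCol='features', outputCol='hashes',
                                  bucketLength=2.0, numHashTables=3)
model = lsh.fit(df)
model.transform(df).show(truncate=False)
{code}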



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Description: 
I have a moderately complex DataFrame query that spawns north of 10K open 
files, causing my job to crash. 10K is the macOS limit on how many files a 
single process can have open at once. It seems unreasonable that Spark should 
hold that many files open at once.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(50)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

    print('partitions:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}

When I run this code, Spark spawns over 2K open files. I can "fix" the problem 
by adding a {{coalesce(1)}} in the right place, as indicated in the comments 
above. When I do, Spark spawns no more than 600 open files. The number of 
partitions without the coalesce is 200.

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
  +- Coalesce 1
 +- *Filter isnotnull(a#4L)
+- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
200)}} operator.

Is the large number of open files perhaps expected given the join on a large 
number of distinct keys? If so, how would one mitigate that issue? If not, is 
this a bug in Spark?

  was:
I have a moderately complex DataFrame query that spawns north of 10K open 
files, causing my job to crash. 10K is the macOS limit on how many files a 
single process can have open at once. It seems unreasonable that Spark should 
hold that many files open at once.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(50)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

    print('parititons:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}

When I run this code, Spark spawns over 2K open files. I can "fix" the problem 
by adding a {{coalesce(1)}} in the right place, as indicated in the comments 
above. When I do, Spark spawns no more than 600 open files. The number of 
partitions without the coalesce is 200.

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
  +- Coalesce 1
 +- *Filter isnotnull(a#4L)
+- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
200)}} operator.

Is the large number of open files perhaps expected given the join on a large 
number of distinct keys? If so, how would one mitigate that issue? If not, is 
this a bug in Spark?


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: 

[jira] [Commented] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655379#comment-15655379
 ] 

Herman van Hovell commented on SPARK-18388:
---

Ok, thanks. I'll take a look.

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
>Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Use case: I am generating weekly aggregates of every column of the data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}
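
A hedged workaround that sometimes helps with StackOverflowErrors from very
wide plans like this one, assuming the overflow happens while recursing over
the large expression tree, is to enlarge the JVM thread stack:

{code}
# Sketch only; -Xss16m is an arbitrary size. The driver JVM is already
# running when a Jupyter session starts, so the driver-side option must be
# passed at launch, e.g.:
#   spark-submit --conf "spark.driver.extraJavaOptions=-Xss16m" ...
# Executor JVMs start later, so their option can still go on the session:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config('spark.executor.extraJavaOptions', '-Xss16m')
         .getOrCreate())
{code}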



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17993) Spark prints an avalanche of warning messages from Parquet when reading parquet files written by older versions of Parquet-mr

2016-11-10 Thread Michael Allman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655355#comment-15655355
 ] 

Michael Allman commented on SPARK-17993:


This patch will be part of Spark 2.1, but it looks like it won't make it into 
2.0.2. If you'd like help backporting this patch to 2.0, mail me privately and 
I can send you a patch.

> Spark prints an avalanche of warning messages from Parquet when reading 
> parquet files written by older versions of Parquet-mr
> -
>
> Key: SPARK-17993
> URL: https://issues.apache.org/jira/browse/SPARK-17993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Michael Allman
>Assignee: Michael Allman
> Fix For: 2.1.0
>
>
> It looks like https://github.com/apache/spark/pull/14690 broke parquet log 
> output redirection. After that patch, when querying parquet files written by 
> Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from 
> the Parquet reader:
> {code}
> Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: 
> Ignoring statistics because created_by could not be parsed (see PARQUET-251): 
> parquet-mr version 1.6.0
> org.apache.parquet.VersionParser$VersionParseException: Could not parse 
> created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) 
> )?\(build ?(.*)\)
>   at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
>   at 
> org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
>   at 
> org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
>   at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> This only happens during execution, not planning, and it doesn't matter what 
> log level the {{SparkContext}} is set to.
> This is a regression I noted as something we needed to fix as a follow-up to 
> PR 14690. I feel responsible, so I'm going to expedite a fix for it. I 
> suspect that PR broke Spark's Parquet log output redirection. That's the 
> premise I'm going by.
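
Until the fix lands, a hedged driver-side stop-gap, assuming the messages
really do come through {{java.util.logging}} as the timestamp format suggests,
is to raise the Parquet logger's level over py4j:

{code}
# Stop-gap sketch, not the actual fix. This reaches only the driver JVM;
# executor-side output would need a JUL config file shipped via
# spark.executor.extraJavaOptions (-Djava.util.logging.config.file=...),
# which this sketch omits.
jul = spark._jvm.java.util.logging  # assumes an existing SparkSession `spark`
parquet_logger = jul.Logger.getLogger('org.apache.parquet')
parquet_logger.setLevel(jul.Level.SEVERE)  # keep the reference; JUL loggers are weakly held
{code}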



--
This message was sent by 

[jira] [Updated] (SPARK-18407) Inferred partition columns cause assertion error

2016-11-10 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18407:
-
Description: 
[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns and adding them to the end, even 
though the schema is being passed in manually.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}

I also tried specifying the partition columns in the schema and now it appears 
that they are filled with corrupted data.

  was:
[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns and adding them to the end, even 
though the schema is being passed in manually.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}


> Inferred partition columns cause assertion error
> 
>
> Key: SPARK-18407
> URL: https://issues.apache.org/jira/browse/SPARK-18407
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Michael Armbrust
>Priority: Critical
>
> [This 
> assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
>  fails when you run a stream against json data that is stored in partitioned 
> folders, if you manually specify the schema and that schema omits the 
> partitioned columns.
> My hunch is that we are inferring those columns and adding them to the end, 
> even though the schema is being passed in manually.
> While we are fixing this bug, it would be nice to make the assertion better.  
> Truncating is not terribly useful as, at least in my case, it truncated the 
> most interesting part.  I changed it to this while debugging:
> {code}
>   s"""
>  |Batch does not have expected schema
>  |Expected: ${output.mkString(",")}
>  |Actual: ${newPlan.output.mkString(",")}
>  |
>  |== Original ==
>  |$logicalPlan
>  |
>  |== Batch ==
>  |$newPlan
>""".stripMargin
> {code}
> I also tried specifying the partition columns in the schema and now it 
> appears that they are filled with corrupted data.
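
For reference, a minimal sketch of the failure mode described above; the path,
column names, and memory sink are illustrative, not taken from the original
report:

{code}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()
path = '/tmp/partitioned_json'  # illustrative path

# Write json data into partitioned folders.
spark.range(10).selectExpr('id as a', 'id % 2 as part') \
    .write.partitionBy('part').mode('overwrite').json(path)

# Manually specified schema that omits the partition column 'part'.
schema = StructType([StructField('a', LongType())])

# Running a stream against this data is what trips the batch-schema
# assertion, since the inferred partition column gets appended to the
# user-supplied schema.
stream = spark.readStream.schema(schema).json(path)
query = stream.writeStream.format('memory').queryName('repro').start()
{code}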



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Raviteja Lokineni (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655279#comment-15655279
 ] 

Raviteja Lokineni edited comment on SPARK-18388 at 11/10/16 9:57 PM:
-

[~hvanhovell], Tried but still fails with the same exception

Steps that I did:
* Download latest: 
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
* Ran: mvn package
* /sbin/start-master.sh
* /bin/start-submit.sh 
* Same stack trace


was (Author: bond_):
[~hvanhovell], Nope still fails with the same exception

Steps that I did:
* Download latest: 
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
* Ran: mvn package
* /sbin/start-master.sh
* /bin/start-submit.sh 
* Same stack trace

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
>Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Use case: I am generating weekly aggregates of every column of the data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Raviteja Lokineni (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655279#comment-15655279
 ] 

Raviteja Lokineni edited comment on SPARK-18388 at 11/10/16 9:57 PM:
-

[~hvanhovell], Tried but still fails with the same exception

Steps that I did:
* Download latest: 
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
* Ran: mvn package
* /sbin/start-master.sh
* /bin/start-submit.sh 
* Same stack trace


was (Author: bond_):
[~hvanhovell], Tried but still fails with the same exception

Steps that I did:
* Download latest: 
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
* Ran: mvn package
* /sbin/start-master.sh
* /bin/start-submit.sh 
* Same stack trace

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
>Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Use case: I am generating weekly aggregates of every column of the data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18388) Running aggregation on many columns throws SOE

2016-11-10 Thread Raviteja Lokineni (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655279#comment-15655279
 ] 

Raviteja Lokineni commented on SPARK-18388:
---

[~hvanhovell], Nope still fails with the same exception

Steps that I did:
* Download latest: 
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
* Ran: mvn package
* /sbin/start-master.sh
* /bin/start-submit.sh 
* Same stack trace

> Running aggregation on many columns throws SOE
> --
>
> Key: SPARK-18388
> URL: https://issues.apache.org/jira/browse/SPARK-18388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.2, 2.0.1
> Environment: PySpark 2.0.1, Jupyter
>Reporter: Raviteja Lokineni
>Priority: Critical
> Attachments: spark-bug-jupyter.py, spark-bug-stacktrace.txt, 
> spark-bug.csv
>
>
> Use case: I am generating weekly aggregates of every column of the data
> {code}
> from pyspark.sql.window import Window
> from pyspark.sql.functions import *
> timeSeries = sqlContext.read.option("header", 
> "true").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").load("file:///tmp/spark-bug.csv")
> # Hive timestamp is interpreted as UNIX timestamp in seconds*
> days = lambda i: i * 86400
> w = (Window()
>  .partitionBy("id")
>  .orderBy(col("dt").cast("timestamp").cast("long"))
>  .rangeBetween(-days(6), 0))
> cols = ["id", "dt"]
> skipCols = ["id", "dt"]
> for col in timeSeries.columns:
>     if col in skipCols:
>         continue
>     cols.append(mean(col).over(w).alias("mean_7_"+col))
>     cols.append(count(col).over(w).alias("count_7_"+col))
>     cols.append(sum(col).over(w).alias("sum_7_"+col))
>     cols.append(min(col).over(w).alias("min_7_"+col))
>     cols.append(max(col).over(w).alias("max_7_"+col))
> df = timeSeries.select(cols)
> df.orderBy('id', 
> 'dt').write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").save("file:///tmp/spark-bug-out.csv")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18364) expose metrics for YarnShuffleService

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-18364:

Target Version/s:   (was: 2.1.0)

> expose metrics for YarnShuffleService
> -
>
> Key: SPARK-18364
> URL: https://issues.apache.org/jira/browse/SPARK-18364
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.0.1
>Reporter: Steven Rand
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> ExternalShuffleService exposes metrics as of SPARK-16405. However, 
> YarnShuffleService does not.
> The work of instrumenting ExternalShuffleBlockHandler was already done in 
> SPARK-1645, so this JIRA is for creating a MetricsSystem in 
> YarnShuffleService similarly to how ExternalShuffleService already does it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-18403:

Fix Version/s: 2.2.0  (was: 2.1.0)

> ObjectHashAggregateSuite is being flaky (occasional OOM errors)
> ---
>
> Key: SPARK-18403
> URL: https://issues.apache.org/jira/browse/SPARK-18403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
> Fix For: 2.2.0
>
>
> This test suite fails occasionally on Jenkins due to OOM errors. I've already 
> reproduced it locally but haven't figured out the root cause.
> We should probably disable it temporarily before getting it fixed so that it 
> doesn't break the PR build too often.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-18403.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

Please make sure we re-enable it.


> ObjectHashAggregateSuite is being flaky (occasional OOM errors)
> ---
>
> Key: SPARK-18403
> URL: https://issues.apache.org/jira/browse/SPARK-18403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
> Fix For: 2.1.0
>
>
> This test suite fails occasionally on Jenkins due to OOM errors. I've already 
> reproduced it locally but haven't figured out the root cause.
> We should probably disable it temporarily before getting it fixed so that it 
> doesn't break the PR build too often.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18302) correct several partition related behaviours of ExternalCatalog

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-18302.
-
   Resolution: Fixed
Fix Version/s: 2.1.0

> correct several partition related behaviours of ExternalCatalog
> ---
>
> Key: SPARK-18302
> URL: https://issues.apache.org/jira/browse/SPARK-18302
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17990) ALTER TABLE ... ADD PARTITION does not play nice with mixed-case partition column names

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-17990.
-
   Resolution: Fixed
 Assignee: Wenchen Fan
Fix Version/s: 2.1.0

> ALTER TABLE ... ADD PARTITION does not play nice with mixed-case partition 
> column names
> ---
>
> Key: SPARK-17990
> URL: https://issues.apache.org/jira/browse/SPARK-17990
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: Linux
> Mac OS with a case-sensitive filesystem
>Reporter: Michael Allman
>Assignee: Wenchen Fan
> Fix For: 2.1.0
>
>
> Writing partition data to an external table's file location and then adding 
> those as table partition metadata is a common use case. However, for tables 
> with partition column names with upper case letters, the SQL command {{ALTER 
> TABLE ... ADD PARTITION}} does not work, as illustrated in the following 
> example:
> {code}
> scala> sql("create external table mixed_case_partitioning (a bigint) 
> PARTITIONED BY (partCol bigint) STORED AS parquet LOCATION 
> '/tmp/mixed_case_partitioning'")
> res0: org.apache.spark.sql.DataFrame = []
> scala> spark.sqlContext.range(10).selectExpr("id as a", "id as 
> partCol").write.partitionBy("partCol").mode("overwrite").parquet("/tmp/mixed_case_partitioning")
> {code}
> At this point, doing a {{hadoop fs -ls /tmp/mixed_case_partitioning}} 
> produces the following:
> {code}
> [msa@jupyter ~]$ hadoop fs -ls /tmp/mixed_case_partitioning
> Found 11 items
> -rw-r--r--   3 msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/_SUCCESS
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=0
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=1
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=2
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=3
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=4
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=5
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=6
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=7
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=8
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=9
> {code}
> Returning to the Spark shell, we execute the following to add the partition 
> metadata:
> {code}
> scala> (0 to 9).foreach { p => sql(s"alter table mixed_case_partitioning add 
> partition(partCol=$p)") }
> {code}
> Examining the HDFS file listing again, we see:
> {code}
> [msa@jupyter ~]$ hadoop fs -ls /tmp/mixed_case_partitioning
> Found 21 items
> -rw-r--r--   3 msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/_SUCCESS
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=0
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=1
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=2
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=3
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=4
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=5
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=6
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=7
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=8
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:52 
> /tmp/mixed_case_partitioning/partCol=9
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=0
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=1
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=2
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=3
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=4
> drwxr-xr-x   - msa supergroup  0 2016-10-18 17:53 
> /tmp/mixed_case_partitioning/partcol=5
> drwxr-xr-x   - msa supergroup  
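
A hedged workaround sketch for the duplicate-directory symptom above: give
each partition an explicit {{LOCATION}} so the metastore does not derive (and
lowercase) the path itself. Shown in PySpark for brevity; the {{LOCATION}}
clause is the only addition:

{code}
# Assumes the table and paths from the example above, plus an existing
# SparkSession named `spark`.
for p in range(10):
    spark.sql(
        "ALTER TABLE mixed_case_partitioning ADD PARTITION (partCol={p}) "
        "LOCATION '/tmp/mixed_case_partitioning/partCol={p}'".format(p=p))
{code}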

[jira] [Resolved] (SPARK-17993) Spark prints an avalanche of warning messages from Parquet when reading parquet files written by older versions of Parquet-mr

2016-11-10 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-17993.
-
   Resolution: Fixed
 Assignee: Michael Allman
Fix Version/s: 2.1.0

> Spark prints an avalanche of warning messages from Parquet when reading 
> parquet files written by older versions of Parquet-mr
> -
>
> Key: SPARK-17993
> URL: https://issues.apache.org/jira/browse/SPARK-17993
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Michael Allman
>Assignee: Michael Allman
> Fix For: 2.1.0
>
>
> It looks like https://github.com/apache/spark/pull/14690 broke parquet log 
> output redirection. After that patch, when querying parquet files written by 
> Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from 
> the Parquet reader:
> {code}
> Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: 
> Ignoring statistics because created_by could not be parsed (see PARQUET-251): 
> parquet-mr version 1.6.0
> org.apache.parquet.VersionParser$VersionParseException: Could not parse 
> created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) 
> )?\(build ?(.*)\)
>   at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
>   at 
> org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
>   at 
> org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
>   at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> This only happens during execution, not planning, and it doesn't matter what 
> log level the {{SparkContext}} is set to.
> This is a regression I noted as something we needed to fix as a follow-up to 
> PR 14690. I feel responsible, so I'm going to expedite a fix for it. I 
> suspect that PR broke Spark's Parquet log output redirection. That's the 
> premise I'm going by.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: 

[jira] [Commented] (SPARK-18387) Test that expressions can be serialized

2016-11-10 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655224#comment-15655224
 ] 

Herman van Hovell commented on SPARK-18387:
---

[~rdblue] Are you working on this one? I'd be happy to pick this one up.

> Test that expressions can be serialized
> ---
>
> Key: SPARK-18387
> URL: https://issues.apache.org/jira/browse/SPARK-18387
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Ryan Blue
>Priority: Blocker
>
> SPARK-18368 fixes regexp_replace when it is serialized. One of the reviews 
> requested updating the tests so that all expressions that are tested using 
> checkEvaluation are first serialized. That caused several new [test 
> failures], so this issue is to add serialization to the tests ([pick commit 
> 10805059|https://github.com/apache/spark/commit/10805059]) and fix the bugs 
> serialization exposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18407) Inferred partition columns cause assertion error

2016-11-10 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-18407:


 Summary: Inferred partition columns cause assertion error
 Key: SPARK-18407
 URL: https://issues.apache.org/jira/browse/SPARK-18407
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.0.2
Reporter: Michael Armbrust
Priority: Critical


[This 
assertion|https://github.com/apache/spark/blob/16eaad9daed0b633e6a714b5704509aa7107d6e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L408]
 fails when you run a stream against json data that is stored in partitioned 
folders, if you manually specify the schema and that schema omits the 
partitioned columns.

My hunch is that we are inferring those columns and adding them to the end, even 
though the schema is being passed in manually.

While we are fixing this bug, it would be nice to make the assertion better.  
Truncating is not terribly useful as, at least in my case, it truncated the 
most interesting part.  I changed it to this while debugging:

{code}
  s"""
 |Batch does not have expected schema
 |Expected: ${output.mkString(",")}
 |Actual: ${newPlan.output.mkString(",")}
 |
 |== Original ==
 |$logicalPlan
 |
 |== Batch ==
 |$newPlan
   """.stripMargin
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18406) Race between end-of-task and completion iterator read lock release

2016-11-10 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-18406:
---
Description: 
The following log comes from a production streaming job where executors 
periodically die due to uncaught exceptions during block release:


{code}
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7921
16/11/07 17:11:06 INFO Executor: Running task 0.0 in stage 2390.0 (TID 7921)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7922
16/11/07 17:11:06 INFO Executor: Running task 1.0 in stage 2390.0 (TID 7922)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7923
16/11/07 17:11:06 INFO Executor: Running task 2.0 in stage 2390.0 (TID 7923)
16/11/07 17:11:06 INFO TorrentBroadcast: Started reading broadcast variable 2721
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7924
16/11/07 17:11:06 INFO Executor: Running task 3.0 in stage 2390.0 (TID 7924)
16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721_piece0 stored as bytes 
in memory (estimated size 5.0 KB, free 4.9 GB)
16/11/07 17:11:06 INFO TorrentBroadcast: Reading broadcast variable 2721 took 3 
ms
16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721 stored as values in 
memory (estimated size 9.4 KB, free 4.9 GB)
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_3 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_2 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_4 locally
16/11/07 17:11:06 INFO PythonRunner: Times: total = 2, boot = -566, init = 567, 
finish = 1
16/11/07 17:11:06 INFO PythonRunner: Times: total = 7, boot = -540, init = 541, 
finish = 6
16/11/07 17:11:06 INFO Executor: Finished task 2.0 in stage 2390.0 (TID 7923). 
1429 bytes result sent to driver
16/11/07 17:11:06 INFO PythonRunner: Times: total = 8, boot = -532, init = 533, 
finish = 7
16/11/07 17:11:06 INFO Executor: Finished task 3.0 in stage 2390.0 (TID 7924). 
1429 bytes result sent to driver
16/11/07 17:11:06 ERROR Executor: Exception in task 0.0 in stage 2390.0 (TID 
7921)
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at 
org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
at 
org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
at 
org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7925
16/11/07 17:11:06 INFO Executor: Running task 0.1 in stage 2390.0 (TID 7925)
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
16/11/07 17:11:06 INFO PythonRunner: Times: total = 41, boot = -536, init = 
576, finish = 1
16/11/07 17:11:06 INFO Executor: Finished task 1.0 in stage 2390.0 (TID 7922). 
1429 bytes result sent to driver
16/11/07 17:11:06 ERROR Utils: Uncaught exception in thread stdout writer for 
/databricks/python/bin/python
java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked for 
reading
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 

[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Description: 
I have a moderately complex DataFrame query that spawns north of 10K open 
files, causing my job to crash. 10K is the macOS limit on how many files a 
single process can have open at once. It seems unreasonable that Spark should 
hold that many files open at once.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(50)
    ])  #.coalesce(1)  # a coalesce(1) here "fixes" the problem
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

    print('parititons:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}

When I run this code, Spark spawns over 2K open files. I can "fix" the problem 
by adding a {{coalesce(1)}} in the right place, as indicated in the comments 
above. When I do, Spark spawns no more than 600 open files. The number of 
partitions without the coalesce is 200.

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
  +- Coalesce 1
 +- *Filter isnotnull(a#4L)
+- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
200)}} operator.

Is the large number of open files perhaps expected given the join on a large 
number of distinct keys? If so, how would one mitigate that issue? If not, is 
this a bug in Spark?

  was:
I have a moderately complex DataFrame query that spawns north of 10K open 
files, causing my job to crash. 10K is the macOS limit on how many files a 
single process can have open at once. It seems unreasonable that Spark should 
hold that many files open at once.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(50)
    ]).coalesce(1)  # a coalesce(1) here "fixes" the problem
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

    print('parititons:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}

When I run this code, Spark spawns over 2K open files. I can "fix" the problem 
by adding a {{coalesce(1)}} in the right place, as indicated in the comments 
above. When I do, Spark spawns no more than 600 open files. The number of 
partitions without the coalesce is 200.

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
  +- Coalesce 1
 +- *Filter isnotnull(a#4L)
+- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
200)}} operator.

Is the large number of open files perhaps expected given the join on a large 
number of distinct keys? If so, how would one mitigate that issue? If not, is 
this a bug in Spark?


> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: 

[jira] [Created] (SPARK-18406) Race between end-of-task and completion iterator read lock release

2016-11-10 Thread Josh Rosen (JIRA)
Josh Rosen created SPARK-18406:
--

 Summary: Race between end-of-task and completion iterator read 
lock release
 Key: SPARK-18406
 URL: https://issues.apache.org/jira/browse/SPARK-18406
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Spark Core
Affects Versions: 2.0.1, 2.0.0
Reporter: Josh Rosen


{code}
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7921
16/11/07 17:11:06 INFO Executor: Running task 0.0 in stage 2390.0 (TID 7921)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7922
16/11/07 17:11:06 INFO Executor: Running task 1.0 in stage 2390.0 (TID 7922)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7923
16/11/07 17:11:06 INFO Executor: Running task 2.0 in stage 2390.0 (TID 7923)
16/11/07 17:11:06 INFO TorrentBroadcast: Started reading broadcast variable 2721
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7924
16/11/07 17:11:06 INFO Executor: Running task 3.0 in stage 2390.0 (TID 7924)
16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721_piece0 stored as bytes 
in memory (estimated size 5.0 KB, free 4.9 GB)
16/11/07 17:11:06 INFO TorrentBroadcast: Reading broadcast variable 2721 took 3 
ms
16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721 stored as values in 
memory (estimated size 9.4 KB, free 4.9 GB)
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_3 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_2 locally
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_4 locally
16/11/07 17:11:06 INFO PythonRunner: Times: total = 2, boot = -566, init = 567, 
finish = 1
16/11/07 17:11:06 INFO PythonRunner: Times: total = 7, boot = -540, init = 541, 
finish = 6
16/11/07 17:11:06 INFO Executor: Finished task 2.0 in stage 2390.0 (TID 7923). 
1429 bytes result sent to driver
16/11/07 17:11:06 INFO PythonRunner: Times: total = 8, boot = -532, init = 533, 
finish = 7
16/11/07 17:11:06 INFO Executor: Finished task 3.0 in stage 2390.0 (TID 7924). 
1429 bytes result sent to driver
16/11/07 17:11:06 ERROR Executor: Exception in task 0.0 in stage 2390.0 (TID 
7921)
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at 
org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
at 
org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
at 
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
at 
org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7925
16/11/07 17:11:06 INFO Executor: Running task 0.1 in stage 2390.0 (TID 7925)
16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
16/11/07 17:11:06 INFO PythonRunner: Times: total = 41, boot = -536, init = 
576, finish = 1
16/11/07 17:11:06 INFO Executor: Finished task 1.0 in stage 2390.0 (TID 7922). 
1429 bytes result sent to driver
16/11/07 17:11:06 ERROR Utils: Uncaught exception in thread stdout writer for 
/databricks/python/bin/python
java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked for 
reading
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at ...
{code}
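
The two assertion failures above point at the same read lock being released 
twice, once by the completion iterator and once by end-of-task cleanup. A toy 
model of that failure mode (plain Python, not Spark code):

{code}
# Toy model of a double release: a per-block reader count that two cleanup
# paths both decrement. The second release trips an assertion analogous to
# "Block rdd_2741_1 is not locked for reading" above.
reader_count = 1  # one read lock held on the block

def release_lock(who):
    global reader_count
    reader_count -= 1
    assert reader_count >= 0, 'block is not locked for reading (%s)' % who

release_lock('completion iterator')  # legitimate release
release_lock('end-of-task cleanup')  # double release -> AssertionError
{code}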

[jira] [Created] (SPARK-18405) Add yarn-cluster mode support to Spark Thrift Server

2016-11-10 Thread Prabhu Kasinathan (JIRA)
Prabhu Kasinathan created SPARK-18405:
-

 Summary: Add yarn-cluster mode support to Spark Thrift Server
 Key: SPARK-18405
 URL: https://issues.apache.org/jira/browse/SPARK-18405
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.1, 2.0.0, 1.6.2, 1.6.0
Reporter: Prabhu Kasinathan


Currently, the Spark Thrift Server can run only in yarn-client mode.

Can we add yarn-cluster mode support to the Spark Thrift Server?

This would let us launch multiple Spark Thrift Servers with different Spark 
configurations, and it really helps in large distributed clusters where there 
is a requirement to run complex SQL through STS. With client mode, there is a 
risk of overloading the local host with too much driver memory.

Please let me know your thoughts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18404) RPC call from executor to driver blocks when getting map output locations (Netty Only)

2016-11-10 Thread Jeffrey Shmain (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655101#comment-15655101
 ] 

Jeffrey Shmain commented on SPARK-18404:


Switching back to Akka, by setting spark.rpc=akka, makes the issue go away.
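
For anyone needing the workaround, a minimal sketch (PySpark shown; note that 
{{spark.rpc}} only exists on the 1.x line and went away along with Akka 
support in Spark 2.x):

{code}
# Sketch of the workaround described above, applicable to Spark 1.6 only:
# switch the RPC implementation back to Akka (the 1.6 default is netty).
from pyspark import SparkConf, SparkContext

conf = SparkConf().set('spark.rpc', 'akka')
sc = SparkContext(conf=conf)
{code}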

> RPC call from executor to driver blocks when getting map output locations 
> (Netty Only)
> --
>
> Key: SPARK-18404
> URL: https://issues.apache.org/jira/browse/SPARK-18404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Jeffrey Shmain
>
> Compared an identical application run on Spark 1.5 and Spark 1.6. Noticed 
> that jobs became slower. After looking at it more closely, found that 75% of 
> tasks finished in the same time or faster, and 25% had significant delays 
> (unrelated to data skew and GC).
> After more debugging, noticed that the executors are blocking for a few 
> seconds (sometimes 25) on this call:
> https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L199
> logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
> // This try-finally prevents hangs due to timeouts:
> try {
>   val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
>   fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
>   logInfo("Got the output locations")
> So the regression seems to be related to changing the default from Akka to 
> Netty.
> This was an application working with RDDs, submitting 10 concurrent queries 
> at a time.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18404) RPC call from executor to driver blocks when getting map output locations (Netty Only)

2016-11-10 Thread Jeffrey Shmain (JIRA)
Jeffrey Shmain created SPARK-18404:
--

 Summary: RPC call from executor to driver blocks when getting map 
output locations (Netty Only)
 Key: SPARK-18404
 URL: https://issues.apache.org/jira/browse/SPARK-18404
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.0
Reporter: Jeffrey Shmain


Compared an identical application run on Spark 1.5 and Spark 1.6. Noticed that 
jobs became slower. After looking at it more closely, found that 75% of tasks 
finished in the same time or faster, and 25% had significant delays (unrelated 
to data skew and GC).

After more debugging, noticed that the executors are blocking for a few 
seconds (sometimes 25) on this call:

https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L199

logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
  val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
  fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
  logInfo("Got the output locations")

So the regression seems to be related to changing the default from Akka to 
Netty.

This was an application working with RDDs, submitting 10 concurrent queries at 
a time.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655091#comment-15655091
 ] 

Nicholas Chammas commented on SPARK-18367:
--

I've updated the issue description with a minimal repro and a better 
understanding of what is going on. This may well be expected Spark behavior, 
but I'm not sure.

If you want, I can share the script I am using to track the number of files 
that Spark is holding open, so you can verify my numbers.
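
For reference, a minimal sketch of one way to do such tracking, assuming the 
{{psutil}} package is available (a hypothetical stand-in, not the script 
mentioned above):

{code}
# Hypothetical stand-in for the tracking script: count the files held open
# by the PySpark driver and its child processes (including the driver JVM).
import psutil

def count_open_files():
    me = psutil.Process()
    procs = [me] + me.children(recursive=True)
    return sum(len(p.open_files()) for p in procs)

print('open files:', count_open_files())
{code}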

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>
> I have a moderately complex DataFrame query that spawns north of 10K open 
> files, causing my job to crash. 10K is the macOS limit on how many files a 
> single process can have open at once. It seems unreasonable that Spark should 
> hold that many files open at once.
> I was able to boil down what I'm seeing to the following minimal reproduction:
> {code}
> import pyspark
> from pyspark.sql import Row
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
> Row(a=n)
> for n in range(50)
> ]).coalesce(1)  # a coalesce(1) here "fixes" the problem
> df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help
> print('partitions:', df.rdd.getNumPartitions())
> df.explain()
> df.show(1)
> {code}
> When I run this code, Spark spawns over 2K open files. I can "fix" the 
> problem by adding a {{coalesce(1)}} in the right place, as indicated in the 
> comments above. When I do, Spark spawns no more than 600 open files. The 
> number of partitions without the coalesce is 200.
> Here are the execution plans with and without the coalesce.
> 2K+ open files, without the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#3L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#0L, 200)
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#3L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#3L, 200)
>  +- *Filter isnotnull(a#3L)
> +- Scan ExistingRDD[a#3L]
> {code}
> <600 open files, with the coalesce:
> {code}
> == Physical Plan ==
> *Project [a#0L]
> +- *SortMergeJoin [a#0L], [a#4L], Inner
>:- *Sort [a#0L ASC NULLS FIRST], false, 0
>:  +- Coalesce 1
>: +- *Filter isnotnull(a#0L)
>:+- Scan ExistingRDD[a#0L]
>+- *Sort [a#4L ASC NULLS FIRST], false, 0
>   +- Coalesce 1
>  +- *Filter isnotnull(a#4L)
> +- Scan ExistingRDD[a#4L]
> {code}
> So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
> 200)}} operator.
> Is the large number of open files perhaps expected given the join on a large 
> number of distinct keys? If so, how would one mitigate that issue? If not, is 
> this a bug in Spark?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Description: 
I have a moderately complex DataFrame query that spawns north of 10K open 
files, causing my job to crash. 10K is the macOS limit on how many files a 
single process can have open at once. It seems unreasonable that Spark should 
hold that many files open at once.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
Row(a=n)
for n in range(50)
]).coalesce(1)  # a coalesce(1) here "fixes" the problem
df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

print('partitions:', df.rdd.getNumPartitions())
df.explain()
df.show(1)
{code}

When I run this code, Spark spawns over 2K open files. I can "fix" the problem 
by adding a {{coalesce(1)}} in the right place, as indicated in the comments 
above. When I do, Spark spawns no more than 600 open files. The number of 
partitions without the coalesce is 200.

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(a#3L, 200)
 +- *Filter isnotnull(a#3L)
+- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   : +- *Filter isnotnull(a#0L)
   :+- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
  +- Coalesce 1
 +- *Filter isnotnull(a#4L)
+- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 
200)}} operator.

Is the large number of open files perhaps expected given the join on a large 
number of distinct keys? If so, how would one mitigate that issue? If not, is 
this a bug in Spark?

  was:
I have a complex DataFrame query that fails to run normally but succeeds if I 
add a dummy {{limit()}} upstream in the query tree.

The failure presents itself like this:

{code}
ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes 
to file 
/private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
java.io.FileNotFoundException: 
/private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
 (Too many open files in system)
{code}

My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
macOS. However, I don't think that's the issue, since if I add a dummy 
{{limit()}} early on the query tree -- dummy as in it does not actually reduce 
the number of rows queried -- then the same query works.

I've diffed the physical query plans to see what this {{limit()}} is actually 
doing, and the diff is as follows:

{code}
diff plan-with-limit.txt plan-without-limit.txt
24,28c24
<:  : : +- *GlobalLimit 100
<:  : :+- Exchange SinglePartition
<:  : :   +- *LocalLimit 100
<:  : :  +- *Project [...]
<:  : : +- *Scan orc [...] 
Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<...
---
>:  : : +- *Scan orc [...] Format: ORC, 
> InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<...
49,53c45
<   : : +- *GlobalLimit 100
<   : :+- Exchange SinglePartition
<   : :   +- *LocalLimit 100
<   : :  +- *Project [...]
<   : : +- *Scan orc [...] 
Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<...
---
>   : : +- *Scan orc [] Format: ORC, 
> InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<...
{code}

Does this give any clues as to why this {{limit()}} is helping? Again, the 
100 limit you can see in the plan is much higher than the cardinality of the 
dataset I'm reading, so there is no theoretical impact on the output. You can 
see the full query plans attached to this ticket.

Unfortunately, I don't have a minimal reproduction for this issue, but I can 
work towards one with some clues.

I'm seeing this behavior on 2.0.1 and on master at commit 
{{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.

[jira] [Assigned] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18403:


Assignee: Cheng Lian  (was: Apache Spark)

> ObjectHashAggregateSuite is being flaky (occasional OOM errors)
> ---
>
> Key: SPARK-18403
> URL: https://issues.apache.org/jira/browse/SPARK-18403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>
> This test suite fails occasionally on Jenkins due to OOM errors. I've already 
> reproduced it locally but haven't figured out the root cause.
> We should probably disable it temporarily before getting it fixed so that it 
> doesn't break the PR build too often.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655058#comment-15655058
 ] 

Apache Spark commented on SPARK-18403:
--

User 'liancheng' has created a pull request for this issue:
https://github.com/apache/spark/pull/15845

> ObjectHashAggregateSuite is being flaky (occasional OOM errors)
> ---
>
> Key: SPARK-18403
> URL: https://issues.apache.org/jira/browse/SPARK-18403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>
> This test suite fails occasionally on Jenkins due to OOM errors. I've already 
> reproduced it locally but haven't figured out the root cause.
> We should probably disable it temporarily before getting it fixed so that it 
> doesn't break the PR build too often.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-18403:


Assignee: Apache Spark  (was: Cheng Lian)

> ObjectHashAggregateSuite is being flaky (occasional OOM errors)
> ---
>
> Key: SPARK-18403
> URL: https://issues.apache.org/jira/browse/SPARK-18403
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Cheng Lian
>Assignee: Apache Spark
>
> This test suite fails occasionally on Jenkins due to OOM errors. I've already 
> reproduced it locally but haven't figured out the root cause.
> We should probably disable it temporarily before getting it fixed so that it 
> doesn't break the PR build too often.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Attachment: (was: plan-with-limit.txt)

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early on the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655036#comment-15655036
 ] 

Herman van Hovell commented on SPARK-18367:
---

Ok, I am very curious to know what is going on.

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early on the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Attachment: (was: plan-without-limit.txt)

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early on the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18367) DataFrame join spawns unreasonably high number of open files

2016-11-10 Thread Nicholas Chammas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Chammas updated SPARK-18367:
-
Summary: DataFrame join spawns unreasonably high number of open files  
(was: limit() makes the lame walk again)

> DataFrame join spawns unreasonably high number of open files
> 
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: plan-with-limit.txt, plan-without-limit.txt
>
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early on the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18367) limit() makes the lame walk again

2016-11-10 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655032#comment-15655032
 ] 

Nicholas Chammas commented on SPARK-18367:
--

Scratch that. This is not related to UDFs.

> limit() makes the lame walk again
> -
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: plan-with-limit.txt, plan-without-limit.txt
>
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early on the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17982) SQLBuilder should wrap the generated SQL with parenthesis for LIMIT

2016-11-10 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-17982:
--
Summary: SQLBuilder should wrap the generated SQL with parenthesis for 
LIMIT  (was: Spark 2.0.0  CREATE VIEW statement fails :: 
java.lang.RuntimeException: Failed to analyze the canonicalized SQL. It is 
possible there is a bug in Spark.)

> SQLBuilder should wrap the generated SQL with parenthesis for LIMIT
> ---
>
> Key: SPARK-17982
> URL: https://issues.apache.org/jira/browse/SPARK-17982
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
> Environment: spark 2.0.0
>Reporter: Franck Tago
>Priority: Blocker
>
> The following statement fails in the spark shell . 
> {noformat}
> scala> spark.sql("CREATE VIEW 
> DEFAULT.sparkshell_2_VIEW__hive_quoted_with_where (WHERE_ID , WHERE_NAME ) AS 
> SELECT `where`.id,`where`.name FROM DEFAULT.`where` limit 2")
> scala> spark.sql("CREATE VIEW 
> DEFAULT.sparkshell_2_VIEW__hive_quoted_with_where (WHERE_ID , WHERE_NAME ) AS 
> SELECT `where`.id,`where`.name FROM DEFAULT.`where` limit 2")
> java.lang.RuntimeException: Failed to analyze the canonicalized SQL: SELECT 
> `gen_attr_0` AS `WHERE_ID`, `gen_attr_2` AS `WHERE_NAME` FROM (SELECT 
> `gen_attr_1` AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_2` FROM SELECT 
> `gen_attr_1`, `gen_attr_3` FROM (SELECT `id` AS `gen_attr_1`, `name` AS 
> `gen_attr_3` FROM `default`.`where`) AS gen_subquery_0 LIMIT 2) AS 
> gen_subquery_1
>   at 
> org.apache.spark.sql.execution.command.CreateViewCommand.prepareTable(views.scala:192)
>   at 
> org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:122)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
> {noformat}
> This appears to be a limitation of the CREATE VIEW statement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17982) Spark 2.0.0 CREATE VIEW statement fails :: java.lang.RuntimeException: Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.

2016-11-10 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654987#comment-15654987
 ] 

Dongjoon Hyun commented on SPARK-17982:
---

Thank you for confirming, [~tafra...@gmail.com].

> Spark 2.0.0  CREATE VIEW statement fails :: java.lang.RuntimeException: 
> Failed to analyze the canonicalized SQL. It is possible there is a bug in 
> Spark.
> 
>
> Key: SPARK-17982
> URL: https://issues.apache.org/jira/browse/SPARK-17982
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
> Environment: spark 2.0.0
>Reporter: Franck Tago
>Priority: Blocker
>
> The following statement fails in the spark shell . 
> {noformat}
> scala> spark.sql("CREATE VIEW 
> DEFAULT.sparkshell_2_VIEW__hive_quoted_with_where (WHERE_ID , WHERE_NAME ) AS 
> SELECT `where`.id,`where`.name FROM DEFAULT.`where` limit 2")
> scala> spark.sql("CREATE VIEW 
> DEFAULT.sparkshell_2_VIEW__hive_quoted_with_where (WHERE_ID , WHERE_NAME ) AS 
> SELECT `where`.id,`where`.name FROM DEFAULT.`where` limit 2")
> java.lang.RuntimeException: Failed to analyze the canonicalized SQL: SELECT 
> `gen_attr_0` AS `WHERE_ID`, `gen_attr_2` AS `WHERE_NAME` FROM (SELECT 
> `gen_attr_1` AS `gen_attr_0`, `gen_attr_3` AS `gen_attr_2` FROM SELECT 
> `gen_attr_1`, `gen_attr_3` FROM (SELECT `id` AS `gen_attr_1`, `name` AS 
> `gen_attr_3` FROM `default`.`where`) AS gen_subquery_0 LIMIT 2) AS 
> gen_subquery_1
>   at 
> org.apache.spark.sql.execution.command.CreateViewCommand.prepareTable(views.scala:192)
>   at 
> org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:122)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
> {noformat}
> This appears to be a limitation of the CREATE VIEW statement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18402) spark: SAXParseException while writing from json to parquet on s3

2016-11-10 Thread Luke Miner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Miner updated SPARK-18402:
---
Environment: 
spark 2.0.1 hadoop 2.7.1
hadoop aws 2.7.1
ubuntu 14.04.5 on aws
mesos 1.0.1
Java 1.7.0_111, openjdk

  was:
spark 2.0.1 hadoop 2.7.1
hadoop aws 2.7.1
ubuntu 14.04.5 on aws
mesos 1.0.1


> spark: SAXParseException while writing from json to parquet on s3
> -
>
> Key: SPARK-18402
> URL: https://issues.apache.org/jira/browse/SPARK-18402
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Spark Submit
>Affects Versions: 1.6.2, 2.0.1
> Environment: spark 2.0.1 hadoop 2.7.1
> hadoop aws 2.7.1
> ubuntu 14.04.5 on aws
> mesos 1.0.1
> Java 1.7.0_111, openjdk
>Reporter: Luke Miner
>
> I'm trying to read in some json, infer a schema, and write it out again as 
> parquet to s3 (s3a). For some reason, about a third of the way through the 
> writing portion of the run, spark always errors out with the error included 
> below. 
> I can't find any obvious reasons for the issue:
> - It isn't out of memory, and I have tried increasing the overhead memory.
> - There are no long GC pauses.
> - There don't seem to be any additional error messages in the logs of the 
> individual executors.
> - This does not appear to be a problem with badly formed json or corrupted 
> files. I have unzipped and read in each file individually with no error.
> The script runs fine on another set of data that I have, which is of a very 
> similar structure, but several orders of magnitude smaller.
> I am using the FileOutputCommitter. The algorithm version doesn't seem to 
> matter.
> Here's a simplified version of the script:
> {code}
> object Foo {
>   def parseJson(json: String): Option[Map[String, Any]] = {
>     if (json == null)
>       Some(Map())
>     else
>       parseOpt(json).map((j: JValue) =>
>         j.values.asInstanceOf[Map[String, Any]])
>   }
> }
> // read in as text and parse json using json4s
> val jsonRDD: RDD[String] = sc.textFile(inputPath)
>   .map(row => Foo.parseJson(row))
> // infer a schema that will encapsulate the most rows in a sample of size
> // sampleRowNum
> val schema: StructType =
>   Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)
> // get documents' compatibility with schema
> val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
>   .map(js => (js, Infer.getSchemaCompatibility(schema,
>     Infer.inferSchema(js)).toBoolean))
>   .repartition(partitions)
> val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
>   .filter { case (js: String, compatible: Boolean) => compatible }
>   .map { case (js: String, _: Boolean) => js }
> // create a dataframe from documents with compatible schema
> val dataFrame: DataFrame =
>   spark.read.schema(schema).json(jsonCompatibleRDD)
> dataFrame.write.parquet("s3a://foo/foo")
> {code}
> It completes the earlier schema inferring steps successfully. The error 
> itself occurs on the last line, but I suppose that could encompass at least 
> the immediately preceding statement, if not earlier:
> {code}
> org.apache.spark.SparkException: Task failed while writing rows
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Failed to commit task
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
> at ...
> {code}
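
For reference, the committer algorithm version mentioned in the description is 
usually selected as below (a sketch only; per the report, neither value avoids 
the error):

{code}
# Sketch: choosing the FileOutputCommitter algorithm version from PySpark.
# Valid values are 1 and 2; per the report above, neither avoids the failure.
import pyspark

spark = (pyspark.sql.SparkSession.builder
         .config('spark.hadoop.mapreduce.fileoutputcommitter'
                 '.algorithm.version', '2')
         .getOrCreate())
{code}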

[jira] [Commented] (SPARK-17734) inner equi-join shorthand that returns Datasets, like DataFrame already has

2016-11-10 Thread Leonardo Yvens (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654971#comment-15654971
 ] 

Leonardo Yvens commented on SPARK-17734:


Hello, [~dongjoon] and [~pdxleif]. The issue asks for a method that returns a 
typed Dataset, but the suggestion in the first comment returns a DataFrame, so 
it doesn't fix the issue. Maybe what you want is a `joinWith(other: 
Dataset\[U], usingColumn: String): Dataset\[(T, U)]`? I think that method 
should not exist, because it would have to keep the column duplicated, and 
that is inconsistent with the `join` method, which does not duplicate the 
column. Then this would be closed as wontfix. Is this correct, or am I 
misunderstanding the issue?

> inner equi-join shorthand that returns Datasets, like DataFrame already has
> ---
>
> Key: SPARK-17734
> URL: https://issues.apache.org/jira/browse/SPARK-17734
> Project: Spark
>  Issue Type: Wish
>Reporter: Leif Warner
>Priority: Minor
>
> There's an existing ".join(right: Dataset[_], usingColumn: String): 
> DataFrame" method on Dataset.
> Would appreciate it if a variant that returns typed Datasets would also 
> available.
> If you write a join that contains the common column name name, you get an 
> AnalysisException thrown because that's ambiguous, e.g:
> $"foo" === $"foo"
> So I wrote table1.toDF()("foo") === table2.toDF()("foo"), but that's a little 
> error prone, and coworkers considered it a hack and didn't want to use it, 
> because it "mixes DataFrame and Dataset api".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18392) LSH API, algorithm, and documentation follow-ups

2016-11-10 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654942#comment-15654942
 ] 

Joseph K. Bradley commented on SPARK-18392:
---

That sounds like a good idea.  Please go ahead--thank you!

> LSH API, algorithm, and documentation follow-ups
> 
>
> Key: SPARK-18392
> URL: https://issues.apache.org/jira/browse/SPARK-18392
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: Joseph K. Bradley
>
> This JIRA summarizes discussions from the initial LSH PR 
> [https://github.com/apache/spark/pull/15148] as well as the follow-up for 
> hash distance [https://github.com/apache/spark/pull/15800].  This will be 
> broken into subtasks:
> * API changes (targeted for 2.1)
> * algorithmic fixes (targeted for 2.1)
> * documentation improvements (ideally 2.1, but could slip)
> The major issues we have mentioned are as follows:
> * OR vs AND amplification
> ** Need to make API flexible enough to support both types of amplification in 
> the future
> ** Need to clarify which we support, including in each model function 
> (transform, similarity join, neighbors)
> * Need to clarify which algorithms we have implemented, improve docs and 
> references, and fix the algorithms if needed.
> These major issues are broken down into detailed issues below.
> h3. LSH abstraction
> * Rename {{outputDim}} to something indicative of OR-amplification.
> ** My current top pick is {{numHashTables}}, with {{numHashFunctions}} used 
> in the future for AND amplification (Thanks [~mlnick]!)
> * transform
> ** Update output schema to {{Array of Vector}} instead of {{Vector}}.  This 
> is the "raw" output of all hash functions, i.e., with no aggregation for 
> amplification.
> ** Clarify meaning of output in terms of multiple hash functions and 
> amplification.
> ** Note: We will _not_ worry about users using this output for dimensionality 
> reduction; if anything, that use case can be explained in the User Guide.
> * Documentation
> ** Clarify terminology used everywhere (a toy sketch follows after the 
> reference list below)
> *** hash function {{h_i}}: basic hash function without amplification
> *** hash value {{h_i(key)}}: output of a hash function
> *** compound hash function {{g = (h_0,h_1,...h_{K-1})}}: hash function with 
> AND-amplification using K base hash functions
> *** compound hash function value {{g(key)}}: vector-valued output
> *** hash table {{H = (g_0,g_1,...g_{L-1})}}: hash function with 
> OR-amplification using L compound hash functions
> *** hash table value {{H(key)}}: output of array of vectors
> *** This terminology is largely pulled from Wang et al.'s survey and the 
> multi-probe LSH paper.
> ** Link clearly to documentation (Wikipedia or papers) which matches our 
> terminology and what we implemented
> h3. RandomProjection (or P-Stable Distributions)
> * Rename {{RandomProjection}}
> ** Options include: {{ScalarRandomProjectionLSH}}, 
> {{BucketedRandomProjectionLSH}}, {{PStableLSH}}
> * API privacy
> ** Make randUnitVectors private
> * hashFunction
> ** Currently, this uses OR-amplification for single probing, as we intended.
> ** It does *not* do multiple probing, at least not in the sense of the 
> original MPLSH paper.  We should fix that or at least document its behavior.
> * Documentation
> ** Clarify this is the P-Stable Distribution LSH method listed in Wikipedia
> ** Also link to the multi-probe LSH paper since that explains this method 
> very clearly.
> ** Clarify hash function and distance metric
> h3. MinHash
> * Rename {{MinHash}} -> {{MinHashLSH}}
> * API privacy
> ** Make randCoefficients, numEntries private
> * hashDistance (used in approxNearestNeighbors)
> ** Update to use average of indicators of hash collisions [SPARK-18334]
> ** See [Wikipedia | 
> https://en.wikipedia.org/wiki/MinHash#Variant_with_many_hash_functions] for a 
> reference
> h3. All references
> I'm just listing references I looked at.
> Papers
> * [http://cseweb.ucsd.edu/~dasgupta/254-embeddings/lawrence.pdf]
> * [https://people.csail.mit.edu/indyk/p117-andoni.pdf]
> * [http://web.stanford.edu/class/cs345a/slides/05-LSH.pdf]
> * [http://www.cs.princeton.edu/cass/papers/mplsh_vldb07.pdf] - Multi-probe 
> LSH paper
> Wikipedia
> * 
> [https://en.wikipedia.org/wiki/Locality-sensitive_hashing#LSH_algorithm_for_nearest_neighbor_search]
> * [https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Amplification]
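
To make the terminology above concrete, a toy sketch in plain Python 
(illustrative only, not the Spark ML implementation; each hash table here is a 
single base hash function, i.e. K = 1):

{code}
# Toy MinHash with OR-amplification: L hash tables, each one base hash
# function h_i(key) = min over the key's elements x of (a*x + b) mod PRIME.
import random

PRIME = 2038074743  # an arbitrary large prime for this sketch

def make_hash(seed):
    rng = random.Random(seed)
    a = rng.randrange(1, PRIME)
    b = rng.randrange(0, PRIME)
    return lambda key: min((a * x + b) % PRIME for x in key)

L = 4                                 # numHashTables (OR-amplification)
tables = [make_hash(i) for i in range(L)]

key1, key2 = {1, 2, 3}, {2, 3, 4}
# Under OR-amplification, two keys are candidates if ANY table collides.
print(any(h(key1) == h(key2) for h in tables))
{code}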



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-6417) Add Linear Programming algorithm

2016-11-10 Thread Ehsan Mohyedin Kermani (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ehsan Mohyedin Kermani updated SPARK-6417:
--
Comment: was deleted

(was: As I am working on the implementation, I have some serious 
concerns/questions. 

1) Since I am heavily using distributed matrix abstractions provided in linalg, 
I realized it's impossible for me to submit it as a separate package when the 
implementation is completed. I don't think it's possible to define new 
abstractions like Aaron Staple's TFOCS package, for this task.

2) Adding some functionalities like transforming a local matrix to a 
BlockMatrix is necessary, as the underlying matrix is a distributed BlockMatrix 
and that's the only distributed matrix data structure with minimal matrix 
operations suitable for this task (any suggestions?)

3) Because there's no literature on LP in MapReduce, my only approach is 
primal-dual IPM but with underlying distributed matrix data structures. Can 
that be called a distributed LP solver, and if so, to what degree?

Please let me know your opinion. It'd be great to have someone who is willing 
to review the code and guide me before any submission, as I'm relatively new 
to Scala.)

> Add Linear Programming algorithm 
> -
>
> Key: SPARK-6417
> URL: https://issues.apache.org/jira/browse/SPARK-6417
> Project: Spark
>  Issue Type: New Feature
>  Components: MLlib
>Reporter: Fan Jiang
>  Labels: features
>
> Linear programming is the problem of finding a vector x that minimizes a 
> linear function f^T x subject to linear constraints:
> min_x f^T x
> such that one or more of the following hold: A·x ≤ b, Aeq·x = beq, l ≤ x ≤ u.
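
In typeset form, the problem above reads:

{noformat}
\min_{x} \; f^{T} x
\quad \text{subject to} \quad
A x \le b, \qquad A_{\mathrm{eq}}\, x = b_{\mathrm{eq}}, \qquad l \le x \le u
{noformat}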



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18073) Migrate wiki to spark.apache.org web site

2016-11-10 Thread Siddharth Ahuja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654920#comment-15654920
 ] 

Siddharth Ahuja commented on SPARK-18073:
-

Hey [~srowen], as I may not be able to look into this soon (and consistently) 
due to other priorities coming in, please feel free to assign this to someone 
else.

> Migrate wiki to spark.apache.org web site
> -
>
> Key: SPARK-18073
> URL: https://issues.apache.org/jira/browse/SPARK-18073
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.0.1
>Reporter: Sean Owen
>
> Per 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Mini-Proposal-Make-it-easier-to-contribute-to-the-contributing-to-Spark-Guide-td19493.html
>  , let's consider migrating all wiki pages to documents at 
> github.com/apache/spark-website (i.e. spark.apache.org).
> Some reasons:
> * No pull request system or history for changes to the wiki
> * Separate, not-so-clear system for granting write access to wiki
> * Wiki doesn't change much
> * One less place to maintain or look for docs
> The idea would be to then update all wiki pages with a message pointing to 
> the new home of the information (or message saying it's obsolete).
> Here are the current wikis and my general proposal for what to do with the 
> content:
> * Additional Language Bindings -> roll this into wherever Third Party 
> Projects ends up
> * Committers -> Migrate to a new /committers.html page, linked under 
> Community menu (already exists)
> * Contributing to Spark -> Make this CONTRIBUTING.md? or a new 
> /contributing.html page under Community menu
> ** Jira Permissions Scheme -> obsolete
> ** Spark Code Style Guide -> roll this into new contributing.html page
> * Development Discussions -> obsolete?
> * Powered By Spark -> Make into new /powered-by.html linked by the existing 
> Commnunity menu item
> * Preparing Spark Releases -> see below; roll into where "versioning policy" 
> goes?
> * Profiling Spark Applications -> roll into where Useful Developer Tools goes
> ** Profiling Spark Applications Using YourKit -> ditto
> * Spark Internals -> all of these look somewhat to very stale; remove?
> ** Java API Internals
> ** PySpark Internals
> ** Shuffle Internals
> ** Spark SQL Internals
> ** Web UI Internals
> * Spark QA Infrastructure -> tough one. Good info to document; does it belong 
> on the website? We can just migrate it.
> * Spark Versioning Policy -> new page living under Community (?) that 
> documents release policy and process (better menu?)
> ** spark-ec2 AMI list and install file version mappings -> obsolete
> ** Spark-Shark version mapping -> obsolete
> * Third Party Projects -> new Community menu item
> * Useful Developer Tools -> new page under new Developer menu? Community?
> ** Jenkins -> obsolete, remove
> Of course, another outcome is to just remove outdated wikis, migrate some, 
> leave the rest.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18403) ObjectHashAggregateSuite is being flaky (occasional OOM errors)

2016-11-10 Thread Cheng Lian (JIRA)
Cheng Lian created SPARK-18403:
--

 Summary: ObjectHashAggregateSuite is being flaky (occasional OOM 
errors)
 Key: SPARK-18403
 URL: https://issues.apache.org/jira/browse/SPARK-18403
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Cheng Lian
Assignee: Cheng Lian


This test suite fails occasionally on Jenkins due to OOM errors. I've already 
reproduced it locally but haven't figured out the root cause.

We should probably disable it temporarily before getting it fixed so that it 
doesn't break the PR build too often.
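
A minimal sketch of how a suite like this is typically disabled in ScalaTest 
(the test name below is hypothetical, not the real one in 
ObjectHashAggregateSuite):

{code}
// switching test(...) to ignore(...) keeps the case compiled but skipped
ignore("hypothetical flaky aggregation case") {
  // original test body unchanged
}
{code}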



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18402) spark: SAXParseException while writing from json to parquet on s3

2016-11-10 Thread Luke Miner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Miner updated SPARK-18402:
---
Environment: 
spark 2.0.1 hadoop 2.7.1
hadoop aws 2.7.1
ubuntu 14.04.5 on aws
mesos 1.0.1

  was:
spark 2.0.1 hadoop 2.7.1
hadoop aws 2.7.1
ubuntu 14.04.5 on aws


> spark: SAXParseException while writing from json to parquet on s3
> -
>
> Key: SPARK-18402
> URL: https://issues.apache.org/jira/browse/SPARK-18402
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Spark Submit
>Affects Versions: 1.6.2, 2.0.1
> Environment: spark 2.0.1 hadoop 2.7.1
> hadoop aws 2.7.1
> ubuntu 14.04.5 on aws
> mesos 1.0.1
>Reporter: Luke Miner
>
> I'm trying to read in some json, infer a schema, and write it out again as 
> parquet to s3 (s3a). For some reason, about a third of the way through the 
> writing portion of the run, Spark always errors out with the error included 
> below. 
> I can't find any obvious reasons for the issue:
> - It isn't out of memory, and I have tried increasing the overhead memory.
> - There are no long GC pauses.
> - There don't seem to be any additional error messages in the logs of the 
> individual executors.
> - This does not appear to be a problem with badly formed json or corrupted 
> files. I have unzipped and read in each file individually with no error.
> The script runs fine on another set of data that I have, which is of a very 
> similar structure, but several orders of magnitude smaller.
> I am using the FileOutputCommitter. The algorithm version doesn't seem to 
> matter.
> Here's a simplified version of the script:
> {code}
> object Foo {
>   def parseJson(json: String): Option[Map[String, Any]] = {
> if (json == null)
>   Some(Map())
> else
>   parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, 
> Any]])
>   }
> }
> // read in as text and parse json using json4s
> val jsonRDD: RDD[String] = sc.textFile(inputPath)
> .map(row => Foo.parseJson(row))
> // infer a schema that will encapsulate the most rows in a sample of size 
> sampleRowNum
> val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, 
> sampleRowNum)
> // get documents compatibility with schema
> val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
>   .map(js => (js, Infer.getSchemaCompatibility(schema, 
> Infer.inferSchema(js)).toBoolean))
>   .repartition(partitions)
> val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
>   .filter { case (js: String, compatible: Boolean) => compatible }
>   .map { case (js: String, _: Boolean) => js }
> // create a dataframe from documents with compatible schema
> val dataFrame: DataFrame = 
> spark.read.schema(schema).json(jsonCompatibleRDD)
> dataFrame.write.parquet("s3a://foo/foo")
> {code}
> It completes the earlier schema inferring steps successfully. The error 
> itself occurs on the last line, but I suppose that could encompass at least 
> the immediately preceding statement, if not earlier:
> {code}
> org.apache.spark.SparkException: Task failed while writing rows
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Failed to commit task
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
> at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
> at 
> 

[jira] [Updated] (SPARK-18402) spark: SAXParseException while writing from json to parquet on s3

2016-11-10 Thread Luke Miner (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Miner updated SPARK-18402:
---
Description: 
I'm trying to read in some json, infer a schema, and write it out again as 
parquet to s3 (s3a). For some reason, about a third of the way through the 
writing portion of the run, Spark always errors out with the error included 
below. 

I can't find any obvious reasons for the issue:
- It isn't out of memory, and I have tried increasing the overhead memory.
- There are no long GC pauses.
- There don't seem to be any additional error messages in the logs of the 
individual executors.
- This does not appear to be a problem with badly formed json or corrupted 
files. I have unzipped and read in each file individually with no error.

The script runs fine on another set of data that I have, which is of a very 
similar structure, but several orders of magnitude smaller.

I am using the FileOutputCommitter. The algorithm version doesn't seem to 
matter.
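
For reference, the committer algorithm version is normally selected through 
the Hadoop configuration; a minimal sketch (assuming a live SparkContext 
{{sc}}, as in the script below):

{code}
// FileOutputCommitter algorithm version (1 or 2) via the Hadoop configuration
sc.hadoopConfiguration.set(
  "mapreduce.fileoutputcommitter.algorithm.version", "2")
{code}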

Here's a simplified version of the script:

{code}
object Foo {

  def parseJson(json: String): Option[Map[String, Any]] = {
if (json == null)
  Some(Map())
else
  parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, 
Any]])
  }
}

// read in as text and parse json using json4s
val jsonRDD: RDD[String] = sc.textFile(inputPath)
.map(row => Foo.parseJson(row))

// infer a schema that will encapsulate the most rows in a sample of size 
sampleRowNum
val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, 
sampleRowNum)

// get documents compatibility with schema
val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
  .map(js => (js, Infer.getSchemaCompatibility(schema, 
Infer.inferSchema(js)).toBoolean))
  .repartition(partitions)

val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
  .filter { case (js: String, compatible: Boolean) => compatible }
  .map { case (js: String, _: Boolean) => js }

// create a dataframe from documents with compatible schema
val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)

dataFrame.write.parquet("s3a://foo/foo")
{code}

It completes the earlier schema inferring steps successfully. The error itself 
occurs on the last line, but I suppose that could encompass at least the 
immediately preceding statement, if not earlier:

{code}
org.apache.spark.SparkException: Task failed while writing rows
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to commit task
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more
Suppressed: java.lang.NullPointerException
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
at 

[jira] [Created] (SPARK-18402) spark: SAXParseException while writing from json to parquet on s3

2016-11-10 Thread Luke Miner (JIRA)
Luke Miner created SPARK-18402:
--

 Summary: spark: SAXParseException while writing from json to 
parquet on s3
 Key: SPARK-18402
 URL: https://issues.apache.org/jira/browse/SPARK-18402
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Spark Submit
Affects Versions: 2.0.1, 1.6.2
 Environment: spark 2.0.1 hadoop 2.7.1
hadoop aws 2.7.1
ubuntu 14.04.5 on aws
Reporter: Luke Miner


I'm trying to read in some json, infer a schema, and write it out again as 
parquet to s3 (s3a). For some reason, about a third of the way through the 
writing portion of the run, Spark always errors out with the error included 
below. 

I can't find any obvious reasons for the issue:
- It isn't out of memory, and I have tried increasing the overhead memory.
- There are no long GC pauses.
- There don't seem to be any additional error messages in the logs of the 
individual executors.
- This does not appear to be a problem with badly formed json or corrupted 
files. I have unzipped and read in each file individually with no error.

The script runs fine on another set of data that I have, which is of a very 
similar structure, but several orders of magnitude smaller.

I am using the FileOutputCommitter. The algorithm version doesn't seem to 
matter.

Here's a simplified version of the script:

{code}
object Foo {

  def parseJson(json: String): Option[Map[String, Any]] = {
if (json == null)
  Some(Map())
else
  parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, 
Any]])
  }
}

// read in as text and parse json using json4s
val jsonRDD: RDD[String] = sc.textFile(inputPath)
.map(row => Foo.parseJson(row))

// infer a schema that will encapsulate the most rows in a sample of size 
sampleRowNum
val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, 
sampleRowNum)

// get documents compatibility with schema
val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
  .map(js => (js, Infer.getSchemaCompatibility(schema, 
Infer.inferSchema(js)).toBoolean))
  .repartition(partitions)

val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
  .filter { case (js: String, compatible: Boolean) => compatible }
  .map { case (js: String, _: Boolean) => js }

// create a dataframe from documents with compatible schema
val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)
{code}

It completes the earlier schema inferring steps successfully. The error itself 
occurs on the last line, but I suppose that could encompass at least the 
immediately preceding statement, if not earlier:

{code}
org.apache.spark.SparkException: Task failed while writing rows
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to commit task
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more
Suppressed: java.lang.NullPointerException
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at 

[jira] [Commented] (SPARK-18367) limit() makes the lame walk again

2016-11-10 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654873#comment-15654873
 ] 

Herman van Hovell commented on SPARK-18367:
---

I think this one is on Spark. As a user, you cannot easily control the number 
of files opened. I thought for a moment we were hitting a co-prime-related 
issue here.

> limit() makes the lame walk again
> -
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: plan-with-limit.txt, plan-without-limit.txt
>
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early in the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.
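
The workaround described above amounts to something like the following sketch 
(the path is hypothetical, and the limit value is simply far above the 
dataset's cardinality):

{code}
// dummy limit() inserted early in the query tree; the value is far above
// the real row count, so the query output is unchanged
val df = spark.read.orc("file:/path/to/data")
val patched = df.limit(100000000)
// ... the rest of the complex query is built on `patched` instead of `df`
{code}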



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2620) case class cannot be used as key for reduce

2016-11-10 Thread Shirish (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654830#comment-15654830
 ] 

Shirish commented on SPARK-2620:


Has this been resolved in 1.6? I am facing a similar issue and am not sure 
whether my problem is caused by this one.

> case class cannot be used as key for reduce
> ---
>
> Key: SPARK-2620
> URL: https://issues.apache.org/jira/browse/SPARK-2620
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.0.0, 1.1.0, 1.3.0, 1.4.0, 1.5.0, 1.6.0, 2.0.0, 2.1.0
> Environment: reproduced on spark-shell local[4]
>Reporter: Gerard Maas
>Assignee: Tobias Schlatter
>Priority: Critical
>  Labels: case-class, core
>
> Using a case class as a key doesn't seem to work properly on Spark 1.0.0
> A minimal example:
> case class P(name:String)
> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1), 
> (P(bob),1), (P(abe),1), (P(charly),1))
> In contrast to the expected behavior, that should be equivalent to:
> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
> groupByKey and distinct also present the same behavior.
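
The contrast described above can be reproduced self-contained in spark-shell 
(a sketch following the reporter's example):

{code}
case class P(name: String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))

// keying on the case class: affected by this issue in the shell
sc.parallelize(ps).map(x => (x, 1)).reduceByKey(_ + _).collect()

// keying on the underlying field: behaves as expected
sc.parallelize(ps).map(x => (x.name, 1)).reduceByKey(_ + _).collect()
{code}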



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18392) LSH API, algorithm, and documentation follow-ups

2016-11-10 Thread Yun Ni (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654822#comment-15654822
 ] 

Yun Ni commented on SPARK-18392:


I think your summary is very good in general. For class names, I think 
`BucketedRandomProjectionLSH` and `SignRandomProjectionLSH` look intuitive and 
won't cause confusion.

If we are breaking this into subtasks, let us prioritize the output schema 
first, because it will be a big change that affects most parts of the current 
implementation. If there is no objection, shall I open a subtask and make a PR 
for that?

> LSH API, algorithm, and documentation follow-ups
> 
>
> Key: SPARK-18392
> URL: https://issues.apache.org/jira/browse/SPARK-18392
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: Joseph K. Bradley
>
> This JIRA summarizes discussions from the initial LSH PR 
> [https://github.com/apache/spark/pull/15148] as well as the follow-up for 
> hash distance [https://github.com/apache/spark/pull/15800].  This will be 
> broken into subtasks:
> * API changes (targeted for 2.1)
> * algorithmic fixes (targeted for 2.1)
> * documentation improvements (ideally 2.1, but could slip)
> The major issues we have mentioned are as follows:
> * OR vs AND amplification
> ** Need to make API flexible enough to support both types of amplification in 
> the future
> ** Need to clarify which we support, including in each model function 
> (transform, similarity join, neighbors)
> * Need to clarify which algorithms we have implemented, improve docs and 
> references, and fix the algorithms if needed.
> These major issues are broken down into detailed issues below.
> h3. LSH abstraction
> * Rename {{outputDim}} to something indicative of OR-amplification.
> ** My current top pick is {{numHashTables}}, with {{numHashFunctions}} used 
> in the future for AND amplification (Thanks [~mlnick]!)
> * transform
> ** Update output schema to {{Array of Vector}} instead of {{Vector}}.  This 
> is the "raw" output of all hash functions, i.e., with no aggregation for 
> amplification.
> ** Clarify meaning of output in terms of multiple hash functions and 
> amplification.
> ** Note: We will _not_ worry about users using this output for dimensionality 
> reduction; if anything, that use case can be explained in the User Guide.
> * Documentation
> ** Clarify terminology used everywhere
> *** hash function {{h_i}}: basic hash function without amplification
> *** hash value {{h_i(key)}}: output of a hash function
> *** compound hash function {{g = (h_0,h_1,...h_{K-1})}}: hash function with 
> AND-amplification using K base hash functions
> *** compound hash function value {{g(key)}}: vector-valued output
> *** hash table {{H = (g_0,g_1,...g_{L-1})}}: hash function with 
> OR-amplification using L compound hash functions
> *** hash table value {{H(key)}}: output of a hash table (an array of vectors)
> *** This terminology is largely pulled from Wang et al.'s survey and the 
> multi-probe LSH paper.
> ** Link clearly to documentation (Wikipedia or papers) which matches our 
> terminology and what we implemented
> h3. RandomProjection (or P-Stable Distributions)
> * Rename {{RandomProjection}}
> ** Options include: {{ScalarRandomProjectionLSH}}, 
> {{BucketedRandomProjectionLSH}}, {{PStableLSH}}
> * API privacy
> ** Make randUnitVectors private
> * hashFunction
> ** Currently, this uses OR-amplification for single probing, as we intended.
> ** It does *not* do multiple probing, at least not in the sense of the 
> original MPLSH paper.  We should fix that or at least document its behavior.
> * Documentation
> ** Clarify this is the P-Stable Distribution LSH method listed in Wikipedia
> ** Also link to the multi-probe LSH paper since that explains this method 
> very clearly.
> ** Clarify hash function and distance metric
> h3. MinHash
> * Rename {{MinHash}} -> {{MinHashLSH}}
> * API privacy
> ** Make randCoefficients, numEntries private
> * hashDistance (used in approxNearestNeighbors)
> ** Update to use average of indicators of hash collisions [SPARK-18334]
> ** See [Wikipedia | 
> https://en.wikipedia.org/wiki/MinHash#Variant_with_many_hash_functions] for a 
> reference
> h3. All references
> I'm just listing references I looked at.
> Papers
> * [http://cseweb.ucsd.edu/~dasgupta/254-embeddings/lawrence.pdf]
> * [https://people.csail.mit.edu/indyk/p117-andoni.pdf]
> * [http://web.stanford.edu/class/cs345a/slides/05-LSH.pdf]
> * [http://www.cs.princeton.edu/cass/papers/mplsh_vldb07.pdf] - Multi-probe 
> LSH paper
> Wikipedia
> * 
> [https://en.wikipedia.org/wiki/Locality-sensitive_hashing#LSH_algorithm_for_nearest_neighbor_search]
> * [https://en.wikipedia.org/wiki/Locality-sensitive_hashing#Amplification]
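
As a quick numeric illustration of the amplification terminology above: if a 
single base hash function {{h_i}} collides on a candidate pair with 
probability p, then a compound hash function of K base functions collides with 
probability p^K (AND), and at least one of L hash tables collides with 
probability 1 - (1 - p^K)^L (OR). A minimal sketch of this standard LSH 
analysis (not code from the PRs above):

{code}
// collision probability of a candidate pair under AND/OR amplification
def amplifiedCollisionProb(p: Double, k: Int, l: Int): Double =
  1.0 - math.pow(1.0 - math.pow(p, k), l)

amplifiedCollisionProb(0.8, 4, 5)  // ≈ 0.93 for p = 0.8, K = 4, L = 5
{code}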



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (SPARK-18367) limit() makes the lame walk again

2016-11-10 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654770#comment-15654770
 ] 

Nicholas Chammas commented on SPARK-18367:
--

Looks like this is a fundamental problem with Python UDFs. They somehow just 
hemorrhage open files.

Trying to write up a minimal repro now.

> limit() makes the lame walk again
> -
>
> Key: SPARK-18367
> URL: https://issues.apache.org/jira/browse/SPARK-18367
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1, 2.1.0
> Environment: Python 3.5, Java 8
>Reporter: Nicholas Chammas
> Attachments: plan-with-limit.txt, plan-without-limit.txt
>
>
> I have a complex DataFrame query that fails to run normally but succeeds if I 
> add a dummy {{limit()}} upstream in the query tree.
> The failure presents itself like this:
> {code}
> ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial 
> writes to file 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
> java.io.FileNotFoundException: 
> /private/var/folders/f5/t48vxz555b51mr3g6jjhxv40gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
>  (Too many open files in system)
> {code}
> My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on 
> macOS. However, I don't think that's the issue, since if I add a dummy 
> {{limit()}} early in the query tree -- dummy as in it does not actually 
> reduce the number of rows queried -- then the same query works.
> I've diffed the physical query plans to see what this {{limit()}} is actually 
> doing, and the diff is as follows:
> {code}
> diff plan-with-limit.txt plan-without-limit.txt
> 24,28c24
> <:  : : +- *GlobalLimit 100
> <:  : :+- Exchange SinglePartition
> <:  : :   +- *LocalLimit 100
> <:  : :  +- *Project [...]
> <:  : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >:  : : +- *Scan orc [...] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> 49,53c45
> <   : : +- *GlobalLimit 100
> <   : :+- Exchange SinglePartition
> <   : :   +- *LocalLimit 100
> <   : :  +- *Project [...]
> <   : : +- *Scan orc [...] 
> Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<...
> ---
> >   : : +- *Scan orc [] Format: ORC, 
> > InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: 
> > struct<...
> {code}
> Does this give any clues as to why this {{limit()}} is helping? Again, the 
> 100 limit you can see in the plan is much higher than the cardinality of 
> the dataset I'm reading, so there is no theoretical impact on the output. You 
> can see the full query plans attached to this ticket.
> Unfortunately, I don't have a minimal reproduction for this issue, but I can 
> work towards one with some clues.
> I'm seeing this behavior on 2.0.1 and on master at commit 
> {{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


