[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-13268:
-
Description: 
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone; however, if we do so, we see that the output actually contains a 
-8 hour offset:

{code}
new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing and unintuitive, and it introduces issues when converting 
DataFrames containing timestamps to RDDs which are then saved as text. In effect, 
it shifts all dates in such a dataset by one day.

The suggested fix is to update the timestamp toString representation to either 
a) include the time zone or b) display correctly in GMT.

This change may well introduce substantial and insidious bugs, so I'm not sure 
how best to resolve this.
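For illustration, here is a minimal, self-contained Scala sketch (my own, not part of 
Spark; it only assumes Java 8's java.time) showing that the offset comes from 
Timestamp.toString using the JVM default time zone, and what rendering the same instant 
explicitly in GMT with the zone included could look like:

{code}
import java.sql.Timestamp
import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

object TimestampGmtDemo {
  def main(args: Array[String]): Unit = {
    val millis = ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli
    val ts = new Timestamp(millis)

    // Timestamp.toString formats in the JVM default time zone, hence the -8 hour
    // shift when the default zone is, e.g., America/Los_Angeles.
    println(ts)

    // Rendering the same instant explicitly in GMT, with the zone name included.
    val gmtFormat = DateTimeFormatter
      .ofPattern("yyyy-MM-dd HH:mm:ss.S zzz")
      .withZone(ZoneId.of("GMT"))
    println(gmtFormat.format(Instant.ofEpochMilli(ts.getTime))) // 2015-01-01 00:00:00.0 GMT
  }
}
{code}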


  was:
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{code}
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 



> SQL Timestamp stored as GMT but toString returns GMT-08:00
> --
>
> Key: SPARK-13268
> URL: https://issues.apache.org/jira/browse/SPARK-13268
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Ilya Ganelin
>
> There is an issue with how timestamps are displayed/converted to Strings in 
> Spark SQL. The documentation states that the timestamp should be created in 
> the GMT time zone, however, if we do so, we see that the output actually 
> contains a -8 hour offset:
> {code}
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
> res144: java.sql.Timestamp = 2014-12-31 16:00:00.0
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
> res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
> {code}
> This result is confusing, unintuitive, and introduces issues when converting 
> from DataFrames containing timestamps to RDDs which are then saved as text. 
> This has the effect of essentially shifting all dates in a dataset by 1 day. 
> The suggested fix for this is to update the timestamp toString representation 
> to either a) Include timezone or b) Correctly display in GMT.
> This change may well introduce substantial and insidious bugs so I'm not sure 
> how best to resolve this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-13268:
-
Description: 
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{code}
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 


  was:
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{{ 
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
}}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 



> SQL Timestamp stored as GMT but toString returns GMT-08:00
> --
>
> Key: SPARK-13268
> URL: https://issues.apache.org/jira/browse/SPARK-13268
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Ilya Ganelin
>
> There is an issue with how timestamps are displayed/converted to Strings in 
> Spark SQL. The documentation states that the timestamp should be created in 
> the GMT time zone, however, if we do so, we see that the output actually 
> contains a -8 hour offset:
> {code}
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
> res144: java.sql.Timestamp = 2014-12-31 16:00:00.0
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
> res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
> {code}
> This result is confusing, unintuitive, and introduces issues when converting 
> from DataFrames containing timestamps to RDDs which are then saved as text. 
> This has the effect of essentially shifting all dates in a dataset by 1 day. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-13268:


 Summary: SQL Timestamp stored as GMT but toString returns GMT-08:00
 Key: SPARK-13268
 URL: https://issues.apache.org/jira/browse/SPARK-13268
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: Ilya Ganelin


There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{{ 
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
}}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-28 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15073236#comment-15073236
 ] 

Ilya Ganelin commented on SPARK-12488:
--

I'll submit a dataset that causes this when I have a moment. Thanks!



Thank you,
Ilya Ganelin





> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068827#comment-15068827
 ] 

Ilya Ganelin commented on SPARK-12488:
--

Further investigation identifies the issue as stemming from the docTermVector 
containing zero-vectors (i.e., documents in which no words from the vocabulary 
appear).
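For reference, a minimal sketch of a workaround (my own, not an MLlib fix; it reuses 
{{docTermVector}} and the parameter values from the example in the issue description): 
drop the all-zero document vectors before training.

{code}
import org.apache.spark.mllib.clustering.{LDA, LDAModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// docTermVector: RDD[(Long, Vector)] of (doc id, term-count vector), as in the
// issue description. Documents whose vector is all zeros (no vocabulary words
// present) are dropped before training, since they appear to trigger the
// invalid term IDs.
def runLdaWithoutEmptyDocs(docTermVector: RDD[(Long, Vector)],
                           numTopics: Int = 10): LDAModel = {
  val nonEmptyDocs = docTermVector.filter { case (_, vec) => vec.toArray.exists(_ != 0.0) }
  new LDA().setK(numTopics).setMaxIterations(10).run(nonEmptyDocs)
}
{code}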

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068761#comment-15068761
 ] 

Ilya Ganelin edited comment on SPARK-12488 at 12/22/15 9:32 PM:


[~josephkb] Would love your feedback here. Thanks!


was (Author: ilganeli):
@jkbradley Would love your feedback here. Thanks!

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068761#comment-15068761
 ] 

Ilya Ganelin commented on SPARK-12488:
--

@jkbradley Would love your feedback here. Thanks!

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Description: 
When running the LDA model and using the describeTopics function, invalid 
values appear in the termID list that is returned:

The below example generates 10 topics on a data set with a vocabulary of 685.

{code}
// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}
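To make the failure above concrete, here is a small check (a sketch only; {{ldaModel}} 
and the vocabulary size of 685 come from the description above) that every returned 
term ID is a valid index into the vocabulary:

{code}
// All term IDs returned by describeTopics() should be valid vocabulary indices,
// i.e. fall in the range [0, vocabSize). The arrays above clearly violate this.
val vocabSize = 685
val invalidIds = ldaModel.describeTopics().flatMap { case (termIds, _) =>
  termIds.filter(id => id < 0 || id >= vocabSize)
}
println(s"${invalidIds.length} out-of-range term IDs, e.g. ${invalidIds.take(5).mkString(", ")}")
{code}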

  was:
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

The below example generated 10 topics on a data set with a vocabulary of 685.

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}


> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2

[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Summary: LDA describeTopics() Generates Invalid Term IDs  (was: LDA 
Describe Topics Generates Invalid Term IDs)

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generated 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Description: 
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

The below example generated 10 topics on a data set with a vocabulary of 685.

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}

  was:
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}


> LDA Describe Topics Generates Invalid Term IDs
> --
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generated 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -

[jira] [Created] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-12488:


 Summary: LDA Describe Topics Generates Invalid Term IDs
 Key: SPARK-12488
 URL: https://issues.apache.org/jira/browse/SPARK-12488
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 1.5.2
Reporter: Ilya Ganelin


When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with {{key=1}}, then close that output 
buffer, write the next one, and so on?

Operational flow would become (a toy sketch of steps 3-5 follows below):
1) Attempt to create a new {{outputWriter}} for each possible {{key}}
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by {{key}} (and persist this sorted set of 
{{InternalRow}} objects in memory). 
4) One key at a time, create an {{outputWriter}} and write all rows associated 
with that key.
5) Close the {{outputWriter}} for that {{key}} and open a new {{outputWriter}}; 
continue from step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{{InternalRow}} objects in memory). 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{{InternalRow}} objects in memory). 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of InternalRow 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of {InternalRow} 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{code}InternalRow{code} objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin commented on SPARK-8890:
-

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{code}InternalRow{code} objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of InternalRow 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of {InternalRow} 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533
 ] 

Ilya Ganelin commented on SPARK-8890:
-

Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new ```outputWriter``` for each row based on the columns within 
that row (from the projected columns). New ```outputWriters``` become necessary 
when the columns within a row are different. However, given that the schema is 
fixed, where does this variability come from and what does it mean to "sort" in 
this context? 
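As a concrete illustration of the mechanism under discussion (a toy sketch with made-up 
column names, not Spark internals): each row's partition-column values determine its 
output directory, so randomized data touches many distinct directories, and each 
distinct combination needs its own writer.

{code}
// Toy model of dynamic partition insert: one "writer" (here just a path) is needed
// per distinct combination of partition-column values, not per schema column.
case class Record(year: Int, month: Int, value: String)

val rows = Seq(Record(2015, 1, "a"), Record(2014, 12, "b"), Record(2015, 1, "c"), Record(2015, 2, "d"))

val partitionDirs = rows.map(r => s"year=${r.year}/month=${r.month}").distinct
println(partitionDirs) // List(year=2015/month=1, year=2014/month=12, year=2015/month=2)
{code}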

> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 6:46 PM:
--

Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new outputWriter for each row based on the columns within that row 
(from the projected columns). New outputWriters become necessary when the 
columns within a row are different. However, given that the schema is fixed, 
where does this variability come from and what does it mean to "sort" in this 
context? 


was (Author: ilganeli):
Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new ```outputWriter``` for each row based on the columns within 
that row (from the projected columns). New ```outputWriters``` become necessary 
when the columns within a row are different. However, given that the schema is 
fixed, where does this variability come from and what does it mean to "sort" in 
this context? 

> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628488#comment-14628488
 ] 

Ilya Ganelin commented on SPARK-8890:
-

[~rxin] I want to make sure I correctly understand your solution. Are you 
proposing that if the number of active partitions is beyond 50 we repartition 
the data into 50 partitions? 

I think we could approach this differently by creating a pool of OutputWriters 
(of size 50) and only creating new OutputWriters once the previous partition has 
been written. This could be handled by blocking within the outputWriterForRow 
call when the new outputWriter is created. 

Does that seem reasonable? Please let me know, thanks!
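A rough, self-contained sketch of the pooling idea (my own illustration with a 
hypothetical file layout, not Spark code). Since a single task thread cannot block 
waiting on itself, the sketch closes the earliest-opened writer when the pool is full 
instead of blocking, and opens files in append mode so an evicted key can be resumed:

{code}
import java.io.{FileWriter, PrintWriter}
import scala.collection.mutable

// Bounded "writer pool": at most maxOpen writers are open at once. When full, the
// earliest-opened writer is closed to make room (an adaptation of the blocking idea
// for a single-threaded task). Append mode lets an evicted key be reopened safely.
class BoundedWriterPool(maxOpen: Int) {
  private val open = mutable.LinkedHashMap.empty[String, PrintWriter]

  def writerFor(key: String): PrintWriter = open.getOrElse(key, {
    if (open.size >= maxOpen) {
      val (oldestKey, oldestWriter) = open.head   // close the earliest-opened writer
      oldestWriter.close()
      open.remove(oldestKey)
    }
    val w = new PrintWriter(new FileWriter(s"part-$key.txt", true)) // hypothetical layout
    open(key) = w
    w
  })

  def closeAll(): Unit = { open.values.foreach(_.close()); open.clear() }
}
{code}

Called from outputWriterForRow-style code as {{pool.writerFor(partitionPath(row))}}, 
this would cap the number of simultaneously open files at maxOpen.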

> Reduce memory consumption for dynamic partition insert
> --
>
> Key: SPARK-8890
> URL: https://issues.apache.org/jira/browse/SPARK-8890
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Priority: Critical
>
> Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
> table partitions is large. The problem is that we open one output writer for 
> each partition, and when data are randomized and when the number of 
> partitions is large, we open a large number of output writers, leading to OOM.
> The solution here is to inject a sorting operation once the number of active 
> partitions is beyond a certain point (e.g. 50?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8907) Speed up path construction in DynamicPartitionWriterContainer.outputWriterForRow

2015-07-13 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625150#comment-14625150
 ] 

Ilya Ganelin commented on SPARK-8907:
-

[~rxin] The code for this in master has eliminated usage of zip and map as of 
[SPARK-8961|https://github.com/apache/spark/commit/33630883685eafcc3ee4521ea8363be342f6e6b4].
 Do you think this can be further optimized and if so, how? There doesn't seem 
to be much within the existing catalyst expressions that would facilitate this, 
but I could be wrong. 

The relevant code fragment is below:
{code}
val partitionPath = {
  val partitionPathBuilder = new StringBuilder
  var i = 0

  while (i < partitionColumns.length) {
val col = partitionColumns(i)
val partitionValueString = {
  val string = row.getString(i)
  if (string.eq(null)) defaultPartitionName else 
PartitioningUtils.escapePathName(string)
}

if (i > 0) {
  partitionPathBuilder.append(Path.SEPARATOR_CHAR)
}

partitionPathBuilder.append(s"$col=$partitionValueString")
i += 1
  }

  partitionPathBuilder.toString()
}
{code}

> Speed up path construction in 
> DynamicPartitionWriterContainer.outputWriterForRow
> 
>
> Key: SPARK-8907
> URL: https://issues.apache.org/jira/browse/SPARK-8907
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> Don't use zip and scala collection methods to avoid garbage collection
> {code}
> val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, 
> rawValue) =>
>   val string = if (rawValue == null) null else String.valueOf(rawValue)
>   val valueString = if (string == null || string.isEmpty) {
> defaultPartitionName
>   } else {
> PartitioningUtils.escapePathName(string)
>   }
>   s"/$col=$valueString"
> }.mkString.stripPrefix(Path.SEPARATOR)
> {code}
> We can probably use catalyst expressions themselves to construct the path, 
> and then we can leverage code generation to do this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3153) shuffle will run out of space when disks have different free space

2015-06-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608993#comment-14608993
 ] 

Ilya Ganelin commented on SPARK-3153:
-

cc [~davies] I believe this issue can be closed given that it duplicates SPARK-5418, 
no?

> shuffle will run out of space when disks have different free space
> --
>
> Key: SPARK-3153
> URL: https://issues.apache.org/jira/browse/SPARK-3153
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Reporter: Davies Liu
>
> If we have several disks in SPARK_LOCAL_DIRS, and one of them is much smaller 
> than the others (maybe added by mistake, or a special disk such as an SSD), then 
> the shuffle will run out of space on that smaller disk.
> PySpark also has this issue during spilling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8464) Consider separating aggregator and non-aggregator paths in ExternalSorter

2015-06-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608292#comment-14608292
 ] 

Ilya Ganelin commented on SPARK-8464:
-

Josh - I'd be happy to look into this; I'll submit a PR shortly.

> Consider separating aggregator and non-aggregator paths in ExternalSorter
> -
>
> Key: SPARK-8464
> URL: https://issues.apache.org/jira/browse/SPARK-8464
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Reporter: Josh Rosen
>
> ExternalSorter is still really complicated and hard to understand.  We should 
> investigate whether separating the aggregator and non-aggregator paths into 
> separate files would make the code easier to understand without introducing 
> significant duplication.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7996) Deprecate the developer api SparkEnv.actorSystem

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579643#comment-14579643
 ] 

Ilya Ganelin commented on SPARK-7996:
-

This seems to be mutually exclusive with 
https://issues.apache.org/jira/browse/SPARK-7997. Is the latter a placeholder?

> Deprecate the developer api SparkEnv.actorSystem
> 
>
> Key: SPARK-7996
> URL: https://issues.apache.org/jira/browse/SPARK-7996
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Shixiong Zhu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:38 PM:
-

How is this functionality different from the existing {{union}} functions 
within {{VertexRDD}} and {{EdgeRDD}} ?


was (Author: ilganeli):
How is this functionality different from the existing `union` functions within 
`VertexRDD` and `EdgeRDD` ?

> Graph Union Operator
> 
>
> Key: SPARK-7894
> URL: https://issues.apache.org/jira/browse/SPARK-7894
> Project: Spark
>  Issue Type: Sub-task
>  Components: GraphX
>Reporter: Andy Huang
>  Labels: graph, union
> Attachments: union_operator.png
>
>
> This operator aims to union two graphs and generate a new graph directly. The 
> union of two graphs is the union of their vertex sets and their edge 
> families. Vertices and edges that are included in either graph will be part 
> of the new graph.
> bq.  G ∪ H = (VG ∪ VH, EG ∪ EH).
> The image below shows a union of graph G and graph H:
> !union_operator.png|width=600px,align=center!
> A simple interface would be:
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
> However, overlapping vertices and edges will inevitably occur at the borders 
> of the two graphs. For vertices, it is natural to simply take the union and 
> remove duplicates, but for edges a mergeEdges function seems more reasonable.
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
> (ED, ED) => ED): Graph[VD, ED]
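
For concreteness, one way such a union could be sketched on top of the public GraphX and RDD APIs (illustrative only, not a proposed implementation; duplicate vertices arbitrarily keep the left graph's attribute):

{code}
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def unionGraphs[VD: ClassTag, ED: ClassTag](
    g: Graph[VD, ED],
    h: Graph[VD, ED],
    mergeEdges: (ED, ED) => ED): Graph[VD, ED] = {
  // Union of the vertex sets; on conflicts, keep g's attribute.
  val vertices = (g.vertices ++ h.vertices).reduceByKey((left, _) => left)
  // Union of the edge sets; edges sharing the same (src, dst) pair
  // (from either graph) are merged with mergeEdges.
  val edges: RDD[Edge[ED]] =
    (g.edges ++ h.edges)
      .map(e => ((e.srcId, e.dstId), e.attr))
      .reduceByKey(mergeEdges)
      .map { case ((src, dst), attr) => Edge(src, dst, attr) }
  Graph(vertices, edges)
}
{code}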



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:37 PM:
-

How is this functionality different from the existing `union` functions within 
`VertexRDD` and `EdgeRDD` ?


was (Author: ilganeli):
How is this functionality different from the existing {union} functions within 
{VertexRDD} and {EdgeRDD} ?

> Graph Union Operator
> 
>
> Key: SPARK-7894
> URL: https://issues.apache.org/jira/browse/SPARK-7894
> Project: Spark
>  Issue Type: Sub-task
>  Components: GraphX
>Reporter: Andy Huang
>  Labels: graph, union
> Attachments: union_operator.png
>
>
> This operator aims to union two graphs and generate a new graph directly. The 
> union of two graphs is the union of their vertex sets and their edge 
> families. Vertices and edges that are included in either graph will be part 
> of the new graph.
> bq.  G ∪ H = (VG ∪ VH, EG ∪ EH).
> The image below shows a union of graph G and graph H:
> !union_operator.png|width=600px,align=center!
> A simple interface would be:
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
> However, overlapping vertices and edges will inevitably occur at the borders 
> of the two graphs. For vertices, it is natural to simply take the union and 
> remove duplicates, but for edges a mergeEdges function seems more reasonable.
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
> (ED, ED) => ED): Graph[VD, ED]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM:
-

How is this functionality different from the existing {union} functions within 
{VertexRDD} and {EdgeRDD} ?


was (Author: ilganeli):
How is this functionality different from the existing {code}union{code} 
functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ?

> Graph Union Operator
> 
>
> Key: SPARK-7894
> URL: https://issues.apache.org/jira/browse/SPARK-7894
> Project: Spark
>  Issue Type: Sub-task
>  Components: GraphX
>Reporter: Andy Huang
>  Labels: graph, union
> Attachments: union_operator.png
>
>
> This operator aims to union two graphs and generate a new graph directly. The 
> union of two graphs is the union of their vertex sets and their edge 
> families. Vertices and edges that are included in either graph will be part 
> of the new graph.
> bq.  G ∪ H = (VG ∪ VH, EG ∪ EH).
> The image below shows a union of graph G and graph H:
> !union_operator.png|width=600px,align=center!
> A simple interface would be:
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
> However, overlapping vertices and edges will inevitably occur at the borders 
> of the two graphs. For vertices, it is natural to simply take the union and 
> remove duplicates, but for edges a mergeEdges function seems more reasonable.
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
> (ED, ED) => ED): Graph[VD, ED]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM:
-

How is this functionality different from the existing {code}union{code} 
functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ?


was (Author: ilganeli):
How is this functionality different from the existing ```union``` functions 
within ```VertexRDD``` and ```EdgeRDD``` ?

> Graph Union Operator
> 
>
> Key: SPARK-7894
> URL: https://issues.apache.org/jira/browse/SPARK-7894
> Project: Spark
>  Issue Type: Sub-task
>  Components: GraphX
>Reporter: Andy Huang
>  Labels: graph, union
> Attachments: union_operator.png
>
>
> This operator aims to union two graphs and generate a new graph directly. The 
> union of two graphs is the union of their vertex sets and their edge 
> families. Vertices and edges that are included in either graph will be part 
> of the new graph.
> bq.  G ∪ H = (VG ∪ VH, EG ∪ EH).
> The image below shows a union of graph G and graph H:
> !union_operator.png|width=600px,align=center!
> A simple interface would be:
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
> However, overlapping vertices and edges will inevitably occur at the borders 
> of the two graphs. For vertices, it is natural to simply take the union and 
> remove duplicates, but for edges a mergeEdges function seems more reasonable.
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
> (ED, ED) => ED): Graph[VD, ED]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin commented on SPARK-7894:
-

How is this functionality different from the existing ```union``` functions 
within ```VertexRDD``` and ```EdgeRDD``` ?

> Graph Union Operator
> 
>
> Key: SPARK-7894
> URL: https://issues.apache.org/jira/browse/SPARK-7894
> Project: Spark
>  Issue Type: Sub-task
>  Components: GraphX
>Reporter: Andy Huang
>  Labels: graph, union
> Attachments: union_operator.png
>
>
> This operator aims to union two graphs and generate a new graph directly. The 
> union of two graphs is the union of their vertex sets and their edge 
> families. Vertices and edges that are included in either graph will be part 
> of the new graph.
> bq.  G ∪ H = (VG ∪ VH, EG ∪ EH).
> The image below shows a union of graph G and graph H:
> !union_operator.png|width=600px,align=center!
> A simple interface would be:
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
> However, overlapping vertices and edges will inevitably occur at the borders 
> of the two graphs. For vertices, it is natural to simply take the union and 
> remove duplicates, but for edges a mergeEdges function seems more reasonable.
> bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
> (ED, ED) => ED): Graph[VD, ED]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:18 PM:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since I think the intent is for the SQL schema to 
be strictly typed. Were you suggesting that we allow passing "int" or "long" as 
the type argument or for us to infer it automatically by parsing the string? 
That approach seems a little more dangerous.


was (Author: ilganeli):
[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since the SQL schema should be strictly typed. 
Were you suggesting that we allow passing "int" or "long" as the type argument 
or for us to infer it automatically by parsing the string? That approach seems 
a little more dangerous.

> Design an easier way to construct schema for both Scala and Python
> --
>
> Key: SPARK-8056
> URL: https://issues.apache.org/jira/browse/SPARK-8056
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> StructType is fairly hard to construct, especially in Python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:17 PM:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since the SQL schema should be strictly typed. 
Were you suggesting that we allow passing "int" or "long" as the type argument 
or for us to infer it automatically by parsing the string? That approach seems 
a little more dangerous.


was (Author: ilganeli):
[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

> Design an easier way to construct schema for both Scala and Python
> --
>
> Key: SPARK-8056
> URL: https://issues.apache.org/jira/browse/SPARK-8056
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> StructType is fairly hard to construct, especially in Python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin commented on SPARK-8056:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

> Design an easier way to construct schema for both Scala and Python
> --
>
> Key: SPARK-8056
> URL: https://issues.apache.org/jira/browse/SPARK-8056
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> StructType is fairly hard to construct, especially in Python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 12:35 AM:
--

[~rxin] Are you actively working on this? I think this could be readily solved 
by providing an interface to construct StructType the way we construct 
SparkConf, e.g.
new StructType().add("f1", "v1").add("f2", "v2"), etc.
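
A minimal sketch of the kind of fluent builder being suggested (assuming an add method that takes a field name and a DataType and returns a new StructType; field names here are arbitrary):

{code}
import org.apache.spark.sql.types._

// Hypothetical fluent construction, analogous to building up a SparkConf.
val schema = new StructType()
  .add("name", StringType)                    // assumed: add(fieldName, dataType)
  .add("age", IntegerType)
  .add("score", DoubleType, nullable = true)  // assumed: optional nullable flag
{code}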


was (Author: ilganeli):
[~rxin] Are you actively working on this? I think this could be readily solved 
by providing an interface to construct StructType the way we construct SparkConf, 
e.g.
new StructType().add("f1", "v1").add("f2", "v2"), etc.

> Design an easier way to construct schema for both Scala and Python
> --
>
> Key: SPARK-8056
> URL: https://issues.apache.org/jira/browse/SPARK-8056
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> StructType is fairly hard to construct, especially in Python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874
 ] 

Ilya Ganelin commented on SPARK-8056:
-

[~rxin] Are you actively working on this? I think this could be readily solved 
by providing an interface to construct StructType the way we construct SparkConf, 
e.g.
new StructType().add("f1", "v1").add("f2", "v2"), etc.

> Design an easier way to construct schema for both Scala and Python
> --
>
> Key: SPARK-8056
> URL: https://issues.apache.org/jira/browse/SPARK-8056
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> StructType is fairly hard to construct, especially in Python.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility

2015-06-04 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin closed SPARK-6746.
---
Resolution: Won't Fix

> Refactor large functions in DAGScheduler to improve readibility
> ---
>
> Key: SPARK-6746
> URL: https://issues.apache.org/jira/browse/SPARK-6746
> Project: Spark
>  Issue Type: Sub-task
>  Components: Scheduler
>Reporter: Ilya Ganelin
>
> The DAGScheduler class contains two huge functions that make it 
> very hard to understand what's going on in the code. These are:
> 1) The monolithic handleTaskCompletion 
> 2) The cleanupStateForJobAndIndependentStages function
> These can be simply modularized to eliminate some awkward type casting and 
> improve code readability. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7075) Project Tungsten: Improving Physical Execution and Memory Management

2015-04-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521944#comment-14521944
 ] 

Ilya Ganelin commented on SPARK-7075:
-

This looks like the result of a large internal Databricks effort - are there 
pieces of this where you could use external help or is this issue in place 
primarily to document migration of internal code?

> Project Tungsten: Improving Physical Execution and Memory Management
> 
>
> Key: SPARK-7075
> URL: https://issues.apache.org/jira/browse/SPARK-7075
> Project: Spark
>  Issue Type: Epic
>  Components: Block Manager, Shuffle, Spark Core, SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> Based on our observation, majority of Spark workloads are not bottlenecked by 
> I/O or network, but rather CPU and memory. This project focuses on 3 areas to 
> improve the efficiency of memory and CPU for Spark applications, to push 
> performance closer to the limits of the underlying hardware.
> 1. Memory Management and Binary Processing: leveraging application semantics 
> to manage memory explicitly and eliminate the overhead of JVM object model 
> and garbage collection
> 2. Cache-aware computation: algorithms and data structures to exploit memory 
> hierarchy
> 3. Code generation: using code generation to exploit modern compilers and CPUs
> Several parts of project Tungsten leverage the DataFrame model, which gives 
> us more semantics about the application. We will also retrofit the 
> improvements onto Spark’s RDD API whenever possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537
 ] 

Ilya Ganelin edited comment on SPARK-4514 at 4/24/15 7:37 PM:
--

[~joshrosen] - given your work on SPARK-6629, is this still relevant? I saw 
that there was a comment there stating that issue may not be a problem. I can 
knock this one out if it's still necessary.


was (Author: ilganeli):
[~joshrosen] - given your work on SPARK-6629 is this still relevant - I saw 
that there was a comment there stating that issue may not be a problem? I can 
knock this one out if it's still necessary.

> SparkContext localProperties does not inherit property updates across thread 
> reuse
> --
>
> Key: SPARK-4514
> URL: https://issues.apache.org/jira/browse/SPARK-4514
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0, 1.1.1, 1.2.0
>Reporter: Erik Erlandson
>Assignee: Josh Rosen
>Priority: Critical
>
> The current job group id of a Spark context is stored in the 
> {{localProperties}} member value.   This data structure is designed to be 
> thread local, and its settings are not preserved when {{ComplexFutureAction}} 
> instantiates a new {{Future}}.  
> One consequence of this is that {{takeAsync()}} does not behave in the same 
> way as other async actions, e.g. {{countAsync()}}.  For example, this test 
> (if copied into StatusTrackerSuite.scala), will fail, because 
> {{"my-job-group2"}} is not propagated to the Future which actually 
> instantiates the job:
> {code:java}
>   test("getJobIdsForGroup() with takeAsync()") {
> sc = new SparkContext("local", "test", new SparkConf(false))
> sc.setJobGroup("my-job-group2", "description")
> sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
> val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
> val firstJobId = eventually(timeout(10 seconds)) {
>   firstJobFuture.jobIds.head
> }
> eventually(timeout(10 seconds)) {
>   sc.statusTracker.getJobIdsForGroup("my-job-group2") should be 
> (Seq(firstJobId))
> }
>   }
> {code}
> It also impacts current PR for SPARK-1021, which involves additional uses of 
> {{ComplexFutureAction}}.
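
As a standalone illustration of the underlying class of problem (plain Scala, no Spark): a value stored in a per-thread structure by the calling thread is not visible from the pooled thread that ends up running a Future.

{code}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo {
  val jobGroup = new ThreadLocal[String]

  def main(args: Array[String]): Unit = {
    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
    jobGroup.set("my-job-group2")               // set on the caller thread only
    val seen = Future { Option(jobGroup.get) }  // read from the pool's worker thread
    println(Await.result(seen, 10.seconds))     // prints None, not Some(my-job-group2)
  }
}
{code}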



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537
 ] 

Ilya Ganelin commented on SPARK-4514:
-

[~joshrosen] - given your work on SPARK-6629 is this still relevant - I saw 
that there was a comment there stating that issue may not be a problem? I can 
knock this one out if it's still necessary.

> SparkContext localProperties does not inherit property updates across thread 
> reuse
> --
>
> Key: SPARK-4514
> URL: https://issues.apache.org/jira/browse/SPARK-4514
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0, 1.1.1, 1.2.0
>Reporter: Erik Erlandson
>Assignee: Josh Rosen
>Priority: Critical
>
> The current job group id of a Spark context is stored in the 
> {{localProperties}} member value.   This data structure is designed to be 
> thread local, and its settings are not preserved when {{ComplexFutureAction}} 
> instantiates a new {{Future}}.  
> One consequence of this is that {{takeAsync()}} does not behave in the same 
> way as other async actions, e.g. {{countAsync()}}.  For example, this test 
> (if copied into StatusTrackerSuite.scala), will fail, because 
> {{"my-job-group2"}} is not propagated to the Future which actually 
> instantiates the job:
> {code:java}
>   test("getJobIdsForGroup() with takeAsync()") {
> sc = new SparkContext("local", "test", new SparkConf(false))
> sc.setJobGroup("my-job-group2", "description")
> sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
> val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
> val firstJobId = eventually(timeout(10 seconds)) {
>   firstJobFuture.jobIds.head
> }
> eventually(timeout(10 seconds)) {
>   sc.statusTracker.getJobIdsForGroup("my-job-group2") should be 
> (Seq(firstJobId))
> }
>   }
> {code}
> It also impacts current PR for SPARK-1021, which involves additional uses of 
> {{ComplexFutureAction}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511230#comment-14511230
 ] 

Ilya Ganelin commented on SPARK-1021:
-

I'd be happy to look into this and 
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4514 .

> sortByKey() launches a cluster job when it shouldn't
> 
>
> Key: SPARK-1021
> URL: https://issues.apache.org/jira/browse/SPARK-1021
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0
>Reporter: Andrew Ash
>Assignee: Erik Erlandson
>  Labels: starter
>
> The sortByKey() method is listed as a transformation, not an action, in the 
> documentation.  But it launches a cluster job regardless.
> http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
> Some discussion on the mailing list suggested that this is a problem with the 
> rdd.count() call inside Partitioner.scala's rangeBounds method.
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102
> Josh Rosen suggests that rangeBounds should be made into a lazy variable:
> {quote}
> I wonder whether making RangePartitoner .rangeBounds into a lazy val would 
> fix this 
> (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
>   We'd need to make sure that rangeBounds() is never called before an action 
> is performed.  This could be tricky because it's called in the 
> RangePartitioner.equals() method.  Maybe it's sufficient to just compare the 
> number of partitions, the ids of the RDDs used to create the 
> RangePartitioner, and the sort ordering.  This still supports the case where 
> I range-partition one RDD and pass the same partitioner to a different RDD.  
> It breaks support for the case where two range partitioners created on 
> different RDDs happened to have the same rangeBounds(), but it seems unlikely 
> that this would really harm performance since it's probably unlikely that the 
> range partitioners are equal by chance.
> {quote}
> Can we please make this happen?  I'll send a PR on GitHub to start the 
> discussion and testing.
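
A standalone sketch of the shape of that change (an illustrative class, not the actual RangePartitioner): the bounds computation sits behind a lazy val, and equals() compares only cheap fields so it never forces that computation.

{code}
// Sketch only, not the real org.apache.spark.RangePartitioner: the point is
// that rangeBounds is computed lazily and equals() never forces it.
class LazyRangeLikePartitioner[K](
    val numPartitions: Int,
    val sourceRddId: Int,
    val ascending: Boolean,
    computeBounds: () => Array[K]) {

  // Deferred until a key actually has to be placed into a partition, so merely
  // constructing the partitioner (e.g. inside sortByKey) launches no job.
  lazy val rangeBounds: Array[K] = computeBounds()

  override def equals(other: Any): Boolean = other match {
    // Compare only cheap identity-like fields, as suggested above; comparing
    // rangeBounds here would defeat the laziness.
    case o: LazyRangeLikePartitioner[_] =>
      o.numPartitions == numPartitions &&
      o.sourceRddId == sourceRddId &&
      o.ascending == ascending
    case _ => false
  }

  override def hashCode(): Int = (numPartitions, sourceRddId, ascending).hashCode()
}
{code}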



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-04-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510154#comment-14510154
 ] 

Ilya Ganelin commented on SPARK-5945:
-

So to recap:
a) Move failure count tracking into Stage
b) Reset failure count on Stage success, so even if that stage is re-submitted 
due to failures downstream, we never hit the cap
c) Remove config parameter. 
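
A minimal sketch of (a) and (b) in isolation (an illustrative helper, not the actual Stage or DAGScheduler code):

{code}
// Tracks consecutive fetch-failure resubmissions for one stage and resets on success.
class StageFailureTracker(val maxConsecutiveFailures: Int = 4) {
  private var consecutiveFailures = 0

  // Called when the stage is resubmitted because of a FetchFailedException.
  def recordFetchFailure(): Unit = { consecutiveFailures += 1 }

  // Called when the stage completes successfully; a later resubmission caused by
  // a downstream failure then starts counting from zero again.
  def recordSuccess(): Unit = { consecutiveFailures = 0 }

  def shouldAbort: Boolean = consecutiveFailures >= maxConsecutiveFailures
}
{code}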

> Spark should not retry a stage infinitely on a FetchFailedException
> ---
>
> Key: SPARK-5945
> URL: https://issues.apache.org/jira/browse/SPARK-5945
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>Assignee: Ilya Ganelin
>
> While investigating SPARK-5928, I noticed some very strange behavior in the 
> way Spark retries stages after a FetchFailedException.  It seems that on a 
> FetchFailedException, instead of simply killing the task and retrying, Spark 
> aborts the stage and retries.  If it just retried the task, the task might 
> fail 4 times and then trigger the usual job killing mechanism.  But by 
> killing the stage instead, the max retry logic is skipped (it looks to me 
> like there is no limit for retries on a stage).
> After a bit of discussion with Kay Ousterhout, it seems the idea is that if a 
> fetch fails, we assume that the block manager we are fetching from has 
> failed, and that it will succeed if we retry the stage w/out that block 
> manager.  In that case, it wouldn't make any sense to retry the task, since 
> it's doomed to fail every time, so we might as well kill the whole stage.  But 
> this raises two questions:
> 1) Is it really safe to assume that a FetchFailedException means that the 
> BlockManager has failed, and that it will work if we just try another one?  
> SPARK-5928 shows that there are at least some cases where that assumption is 
> wrong.  Even if we fix that case, this logic seems brittle to the next case 
> we find.  I guess the idea is that this behavior is what gives us the "R" in 
> RDD ... but it seems like it's not really that robust and maybe should be 
> reconsidered.
> 2) Should stages only be retried a limited number of times?  It would be 
> pretty easy to put in a limited number of retries per stage.  Though again, 
> we encounter issues with keeping things resilient.  Theoretically one stage 
> could have many retries caused by failures in different stages further 
> downstream, so we might need to track the cause of each retry as well to 
> still get the desired behavior.
> In general it just seems there is some flakiness in the retry logic.  This is 
> the only reproducible example I have at the moment, but I vaguely recall 
> hitting other cases of strange behavior w/ retries when trying to run long 
> pipelines.  E.g., if one executor is stuck in a GC during a fetch, the fetch 
> fails, but the executor eventually comes back and the stage gets retried 
> again, but the same GC issues happen the second time around, etc.
> Copied from SPARK-5928, here's the example program that can regularly produce 
> a loop of stage failures.  Note that it will only fail from a remote fetch, 
> so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell 
> --num-executors 2 --executor-memory 4000m}}
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   //need to make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x)}.groupByKey().count()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508019#comment-14508019
 ] 

Ilya Ganelin commented on SPARK-5945:
-

[~kayousterhout] - thanks for the review. If I understand correctly, your 
suggestion would still address [~imranr]'s second comment, since the first stage 
would always (or mostly) succeed; it wouldn't accumulate N consecutive failures, 
so even if subsequent stages fail, those failures wouldn't count towards this 
particular stage's failure count, because the count would have been reset when 
the stage succeeded. 

Do you have any thoughts on the first comment? Specifically, is retrying a 
stage likely to succeed at all or is it a waste of effort in the first place?


> Spark should not retry a stage infinitely on a FetchFailedException
> ---
>
> Key: SPARK-5945
> URL: https://issues.apache.org/jira/browse/SPARK-5945
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>Assignee: Ilya Ganelin
>
> While investigating SPARK-5928, I noticed some very strange behavior in the 
> way Spark retries stages after a FetchFailedException.  It seems that on a 
> FetchFailedException, instead of simply killing the task and retrying, Spark 
> aborts the stage and retries.  If it just retried the task, the task might 
> fail 4 times and then trigger the usual job killing mechanism.  But by 
> killing the stage instead, the max retry logic is skipped (it looks to me 
> like there is no limit for retries on a stage).
> After a bit of discussion with Kay Ousterhout, it seems the idea is that if a 
> fetch fails, we assume that the block manager we are fetching from has 
> failed, and that it will succeed if we retry the stage w/out that block 
> manager.  In that case, it wouldn't make any sense to retry the task, since 
> it's doomed to fail every time, so we might as well kill the whole stage.  But 
> this raises two questions:
> 1) Is it really safe to assume that a FetchFailedException means that the 
> BlockManager has failed, and that it will work if we just try another one?  
> SPARK-5928 shows that there are at least some cases where that assumption is 
> wrong.  Even if we fix that case, this logic seems brittle to the next case 
> we find.  I guess the idea is that this behavior is what gives us the "R" in 
> RDD ... but it seems like it's not really that robust and maybe should be 
> reconsidered.
> 2) Should stages only be retried a limited number of times?  It would be 
> pretty easy to put in a limited number of retries per stage.  Though again, 
> we encounter issues with keeping things resilient.  Theoretically one stage 
> could have many retries caused by failures in different stages further 
> downstream, so we might need to track the cause of each retry as well to 
> still get the desired behavior.
> In general it just seems there is some flakiness in the retry logic.  This is 
> the only reproducible example I have at the moment, but I vaguely recall 
> hitting other cases of strange behavior w/ retries when trying to run long 
> pipelines.  E.g., if one executor is stuck in a GC during a fetch, the fetch 
> fails, but the executor eventually comes back and the stage gets retried 
> again, but the same GC issues happen the second time around, etc.
> Copied from SPARK-5928, here's the example program that can regularly produce 
> a loop of stage failures.  Note that it will only fail from a remote fetch, 
> so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell 
> --num-executors 2 --executor-memory 4000m}}
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   //need to make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x)}.groupByKey().count()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507886#comment-14507886
 ] 

Ilya Ganelin commented on SPARK-6891:
-

[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master. I'm able to run it without issue, 
I can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  

> ExecutorAllocationManager will request negative number executors
> 
>
> Key: SPARK-6891
> URL: https://issues.apache.org/jira/browse/SPARK-6891
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: meiyoula
>Priority: Critical
> Attachments: DynamicExecutorTest.scala
>
>
> Below is the exception:
>15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread 
> spark-dynamic-executor-allocation-0
> java.lang.IllegalArgumentException: Attempted to request a negative 
> number of executor(s) -1 from the cluster manager. Please specify a positive 
> number!
>  at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342)
>  at 
> org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170)
>  at 
> org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
>  at 
> org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
>  at 
> org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>  at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> Below is the configurations I  setted:
>spark.dynamicAllocation.enabled true
>spark.dynamicAllocation.minExecutors   0
>spark.dynamicAllocation.initialExecutors3
>spark.dynamicAllocation.maxExecutors7
>spark.dynamicAllocation.executorIdleTimeout 30
>spark.shuffle.service.enabled   true



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507886#comment-14507886
 ] 

Ilya Ganelin edited comment on SPARK-6891 at 4/22/15 8:59 PM:
--

[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master yarn. I'm able to run it without 
issue, I can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  


was (Author: ilganeli):
[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master. I'm able to run it without issue, 
I can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  

> ExecutorAllocationManager will request negative number executors
> 
>
> Key: SPARK-6891
> URL: https://issues.apache.org/jira/browse/SPARK-6891
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: meiyoula
>Priority: Critical
> Attachments: DynamicExecutorTest.scala
>
>
> Below is the exception:
>15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread 
> spark-dynamic-executor-allocation-0
> java.lang.IllegalArgumentException: Attempted to request a negative 
> number of executor(s) -1 from the cluster manager. Please specify a positive 
> number!
>  at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342)
>  at 
> org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170)
>  at 
> org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
>  at 
> org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
>  at 
> org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>  at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> Below is the configurations I  setted:
>spark.dynamicAllocation.enabled true
>spark.dynamicAllocation.minExecutors   0
>spark.dynamicAllocation.initialExecutors3
>spark.dynamicAllocation.maxExecutors7
>spark.dynamicAllocation.executorIdleTimeout 30
>spark.shuffle.service.enabled   true



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-19 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502322#comment-14502322
 ] 

Ilya Ganelin commented on SPARK-6891:
-

[~meiyoula] Any hints on reproducing this aside from your configuration? E.g. 
simple test code to execute?

> ExecutorAllocationManager will request negative number executors
> 
>
> Key: SPARK-6891
> URL: https://issues.apache.org/jira/browse/SPARK-6891
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: meiyoula
>Priority: Critical
>
> Below is the exception:
>15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread 
> spark-dynamic-executor-allocation-0
> java.lang.IllegalArgumentException: Attempted to request a negative 
> number of executor(s) -1 from the cluster manager. Please specify a positive 
> number!
>  at 
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342)
>  at 
> org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170)
>  at 
> org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
>  at 
> org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
>  at 
> org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>  at 
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723)
>  at 
> org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>  at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> Below is the configurations I  setted:
>spark.dynamicAllocation.enabled true
>spark.dynamicAllocation.minExecutors   0
>spark.dynamicAllocation.initialExecutors3
>spark.dynamicAllocation.maxExecutors7
>spark.dynamicAllocation.executorIdleTimeout 30
>spark.shuffle.service.enabled   true



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server

2015-04-17 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6932:

Labels:   (was: kjhghbg)

> A Prototype of Parameter Server
> ---
>
> Key: SPARK-6932
> URL: https://issues.apache.org/jira/browse/SPARK-6932
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, MLlib, Spark Core
>Reporter: Qiping Li
>
>  h2. Introduction
> As specified in 
> [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590], it would be 
> very helpful to integrate a parameter server into Spark for machine learning 
> algorithms, especially for those with ultra-high-dimensional features. 
> After carefully studying the design doc of [Parameter 
> Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing]
>  and the paper on [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], 
> we proposed a prototype of Parameter Server on Spark (Ps-on-Spark), with 
> several key design concerns:
> * *User friendly interface*
>   We carefully investigated most existing Parameter Server 
> systems (including [petuum|http://petuum.github.io], [parameter 
> server|http://parameterserver.org], and 
> [paracel|https://github.com/douban/paracel]) and designed a user-friendly 
> interface by absorbing the essence of all these systems. 
> * *Prototype of distributed array*
> IndexRDD (see 
> [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem 
> to be a good option for a distributed array, because in most cases the number 
> of key updates per second is not very high. 
> So we implement a distributed HashMap to store the parameters, which can 
> be easily extended for better performance.
> 
> * *Minimal code change*
>   Quite a lot of effort has gone into avoiding changes to Spark core. Tasks 
> which need the parameter server are still created and scheduled by Spark's 
> scheduler. Tasks communicate with the parameter server through a client 
> object, over *akka* or *netty*.
> With all these concerns in mind we propose the following architecture:
> h2. Architecture
> !https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg!
> Data is stored in an RDD and is partitioned across workers. During each 
> iteration, each worker gets parameters from the parameter server, then computes 
> new parameters based on the old parameters and the data in its partition. 
> Finally, each worker pushes the updated parameters to the parameter server. A 
> worker communicates with the parameter server through a parameter server 
> client, which is initialized in the `TaskContext` of that worker.
> The current implementation is based on YARN cluster mode, 
> but it should not be a problem to transplant it to other modes. 
> h3. Interface
> We refer to existing parameter server systems (petuum, parameter server, 
> paracel) when designing the interface of the parameter server. 
> *`PSClient` provides the following interface for workers to use:*
> {code}
> //  get parameter indexed by key from parameter server
> def get[T](key: String): T
> // get multiple parameters from parameter server
> def multiGet[T](keys: Array[String]): Array[T]
> // add `delta` to the parameter indexed by `key`; 
> // if there are multiple `delta`s to apply to the same parameter,
> // use `reduceFunc` to reduce these `delta`s first.
> def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit
> // update multiple parameters at the same time, using the same `reduceFunc`.
> def multiUpdate[T](keys: Array[String], delta: Array[T], reduceFunc: (T, T) => T): Unit
> 
> // advance clock to indicate that current iteration is finished.
> def clock(): Unit
>  
> // block until all workers have reached this line of code.
> def sync(): Unit
> {code}
> *`PSContext` provides following functions to use on driver:*
> {code}
> // load parameters from existing rdd.
> def loadPSModel[T](model: RDD[(String, T)]) 
> // fetch parameters from parameter server to construct model.
> def fetchPSModel[T](keys: Array[String]): Array[T]
> {code} 
> 
> *A new function has been added to `RDD` to run parameter server tasks:*
> {code}
> // run the provided `func` on each partition of this RDD. 
> // This function can use data of this partition(the first argument) 
> // and a parameter server client(the second argument). 
> // See the following Logistic Regression for an example.
> def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U]
>
> {code}
> h2. Example
> Here is an example of using our prototype to implement logistic regression:
> {code:title=LogisticRegression.scala|borderStyle=solid}
> def train(
> sc: SparkContext,
> input: RDD[LabeledPoint],
> numIterations: Int,
> stepSize: Double,
> miniBatchFraction: Double): LogisticRegressionModel = {
> 
> // initialize weights
>

[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server

2015-04-17 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6932:

Description: 
 h2. Introduction

As specified in 
[SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590], it would be very 
helpful to integrate a parameter server into Spark for machine learning 
algorithms, especially for those with ultra-high-dimensional features. 

After carefully studying the design doc of [Parameter 
Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing]
 and the paper on [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], we 
proposed a prototype of Parameter Server on Spark (Ps-on-Spark), with several 
key design concerns:

* *User friendly interface*
We carefully investigated most existing Parameter Server 
systems (including [petuum|http://petuum.github.io], [parameter 
server|http://parameterserver.org], and 
[paracel|https://github.com/douban/paracel]) and designed a user-friendly 
interface by absorbing the essence of all these systems. 

* *Prototype of distributed array*
IndexRDD (see 
[SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem to 
be a good option for a distributed array, because in most cases the number of 
key updates per second is not very high. 
So we implement a distributed HashMap to store the parameters, which can be 
easily extended for better performance.

* *Minimal code change*
Quite a lot of effort has gone into avoiding changes to Spark core. Tasks 
which need the parameter server are still created and scheduled by Spark's 
scheduler. Tasks communicate with the parameter server through a client object, 
over *akka* or *netty*.

With all these concerns in mind we propose the following architecture:

h2. Architecture

!https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg!

Data is stored in an RDD and is partitioned across workers. During each iteration, 
each worker gets parameters from the parameter server, then computes new parameters 
based on the old parameters and the data in its partition. Finally, each worker 
pushes the updated parameters to the parameter server. A worker communicates with 
the parameter server through a parameter server client, which is initialized in the 
`TaskContext` of that worker.

The current implementation is based on YARN cluster mode, 
but it should not be a problem to transplant it to other modes. 

h3. Interface

We refer to existing parameter server systems (petuum, parameter server, 
paracel) when designing the interface of the parameter server. 

*`PSClient` provides the following interface for workers to use:*

{code}

//  get parameter indexed by key from parameter server
def get[T](key: String): T

// get multiple parameters from parameter server
def multiGet[T](keys: Array[String]): Array[T]

// add `delta` to the parameter indexed by `key`; 
// if there are multiple `delta`s to apply to the same parameter,
// use `reduceFunc` to reduce these `delta`s first.
def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit

// update multiple parameters at the same time, using the same `reduceFunc`.
def multiUpdate[T](keys: Array[String], delta: Array[T], reduceFunc: (T, T) => T): Unit

// advance clock to indicate that current iteration is finished.
def clock(): Unit
 
// block until all workers have reached this line of code.
def sync(): Unit
{code}

*`PSContext` provides following functions to use on driver:*

{code}

// load parameters from existing rdd.
def loadPSModel[T](model: RDD[(String, T)]) 

// fetch parameters from parameter server to construct model.
def fetchPSModel[T](keys: Array[String]): Array[T]

{code} 

*A new function has been added to `RDD` to run parameter server tasks:*

{code}

// run the provided `func` on each partition of this RDD. 
// This function can use data of this partition(the first argument) 
// and a parameter server client(the second argument). 
// See the following Logistic Regression for an example.
def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U]
   
{code}

h2. Example

Here is an example of using our prototype to implement logistic regression:

{code:title=LogisticRegression.scala|borderStyle=solid}

def train(
    sc: SparkContext,
    input: RDD[LabeledPoint],
    numIterations: Int,
    stepSize: Double,
    miniBatchFraction: Double): LogisticRegressionModel = {

  // initialize weights
  val numFeatures = input.map(_.features.size).first()
  val initialWeights = new Array[Double](numFeatures)

  // initialize parameter server context
  val pssc = new PSContext(sc)

  // load initialized weights into parameter server
  val initialModelRDD = sc.parallelize(Array(("w", initialWeights)), 1)
  pssc.loadPSModel(initialModelRDD)

  // run logistic regression algorithm on input data
  input.runWithPS((arr, client) => {
    val sampler = new Bernoull

[jira] [Comment Edited] (SPARK-6703) Provide a way to discover existing SparkContext's

2015-04-13 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492909#comment-14492909
 ] 

Ilya Ganelin edited comment on SPARK-6703 at 4/13/15 11:15 PM:
---

Patrick - what's the timeline for the 1.4 release? Just want to have a
sense for it so I can schedule accordingly.



was (Author: ilganeli):
Patrick - what's the timeline for the 1.4 release? Just want to have a
sense for it so I can schedule accordingly.

Thank you, 
Ilya Ganelin

> Provide a way to discover existing SparkContext's
> -
>
> Key: SPARK-6703
> URL: https://issues.apache.org/jira/browse/SPARK-6703
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Patrick Wendell
>Assignee: Ilya Ganelin
>Priority: Critical
>
> Right now it is difficult to write a Spark application in a way that can be 
> run independently and also be composed with other Spark applications in an 
> environment such as the JobServer, notebook servers, etc where there is a 
> shared SparkContext.
> It would be nice to provide a rendez-vous point so that applications can 
> learn whether an existing SparkContext already exists before creating one.
> The most simple/surgical way I see to do this is to have an optional static 
> SparkContext singleton that can be retrieved as follows:
> {code}
> val sc = SparkContext.getOrCreate(conf = new SparkConf())
> {code}
> And you could also have a setter where some outer framework/server can set it 
> for use by multiple downstream applications.
> A more advanced version of this would have some named registry or something, 
> but since we only support a single SparkContext in one JVM at this point 
> anyways, this seems sufficient and much simpler. Another advanced option 
> would be to allow plugging in some other notion of configuration you'd pass 
> when retrieving an existing context.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's

2015-04-13 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492909#comment-14492909
 ] 

Ilya Ganelin commented on SPARK-6703:
-

Patrick - what's the timeline for the 1.4 release? Just want to have a
sense for it so I can schedule accordingly.

Thank you, 
Ilya Ganelin

> Provide a way to discover existing SparkContext's
> -
>
> Key: SPARK-6703
> URL: https://issues.apache.org/jira/browse/SPARK-6703
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Patrick Wendell
>Assignee: Ilya Ganelin
>Priority: Critical
>
> Right now it is difficult to write a Spark application in a way that can be 
> run independently and also be composed with other Spark applications in an 
> environment such as the JobServer, notebook servers, etc where there is a 
> shared SparkContext.
> It would be nice to provide a rendez-vous point so that applications can 
> learn whether an existing SparkContext already exists before creating one.
> The most simple/surgical way I see to do this is to have an optional static 
> SparkContext singleton that can be retrieved as follows:
> {code}
> val sc = SparkContext.getOrCreate(conf = new SparkConf())
> {code}
> And you could also have a setter where some outer framework/server can set it 
> for use by multiple downstream applications.
> A more advanced version of this would have some named registry or something, 
> but since we only support a single SparkContext in one JVM at this point 
> anyways, this seems sufficient and much simpler. Another advanced option 
> would be to allow plugging in some other notion of configuration you'd pass 
> when retrieving an existing context.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's

2015-04-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491877#comment-14491877
 ] 

Ilya Ganelin commented on SPARK-6703:
-

Patrick - I can look into this. Thank you.

> Provide a way to discover existing SparkContext's
> -
>
> Key: SPARK-6703
> URL: https://issues.apache.org/jira/browse/SPARK-6703
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Patrick Wendell
>
> Right now it is difficult to write a Spark application in a way that can be 
> run independently and also be composed with other Spark applications in an 
> environment such as the JobServer, notebook servers, etc where there is a 
> shared SparkContext.
> It would be nice to provide a rendez-vous point so that applications can 
> learn whether an existing SparkContext already exists before creating one.
> The most simple/surgical way I see to do this is to have an optional static 
> SparkContext singleton that can be retrieved as follows:
> {code}
> val sc = SparkContext.getOrCreate(conf = new SparkConf())
> {code}
> And you could also have a setter where some outer framework/server can set it 
> for use by multiple downstream applications.
> A more advanced version of this would have some named registry or something, 
> but since we only support a single SparkContext in one JVM at this point 
> anyways, this seems sufficient and much simpler. Another advanced option 
> would be to allow plugging in some other notion of configuration you'd pass 
> when retrieving an existing context.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:09 AM:
---

The obvious solution won't work. 

Adding a {{TaskContext}} to {{dataSerialize()}} won't work because it's called 
from within both {{MemoryStore}} and {{TachyonStore}} which are instantiated 
within the {{BlockManager}} constructor. The {{TaskContext}} also can't be 
created within the constructor for {{BlockManager}} since that's created within 
the {{SparkEnv}} constructor which has no tasks associated with it.

The only workable solution that I can see is to assign a {{TaskContext}} to the 
{{BlockManager}} at run-time but that sounds very sketchy to me since the block 
manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?


was (Author: ilganeli):
The obvious solution won't work. 

Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work 
because it's called from within both {code}MemoryStore{code} and 
{code}TachyonStore{code} which are instantiated within the 
{code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be 
created within the constructor for {code}BlockManager{code} since that's 
created within the {code}SparkEnv{code} constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a 
{code}TaskContext{code} to the {code}BlockManager{code} at run-time but that 
sounds very sketchy to me since the block manager is a singleton and we may 
have multiple tasks going at once. Any thoughts on this conundrum?

> BlockManager.dataDeserialize leaks resources on user exceptions
> ---
>
> Key: SPARK-6839
> URL: https://issues.apache.org/jira/browse/SPARK-6839
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>
> From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
> that 
> [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
>  doesn't  guarantee the underlying InputStream is properly closed.  In 
> particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
> there is an exception in user code.
> The problem is that right now, we convert the input streams to iterators, and 
> only close the input stream if the end of the iterator is reached.  But, we 
> might never reach the end of the iterator -- the obvious case is if there is 
> a bug in the user code, so tasks fail part of the way through the iterator.
> I think the solution is to give {{BlockManager.dataDeserialize}} a 
> {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
> the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).
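
To illustrate the listener-based cleanup described above, here is a minimal 
sketch. It assumes a {{TaskContext}} is available at the call site and access 
to the package-private {{BlockManager.dispose}}; the helper name and the 
function-typed {{dataDeserialize}} parameter are illustrative, not the actual 
patch.

{code}
import java.nio.ByteBuffer

import org.apache.spark.TaskContext
import org.apache.spark.storage.{BlockId, BlockManager}

// Register cleanup on task completion so the underlying buffer is disposed
// even if user code throws before exhausting the iterator.
def deserializeWithCleanup(
    context: TaskContext,
    blockId: BlockId,
    bytes: ByteBuffer,
    dataDeserialize: (BlockId, ByteBuffer) => Iterator[Any]): Iterator[Any] = {
  context.addTaskCompletionListener { (_: TaskContext) =>
    // runs whether the task succeeds, fails, or is cancelled
    BlockManager.dispose(bytes)
  }
  dataDeserialize(blockId, bytes)
}
{code}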



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:07 AM:
---

The obvious solution won't work. 

Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work 
because it's called from within both {code}MemoryStore{code} and 
{code}TachyonStore{code} which are instantiated within the 
{code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be 
created within the constructor for {code}BlockManager{code} since that's 
created within the {code}SparkEnv{code} constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a 
{code}TaskContext{code} to the {code}BlockManager{code} at run-time but that 
sounds very sketchy to me since the block manager is a singleton and we may 
have multiple tasks going at once. Any thoughts on this conundrum?


was (Author: ilganeli):
The obvious solution won't work. 

Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's 
called from within both ```MemoryStore``` and ```TachyonStore``` which are 
instantiated within the ```BlockManager``` constructor. The ```TaskContext``` 
also can't be created within the constructor for ```BlockManager``` since 
that's created within the ```SparkEnv``` constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a ```TaskContext``` to 
the ```BlockManager``` at run-time but that sounds very sketchy to me since the 
block manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?

> BlockManager.dataDeserialize leaks resources on user exceptions
> ---
>
> Key: SPARK-6839
> URL: https://issues.apache.org/jira/browse/SPARK-6839
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>
> From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
> that 
> [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
>  doesn't  guarantee the underlying InputStream is properly closed.  In 
> particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
> there is an exception in user code.
> The problem is that right now, we convert the input streams to iterators, and 
> only close the input stream if the end of the iterator is reached.  But, we 
> might never reach the end of the iterator -- the obvious case is if there is 
> a bug in the user code, so tasks fail part of the way through the iterator.
> I think the solution is to give {{BlockManager.dataDeserialize}} a 
> {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
> the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin commented on SPARK-6839:
-

The obvious solution won't work. 

Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's 
called from within both ```MemoryStore``` and ```TachyonStore``` which are 
instantiated within the ```BlockManager``` constructor. The ```TaskContext``` 
also can't be created within the constructor for ```BlockManager``` since 
that's created within the ```SparkEnv``` constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a ```TaskContext``` to 
the ```BlockManager``` at run-time but that sounds very sketchy to me since the 
block manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?

> BlockManager.dataDeserialize leaks resources on user exceptions
> ---
>
> Key: SPARK-6839
> URL: https://issues.apache.org/jira/browse/SPARK-6839
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>
> From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
> that 
> [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
>  doesn't  guarantee the underlying InputStream is properly closed.  In 
> particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
> there is an exception in user code.
> The problem is that right now, we convert the input streams to iterators, and 
> only close the input stream if the end of the iterator is reached.  But, we 
> might never reach the end of the iterator -- the obvious case is if there is 
> a bug in the user code, so tasks fail part of the way through the iterator.
> I think the solution is to give {{BlockManager.dataDeserialize}} a 
> {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
> the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490545#comment-14490545
 ] 

Ilya Ganelin commented on SPARK-6839:
-

Imran - I can knock this out. Thanks!

> BlockManager.dataDeserialize leaks resources on user exceptions
> ---
>
> Key: SPARK-6839
> URL: https://issues.apache.org/jira/browse/SPARK-6839
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>
> From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
> that 
> [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
>  doesn't  guarantee the underlying InputStream is properly closed.  In 
> particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
> there is an exception in user code.
> The problem is that right now, we convert the input streams to iterators, and 
> only close the input stream if the end of the iterator is reached.  But, we 
> might never reach the end of the iterator -- the obvious case is if there is 
> a bug in the user code, so tasks fail part of the way through the iterator.
> I think the solution is to give {{BlockManager.dataDeserialize}} a 
> {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
> the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485586#comment-14485586
 ] 

Ilya Ganelin commented on SPARK-6780:
-

Matching test code:

{code}
 test("saveAsHadoopFileByKey should generate a text file per key") {
val testPairs : JavaRDD[Array[Byte]] = sc.parallelize(
  Seq(
Array(1.toByte,1.toByte),
Array(2.toByte,4.toByte),
Array(3.toByte,9.toByte),
Array(4.toByte,16.toByte),
Array(5.toByte,25.toByte))
).toJavaRDD()

val fs = FileSystem.get(new Configuration())
val basePath = sc.conf.get("spark.local.dir", "/tmp")
val fullPath = basePath + "/testPath"
fs.delete(new Path(fullPath), true)

PythonRDD.saveAsHadoopFileByKey(
  testPairs,
  false,
  fullPath,
  classOf[RDDMultipleTextOutputFormat].toString,
  classOf[Int].toString,
  classOf[Int].toString,
  null,
  null,
  new java.util.HashMap(), "")

// Test that a file was created for each key
(1 to 5).foreach(key => {
  val testPath = new Path(fullPath + "/" + key)
  assert(fs.exists(testPath))

  // Read the file and test that the contents are the values matching that 
key split by line
  val input = fs.open(testPath)
  val reader = new BufferedReader(new InputStreamReader(input))
  val values = new HashSet[Int]
  val lines = Stream.continually(reader.readLine()).takeWhile(_ != null)
  lines.foreach(s => values += s.toInt)

  assert(values.contains(key*key))
})

fs.delete(new Path(fullPath), true)
  }

{code}

> Add saveAsTextFileByKey method for PySpark
> --
>
> Key: SPARK-6780
> URL: https://issues.apache.org/jira/browse/SPARK-6780
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to 
> subdirectories organized by key as in :
> https://issues.apache.org/jira/browse/SPARK-3533



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485582#comment-14485582
 ] 

Ilya Ganelin commented on SPARK-6780:
-

This code was my attempt to implement this within PythonRDD.scala but I ran 
into run-time reflection issues I could not solve.

{code}
  /**
   * Output a Python RDD of key-value pairs to any Hadoop file system such that
   * the values within the rdd are written to sub-directories organized by the
   * associated key.
   *
   * Keys and values are converted to suitable output types using either user
   * specified converters or, if not specified,
   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion
   * types `keyClass` and `valueClass` are automatically inferred if not
   * specified. The passed-in `confAsMap` is merged with the default Hadoop
   * conf associated with the SparkContext of this RDD.
   */
  def saveAsHadoopFileByKey[K, V, C <: CompressionCodec](
      pyRDD: JavaRDD[Array[Byte]],
      batchSerialized: Boolean,
      path: String,
      outputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String],
      compressionCodecClass: String) = {
    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
    val codec =
      Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
      new JavaToWritableConverter)

    converted.saveAsHadoopFile(path,
      ClassUtils.primitiveToWrapper(kc),
      ClassUtils.primitiveToWrapper(vc),
      classOf[RDDMultipleTextOutputFormat[K, V]],
      new JobConf(mergedConf),
      codec = codec)
  }

{code}

> Add saveAsTextFileByKey method for PySpark
> --
>
> Key: SPARK-6780
> URL: https://issues.apache.org/jira/browse/SPARK-6780
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to 
> subdirectories organized by key as in :
> https://issues.apache.org/jira/browse/SPARK-3533



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485577#comment-14485577
 ] 

Ilya Ganelin commented on SPARK-6780:
-

SPARK-3533 defines matching methods for Scala and Java APIs.
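
For context, methods like this are typically built on a Hadoop 
{{MultipleTextOutputFormat}} that routes each record into a sub-directory named 
after its key. A minimal sketch of that idea follows; the class name mirrors the 
{{RDDMultipleTextOutputFormat}} referenced in the test code above, but the body 
here is an illustration rather than the SPARK-3533 implementation.

{code}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Illustrative output format: drop the key from the written record and use it
// as the sub-directory name instead, so each key gets its own output path.
class RDDMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
  override def generateActualKey(key: K, value: V): K =
    NullWritable.get().asInstanceOf[K]

  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    key.toString + "/" + name
}
{code}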

> Add saveAsTextFileByKey method for PySpark
> --
>
> Key: SPARK-6780
> URL: https://issues.apache.org/jira/browse/SPARK-6780
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Reporter: Ilya Ganelin
>
> The PySpark API should have a method to allow saving a key-value RDD to 
> subdirectories organized by key as in :
> https://issues.apache.org/jira/browse/SPARK-3533



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6780:
---

 Summary: Add saveAsTextFileByKey method for PySpark
 Key: SPARK-6780
 URL: https://issues.apache.org/jira/browse/SPARK-6780
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Ilya Ganelin


The PySpark API should have a method to allow saving a key-value RDD to 
subdirectories organized by key as in :

https://issues.apache.org/jira/browse/SPARK-3533



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6746) Refactor large functions in DAGScheduler to improve readability

2015-04-07 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14483561#comment-14483561
 ] 

Ilya Ganelin commented on SPARK-6746:
-

SPARK-5945 requires updating the logic for handling fetch failures. That will 
introduce merge conflicts with this patch, so this one should be merged first, 
since it cleans up the DAGScheduler code and makes it easier to follow what's going on.

> Refactor large functions in DAGScheduler to improve readability
> ---
>
> Key: SPARK-6746
> URL: https://issues.apache.org/jira/browse/SPARK-6746
> Project: Spark
>  Issue Type: Sub-task
>  Components: Scheduler
>Reporter: Ilya Ganelin
>
> The DAGScheduler class contains two huge functions that make it 
> very hard to understand what's going on in the code. These are:
> 1) The monolithic handleTaskCompletion 
> 2) The cleanupStateForJobAndIndependentStages function
> These can be simply modularized to eliminate some awkward type casting and 
> improve code readability. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6746) Refactor large functions in DAGScheduler to improve readability

2015-04-07 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6746:
---

 Summary: Refactor large functions in DAGScheduler to improve readability
 Key: SPARK-6746
 URL: https://issues.apache.org/jira/browse/SPARK-6746
 Project: Spark
  Issue Type: Sub-task
Reporter: Ilya Ganelin


The DAGScheduler class contains two huge functions that make it 
very hard to understand what's going on in the code. These are:

1) The monolithic handleTaskCompletion 
2) The cleanupStateForJobAndIndependentStages function

These can be simply modularized to eliminate some awkward type casting and 
improve code readability. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}.

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}.
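
To make the proposed ordering concrete, here is a minimal sketch (a hypothetical 
class, not actual Spark code): the flag flips only after cleanup completes, so a 
shutdown that fails part-way can be retried instead of being silently skipped.

{code}
// Hypothetical component illustrating the proposed ordering.
class StoppableComponent {
  // guarded by `this`; set to true only after cleanup has fully completed
  private var stopped = false

  def stop(): Unit = synchronized {
    if (!stopped) {
      // perform cleanup first; if any step throws, `stopped` stays false
      // and a later call to stop() will retry the cleanup
      releaseResources()
      stopListeners()
      // record completion only once everything above succeeded
      stopped = true
    }
  }

  private def releaseResources(): Unit = { /* e.g. close connections, delete temp files */ }
  private def stopListeners(): Unit = { /* e.g. stop event loops, listener bus */ }
}
{code}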



  was:
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {code}
> if (!stopped) {
> stopped = true
> ...
> }
> {code}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {code}SparkContext.stop() {code}.
> A cursory examination reveals this in {code}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop() {code}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}



  was:
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}.

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}.




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {code}
> if (!stopped) {
> stopped = true
> ...
> }
> {code}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {code}SparkContext.stop() {code}
> A cursory examination reveals this in {code}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop() {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {code}
> if (!stopped) {
> stopped = true
> ...
> }
> {code}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {{code}}SparkContext.stop() {{code}}.
> A cursory examination reveals this in {{code}}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop() {{code}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop().{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {{code}}
> if (!stopped) {
> stopped = true
> ...
> }
> {{code}}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {{code}}SparkContext.stop() {{code}}.
> A cursory examination reveals this in {{code}}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop() {{code}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop().{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop()```.{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {{code}}
> if (!stopped) {
> stopped = true
> ...
> }
> {{code}}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {{code}}SparkContext.stop().{{code}}
> A cursory examination reveals this in {{code}}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop().{{code}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop()```.{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}



  was:
There are numerous instances throughout the code base of the following:

```
if (!stopped) {
stopped = true
...
}
```

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with ```SparkContext.stop()```.

A cursory examination reveals this in ```SparkContext.stop(), SparkEnv.stop(), 
and ContextCleaner.stop()```.




> IsStopped set to true before stop() is complete.
> ---
>
> Key: SPARK-6616
> URL: https://issues.apache.org/jira/browse/SPARK-6616
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.3.0
>Reporter: Ilya Ganelin
>
> There are numerous instances throughout the code base of the following:
> {{code}}
> if (!stopped) {
> stopped = true
> ...
> }
> {{code}}
> In general, this is bad practice since it can cause an incomplete cleanup if 
> there is an error during shutdown and not all code executes. Incomplete 
> cleanup is harder to track down than a double cleanup that triggers some 
> error. I propose fixing this throughout the code, starting with the cleanup 
> sequence with {{code}}SparkContext.stop()```.{{code}}
> A cursory examination reveals this in {{code}}SparkContext.stop(), 
> SparkEnv.stop(), and ContextCleaner.stop().{{code}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6616) IsStopped set to true before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6616:
---

 Summary: IsStopped set to true before stop() is complete.
 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin


There are numerous instances throughout the code base of the following:

```
if (!stopped) {
stopped = true
...
}
```

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with ```SparkContext.stop()```.

A cursory examination reveals this in ```SparkContext.stop(), SparkEnv.stop(), 
and ContextCleaner.stop()```.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies

2015-03-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387499#comment-14387499
 ] 

Ilya Ganelin edited comment on SPARK-6492 at 3/30/15 10:03 PM:
---

Would it be reasonable to fix this by adding some timeout/retry logic in the 
SparkContext shutdown code? If so, I can take care of this. Thanks. 
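
To make the suggestion concrete, a rough sketch of bounded join/retry logic 
(a hypothetical helper with assumed parameters; the real change would live in 
the {{EventLoop}}/{{SparkContext}} stop path):

{code}
// Instead of an unbounded Thread.join() while holding a lock, wait with a
// timeout and give up with a warning if the event loop thread never exits,
// so shutdown cannot deadlock on the thread that initiated it.
def stopEventThreadWithTimeout(eventThread: Thread, timeoutMs: Long, maxAttempts: Int): Boolean = {
  var attempt = 0
  while (eventThread.isAlive && attempt < maxAttempts) {
    eventThread.interrupt()      // ask the loop to exit
    eventThread.join(timeoutMs)  // bounded wait instead of join()
    attempt += 1
  }
  val stoppedCleanly = !eventThread.isAlive
  if (!stoppedCleanly) {
    System.err.println(s"Event loop thread still alive after $attempt attempts; continuing shutdown")
  }
  stoppedCleanly
}
{code}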


was (Author: ilganeli):
Would it be reasonable to fix this by adding some timeout/retry logic in the 
DAGScheduler shutdown code? If so, I can take care of this. Thanks. 

> SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
> ---
>
> Key: SPARK-6492
> URL: https://issues.apache.org/jira/browse/SPARK-6492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Josh Rosen
>Priority: Critical
>
> A deadlock can occur when DAGScheduler death causes a SparkContext to be shut 
> down while user code is concurrently racing to stop the SparkContext in a 
> finally block.
> For example:
> {code}
> try {
>   sc = new SparkContext("local", "test")
>   // start running a job that causes the DAGSchedulerEventProcessor to 
> crash
>   someRDD.doStuff()
> }
> } finally {
>   sc.stop() // stop the sparkcontext once the failure in DAGScheduler causes 
> the above job to fail with an exception
> }
> {code}
> This leads to a deadlock.  The event processor thread tries to lock on the 
> {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because 
> the thread that holds that lock is waiting for the event processor thread to 
> join:
> {code}
> "dag-scheduler-event-loop" daemon prio=5 tid=0x7ffa69456000 nid=0x9403 
> waiting for monitor entry [0x0001223ad000]
>java.lang.Thread.State: BLOCKED (on object monitor)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1398)
>   - waiting to lock <0x0007f5037b08> (a java.lang.Object)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52)
> {code}
> {code}
> "pool-1-thread-1-ScalaTest-running-SparkContextSuite" prio=5 
> tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x0007f4b28000> (a 
> org.apache.spark.util.EventLoop$$anon$1)
>   at java.lang.Thread.join(Thread.java:1281)
>   - locked <0x0007f4b28000> (a 
> org.apache.spark.util.EventLoop$$anon$1)
>   at java.lang.Thread.join(Thread.java:1355)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1405)
>   - locked <0x0007f5037b08> (a java.lang.Object)
> [...]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies

2015-03-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387499#comment-14387499
 ] 

Ilya Ganelin commented on SPARK-6492:
-

Would it be reasonable to fix this by adding some timeout/retry logic in the 
DAGScheduler shutdown code? If so, I can take care of this. Thanks. 

> SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
> ---
>
> Key: SPARK-6492
> URL: https://issues.apache.org/jira/browse/SPARK-6492
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Josh Rosen
>Priority: Critical
>
> A deadlock can occur when DAGScheduler death causes a SparkContext to be shut 
> down while user code is concurrently racing to stop the SparkContext in a 
> finally block.
> For example:
> {code}
> try {
>   sc = new SparkContext("local", "test")
>   // start running a job that causes the DAGSchedulerEventProcessor to crash
>   someRDD.doStuff()
> } finally {
>   sc.stop() // stop the SparkContext once the failure in DAGScheduler
>             // causes the above job to fail with an exception
> }
> {code}
> This leads to a deadlock.  The event processor thread tries to lock on the 
> {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because 
> the thread that holds that lock is waiting for the event processor thread to 
> join:
> {code}
> "dag-scheduler-event-loop" daemon prio=5 tid=0x7ffa69456000 nid=0x9403 
> waiting for monitor entry [0x0001223ad000]
>java.lang.Thread.State: BLOCKED (on object monitor)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1398)
>   - waiting to lock <0x0007f5037b08> (a java.lang.Object)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52)
> {code}
> {code}
> "pool-1-thread-1-ScalaTest-running-SparkContextSuite" prio=5 
> tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x0007f4b28000> (a 
> org.apache.spark.util.EventLoop$$anon$1)
>   at java.lang.Thread.join(Thread.java:1281)
>   - locked <0x0007f4b28000> (a 
> org.apache.spark.util.EventLoop$$anon$1)
>   at java.lang.Thread.join(Thread.java:1355)
>   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79)
>   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1405)
>   - locked <0x0007f5037b08> (a java.lang.Object)
> [...]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-5931) Use consistent naming for time properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367915#comment-14367915
 ] 

Ilya Ganelin edited comment on SPARK-5931 at 3/18/15 9:09 PM:
--

[~andrewor14] - I can take this out. Thanks.


was (Author: ilganeli):
@andrewor - I can take this out. Thanks.

> Use consistent naming for time properties
> -
>
> Key: SPARK-5931
> URL: https://issues.apache.org/jira/browse/SPARK-5931
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Andrew Or
>Assignee: Andrew Or
>
> This is SPARK-5932's sister issue.
> The naming of existing time configs is inconsistent. We currently have the 
> following throughout the code base:
> {code}
> spark.network.timeout // seconds
> spark.executor.heartbeatInterval // milliseconds
> spark.storage.blockManagerSlaveTimeoutMs // milliseconds
> spark.yarn.scheduler.heartbeat.interval-ms // milliseconds
> {code}
> Instead, my proposal is to simplify the config name itself and make 
> everything accept time using the following format: 5s, 2ms, 100us. For 
> instance:
> {code}
> spark.network.timeout = 5s
> spark.executor.heartbeatInterval = 500ms
> spark.storage.blockManagerSlaveTimeout = 100ms
> spark.yarn.scheduler.heartbeatInterval = 400ms
> {code}
> All existing configs that are relevant will be deprecated in favor of the new 
> ones. We should do this soon before we keep introducing more time configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5931) Use consistent naming for time properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367915#comment-14367915
 ] 

Ilya Ganelin commented on SPARK-5931:
-

@andrewor - I can take this out. Thanks.

> Use consistent naming for time properties
> -
>
> Key: SPARK-5931
> URL: https://issues.apache.org/jira/browse/SPARK-5931
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Andrew Or
>Assignee: Andrew Or
>
> This is SPARK-5932's sister issue.
> The naming of existing time configs is inconsistent. We currently have the 
> following throughout the code base:
> {code}
> spark.network.timeout // seconds
> spark.executor.heartbeatInterval // milliseconds
> spark.storage.blockManagerSlaveTimeoutMs // milliseconds
> spark.yarn.scheduler.heartbeat.interval-ms // milliseconds
> {code}
> Instead, my proposal is to simplify the config name itself and make 
> everything accept time using the following format: 5s, 2ms, 100us. For 
> instance:
> {code}
> spark.network.timeout = 5s
> spark.executor.heartbeatInterval = 500ms
> spark.storage.blockManagerSlaveTimeout = 100ms
> spark.yarn.scheduler.heartbeatInterval = 400ms
> {code}
> All existing configs that are relevant will be deprecated in favor of the new 
> ones. We should do this soon before we keep introducing more time configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5932) Use consistent naming for byte properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367919#comment-14367919
 ] 

Ilya Ganelin commented on SPARK-5932:
-

[~andrewor14] - I can take this out. Thanks.

> Use consistent naming for byte properties
> -
>
> Key: SPARK-5932
> URL: https://issues.apache.org/jira/browse/SPARK-5932
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.0.0
>Reporter: Andrew Or
>Assignee: Andrew Or
>
> This is SPARK-5931's sister issue.
> The naming of existing byte configs is inconsistent. We currently have the 
> following throughout the code base:
> {code}
> spark.reducer.maxMbInFlight // megabytes
> spark.kryoserializer.buffer.mb // megabytes
> spark.shuffle.file.buffer.kb // kilobytes
> spark.broadcast.blockSize // kilobytes
> spark.executor.logs.rolling.size.maxBytes // bytes
> spark.io.compression.snappy.block.size // bytes
> {code}
> Instead, my proposal is to simplify the config name itself and make 
> everything accept sizes using the following format: 500b, 2k, 100m, 46g, 
> similar to what we currently use for our memory settings. For instance:
> {code}
> spark.reducer.maxSizeInFlight = 10m
> spark.kryoserializer.buffer = 2m
> spark.shuffle.file.buffer = 10k
> spark.broadcast.blockSize = 20k
> spark.executor.logs.rolling.maxSize = 500b
> spark.io.compression.snappy.blockSize = 200b
> {code}
> All existing configs that are relevant will be deprecated in favor of the new 
> ones. We should do this soon before we keep introducing more byte configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367891#comment-14367891
 ] 

Ilya Ganelin commented on SPARK-5945:
-

Hi Imran - I'd be happy to tackle this. Could you please assign it to me? Thank 
you. 

> Spark should not retry a stage infinitely on a FetchFailedException
> ---
>
> Key: SPARK-5945
> URL: https://issues.apache.org/jira/browse/SPARK-5945
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>
> While investigating SPARK-5928, I noticed some very strange behavior in the 
> way spark retries stages after a FetchFailedException.  It seems that on a 
> FetchFailedException, instead of simply killing the task and retrying, Spark 
> aborts the stage and retries.  If it just retried the task, the task might 
> fail 4 times and then trigger the usual job killing mechanism.  But by 
> killing the stage instead, the max retry logic is skipped (it looks to me 
> like there is no limit for retries on a stage).
> After a bit of discussion with Kay Ousterhout, it seems the idea is that if a 
> fetch fails, we assume that the block manager we are fetching from has 
> failed, and that it will succeed if we retry the stage w/out that block 
> manager.  In that case, it wouldn't make any sense to retry the task, since 
> it's doomed to fail every time, so we might as well kill the whole stage.  But 
> this raises two questions:
> 1) Is it really safe to assume that a FetchFailedException means that the 
> BlockManager has failed, and it will work if we just try another one?  
> SPARK-5928 shows that there are at least some cases where that assumption is 
> wrong.  Even if we fix that case, this logic seems brittle to the next case 
> we find.  I guess the idea is that this behavior is what gives us the "R" in 
> RDD ... but it seems like it's not really that robust and maybe should be 
> reconsidered.
> 2) Should stages only be retried a limited number of times?  It would be 
> pretty easy to put in a limited number of retries per stage.  Though again, 
> we encounter issues with keeping things resilient.  Theoretically one stage 
> could accumulate many retries due to failures in different stages further 
> downstream, so we might need to track the cause of each retry as well to 
> still have the desired behavior.
> In general it just seems there is some flakiness in the retry logic.  This is 
> the only reproducible example I have at the moment, but I vaguely recall 
> hitting other cases of strange behavior w/ retries when trying to run long 
> pipelines.  E.g., if one executor is stuck in a GC during a fetch, the fetch 
> fails, but the executor eventually comes back and the stage gets retried 
> again, but the same GC issues happen the second time around, etc.
> Copied from SPARK-5928, here's the example program that can regularly produce 
> a loop of stage failures.  Note that it will only fail from a remote fetch, 
> so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell 
> --num-executors 2 --executor-memory 4000m}}
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   //need to make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x)}.groupByKey().count()
> {code}
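
As a rough illustration of the cap suggested in point 2 above, the bookkeeping could 
look something like the following; the names and the limit of 4 are hypothetical, not 
existing DAGScheduler fields:

{code}
// Hypothetical per-stage retry cap: count resubmissions and give up past a limit.
import scala.collection.mutable

val maxStageRetries = 4
val stageRetryCounts = mutable.Map[Int, Int]().withDefaultValue(0)

def onStageFetchFailure(stageId: Int): Unit = {
  stageRetryCounts(stageId) += 1
  if (stageRetryCounts(stageId) > maxStageRetries) {
    // in the real scheduler this would abort the stage and fail the job
    sys.error(s"Stage $stageId has been resubmitted ${stageRetryCounts(stageId)} times; giving up")
  } else {
    // otherwise resubmit the stage, as happens today
  }
}
{code}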



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14359294#comment-14359294
 ] 

Ilya Ganelin commented on SPARK-4927:
-

Are you running over yarn? My theory is that the memory usage has to do with 
data movement between nodes.







> Spark does not clean up properly during long jobs. 
> ---
>
> Key: SPARK-4927
> URL: https://issues.apache.org/jira/browse/SPARK-4927
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Ilya Ganelin
>
> On a long running Spark job, Spark will eventually run out of memory on the 
> driver node due to metadata overhead from the shuffle operation. Spark will 
> continue to operate, however with drastically decreased performance (since 
> swapping now occurs with every operation).
> The spark.cleaner.ttl parameter allows a user to configure when cleanup 
> happens, but the issue with doing this is that it isn’t done safely, e.g. if 
> this clears a cached RDD or active task in the middle of processing a stage, 
> this ultimately causes a KeyNotFoundException when the next stage attempts to 
> reference the cleared RDD or task.
> There should be a sustainable mechanism for cleaning up stale metadata that 
> allows the program to continue running. 
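
For reference, assuming the parameter meant here is spark.cleaner.ttl, enabling the 
TTL-based cleanup looks roughly like this (the one-hour value is just an example):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Example only: periodically forget metadata older than one hour.
// As described above, an aggressive TTL can clear RDDs or tasks still in use.
val conf = new SparkConf()
  .setAppName("ttl-cleanup-example")
  .set("spark.cleaner.ttl", "3600") // seconds
val sc = new SparkContext(conf)
{code}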



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358958#comment-14358958
 ] 

Ilya Ganelin edited comment on SPARK-4927 at 3/12/15 6:50 PM:
--

Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.

Realized that I already had that code snippet posted. Running the above code 
doesn't reproduce the issue?



was (Author: ilganeli):
Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.







> Spark does not clean up properly during long jobs. 
> ---
>
> Key: SPARK-4927
> URL: https://issues.apache.org/jira/browse/SPARK-4927
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Ilya Ganelin
>
> On a long running Spark job, Spark will eventually run out of memory on the 
> driver node due to metadata overhead from the shuffle operation. Spark will 
> continue to operate, however with drastically decreased performance (since 
> swapping now occurs with every operation).
> The spark.cleaner.ttl parameter allows a user to configure when cleanup 
> happens, but the issue with doing this is that it isn’t done safely, e.g. if 
> this clears a cached RDD or active task in the middle of processing a stage, 
> this ultimately causes a KeyNotFoundException when the next stage attempts to 
> reference the cleared RDD or task.
> There should be a sustainable mechanism for cleaning up stale metadata that 
> allows the program to continue running. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358958#comment-14358958
 ] 

Ilya Ganelin commented on SPARK-4927:
-

Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.







> Spark does not clean up properly during long jobs. 
> ---
>
> Key: SPARK-4927
> URL: https://issues.apache.org/jira/browse/SPARK-4927
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.1.0
>Reporter: Ilya Ganelin
>
> On a long running Spark job, Spark will eventually run out of memory on the 
> driver node due to metadata overhead from the shuffle operation. Spark will 
> continue to operate, however with drastically decreased performance (since 
> swapping now occurs with every operation).
> The spark.cleaner.ttl parameter allows a user to configure when cleanup 
> happens, but the issue with doing this is that it isn’t done safely, e.g. if 
> this clears a cached RDD or active task in the middle of processing a stage, 
> this ultimately causes a KeyNotFoundException when the next stage attempts to 
> reference the cleared RDD or task.
> There should be a sustainable mechanism for cleaning up stale metadata that 
> allows the program to continue running. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs

2015-03-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347216#comment-14347216
 ] 

Ilya Ganelin commented on SPARK-3533:
-

[~aaronjosephs] - Let me see if that's it. Thanks!

> Add saveAsTextFileByKey() method to RDDs
> 
>
> Key: SPARK-3533
> URL: https://issues.apache.org/jira/browse/SPARK-3533
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 1.1.0
>Reporter: Nicholas Chammas
>
> Users often have a single RDD of key-value pairs that they want to save to 
> multiple locations based on the keys.
> For example, say I have an RDD like this:
> {code}
> >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 
> >>> 'Frankie']).keyBy(lambda x: x[0])
> >>> a.collect()
> [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
> >>> a.keys().distinct().collect()
> ['B', 'F', 'N']
> {code}
> Now I want to write the RDD out to different paths depending on the keys, so 
> that I have one output directory per distinct key. Each output directory 
> could potentially have multiple {{part-}} files, one per RDD partition.
> So the output would look something like:
> {code}
> /path/prefix/B [/part-1, /part-2, etc]
> /path/prefix/F [/part-1, /part-2, etc]
> /path/prefix/N [/part-1, /part-2, etc]
> {code}
> Though it may be possible to do this with some combination of 
> {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the 
> {{MultipleTextOutputFormat}} output format class, it isn't straightforward. 
> It's not clear if it's even possible at all in PySpark.
> Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs 
> that makes it easy to save RDDs out to multiple locations at once.
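
For reference, a rough Scala sketch of the MultipleTextOutputFormat workaround 
mentioned above (this is the cumbersome route being discussed, not the proposed API; 
assumes a SparkContext {{sc}} as in a shell):

{code}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.SparkContext._

// Route each record into a directory named after its key.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name          // e.g. N/part-00000
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()                 // don't repeat the key inside the file
}

val a = sc.parallelize(Seq("Nick", "Nancy", "Bob", "Ben", "Frankie")).keyBy(_.substring(0, 1))
a.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String], classOf[KeyBasedOutput])
{code}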



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs

2015-03-02 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343387#comment-14343387
 ] 

Ilya Ganelin commented on SPARK-3533:
-

Hey [~aaronjosephs], please feel free. I'm out of ideas for this one. You can 
see my code changes at the github link and the issue I ran into here. Thanks. 

> Add saveAsTextFileByKey() method to RDDs
> 
>
> Key: SPARK-3533
> URL: https://issues.apache.org/jira/browse/SPARK-3533
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 1.1.0
>Reporter: Nicholas Chammas
>
> Users often have a single RDD of key-value pairs that they want to save to 
> multiple locations based on the keys.
> For example, say I have an RDD like this:
> {code}
> >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 
> >>> 'Frankie']).keyBy(lambda x: x[0])
> >>> a.collect()
> [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
> >>> a.keys().distinct().collect()
> ['B', 'F', 'N']
> {code}
> Now I want to write the RDD out to different paths depending on the keys, so 
> that I have one output directory per distinct key. Each output directory 
> could potentially have multiple {{part-}} files, one per RDD partition.
> So the output would look something like:
> {code}
> /path/prefix/B [/part-1, /part-2, etc]
> /path/prefix/F [/part-1, /part-2, etc]
> /path/prefix/N [/part-1, /part-2, etc]
> {code}
> Though it may be possible to do this with some combination of 
> {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the 
> {{MultipleTextOutputFormat}} output format class, it isn't straightforward. 
> It's not clear if it's even possible at all in PySpark.
> Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs 
> that makes it easy to save RDDs out to multiple locations at once.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time

2015-02-26 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339794#comment-14339794
 ] 

Ilya Ganelin commented on SPARK-5845:
-

I'm code complete on this, will submit a PR shortly.

> Time to cleanup spilled shuffle files not included in shuffle write time
> 
>
> Key: SPARK-5845
> URL: https://issues.apache.org/jira/browse/SPARK-5845
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Kay Ousterhout
>Assignee: Ilya Ganelin
>Priority: Minor
>
> When the disk is contended, I've observed cases when it takes as long as 7 
> seconds to clean up all of the intermediate spill files for a shuffle (when 
> using the sort based shuffle, but bypassing merging because there are <=200 
> shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
> written from one of the tasks where I observed this).  This is effectively 
> part of the shuffle write time (because it's a necessary side effect of 
> writing data to disk) so should be added to the shuffle write time to 
> facilitate debugging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-25 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337104#comment-14337104
 ] 

Ilya Ganelin commented on SPARK-5750:
-

I'd be happy to pull those in. Is it fine to submit the PR against this issue?

> Document that ordering of elements in shuffled partitions is not 
> deterministic across runs
> --
>
> Key: SPARK-5750
> URL: https://issues.apache.org/jira/browse/SPARK-5750
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>
> The ordering of elements in shuffled partitions is not deterministic across 
> runs.  For instance, consider the following example:
> {code}
> val largeFiles = sc.textFile(...)
> val airlines = largeFiles.repartition(2000).cache()
> println(airlines.first)
> {code}
> If this code is run twice, then each run will output a different result.  
> There is non-determinism in the shuffle read code that accounts for this:
> Spark's shuffle read path processes blocks as soon as they are fetched.  Spark 
> uses 
> [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
>  to fetch shuffle data from mappers.  In this code, requests for multiple 
> blocks from the same host are batched together, so nondeterminism in where 
> tasks are run means that the set of requests can vary across runs.  In 
> addition, there's an [explicit 
> call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
>  to randomize the order of the batched fetch requests.  As a result, shuffle 
> operations cannot be guaranteed to produce the same ordering of the elements 
> in their partitions.
> Therefore, Spark should update its docs to clarify that the ordering of 
> elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
> that the _set_ of elements in each partition will be deterministic: if we 
> used {{mapPartitions}} to sort each partition, then the {{first()}} call 
> above would produce a deterministic result.
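
To make the last point concrete, a small sketch reusing the names from the snippet 
above; sorting inside each partition makes the {{first()}} result deterministic even 
though the raw post-shuffle ordering is not:

{code}
// Deterministic variant of the example: sort within each partition before reading.
val deterministicFirst = airlines
  .mapPartitions(iter => iter.toArray.sorted.iterator)
  .first()
println(deterministicFirst)
{code}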



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-25 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336795#comment-14336795
 ] 

Ilya Ganelin commented on SPARK-5750:
-

Did you have a particular doc in mind to update? I feel like this sort of 
comment should go in the programming guide but there's not really a good spot 
for it. One glaring omission in the guide is a general writeup of the shuffle 
operation and the role that it plays internally. Understanding shuffles is key 
to writing stable Spark applications yet there isn't really any mention of it 
outside of the tech talks and presentations from the Spark folks. My suggestion 
would be to create a section providing an overview of shuffle, what parameters 
influence its behavior and stability, and then add this comment to that 
section. 

> Document that ordering of elements in shuffled partitions is not 
> deterministic across runs
> --
>
> Key: SPARK-5750
> URL: https://issues.apache.org/jira/browse/SPARK-5750
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>
> The ordering of elements in shuffled partitions is not deterministic across 
> runs.  For instance, consider the following example:
> {code}
> val largeFiles = sc.textFile(...)
> val airlines = largeFiles.repartition(2000).cache()
> println(airlines.first)
> {code}
> If this code is run twice, then each run will output a different result.  
> There is non-determinism in the shuffle read code that accounts for this:
> Spark's shuffle read path processes blocks as soon as they are fetched.  Spark 
> uses 
> [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
>  to fetch shuffle data from mappers.  In this code, requests for multiple 
> blocks from the same host are batched together, so nondeterminism in where 
> tasks are run means that the set of requests can vary across runs.  In 
> addition, there's an [explicit 
> call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
>  to randomize the order of the batched fetch requests.  As a result, shuffle 
> operations cannot be guaranteed to produce the same ordering of the elements 
> in their partitions.
> Therefore, Spark should update its docs to clarify that the ordering of 
> elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
> that the _set_ of elements in each partition will be deterministic: if we 
> used {{mapPartitions}} to sort each partition, then the {{first()}} call 
> above would produce a deterministic result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335751#comment-14335751
 ] 

Ilya Ganelin commented on SPARK-5845:
-

My mistake - missed your comment about the spill files in the detailed 
description. Given that we're interested in cleaning up the spill files which 
appear to be cleaned up in ExternalSorter.stop() (please correct me if I'm 
wrong), I would like to either 

a) Pass the context to the stop() method - this is possible since the 
SortShuffleWriter has visibility of the TaskContext (which in turn stores the 
metrics we're interested in).

b) (My preference since it won't break the existing interface) Surround 
sorter.stop() on line 91 of SortShuffleWriter.scala with a timer. The only 
downside to this second approach is that it will also include the cleanup of 
the partition writers. I'm not sure whether that should be included in this 
time computation. 
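
To make option (b) concrete, the change would roughly take the following shape; the 
metric update is left as a comment because the exact TaskMetrics call is not pinned 
down here:

{code}
// Sketch of option (b): time a cleanup block and hand the elapsed nanoseconds
// to whatever metric should absorb it. Names are illustrative.
def timed[T](record: Long => Unit)(block: => T): T = {
  val start = System.nanoTime()
  try block finally record(System.nanoTime() - start)
}

// At the SortShuffleWriter call site discussed above, roughly:
//   timed(elapsedNs => /* add elapsedNs to the task's shuffle write time */ ()) {
//     sorter.stop()
//   }
{code}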

> Time to cleanup spilled shuffle files not included in shuffle write time
> 
>
> Key: SPARK-5845
> URL: https://issues.apache.org/jira/browse/SPARK-5845
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Kay Ousterhout
>Assignee: Ilya Ganelin
>Priority: Minor
>
> When the disk is contended, I've observed cases when it takes as long as 7 
> seconds to clean up all of the intermediate spill files for a shuffle (when 
> using the sort based shuffle, but bypassing merging because there are <=200 
> shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
> written from one of the tasks where I observed this).  This is effectively 
> part of the shuffle write time (because it's a necessary side effect of 
> writing data to disk) so should be added to the shuffle write time to 
> facilitate debugging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335646#comment-14335646
 ] 

Ilya Ganelin edited comment on SPARK-5845 at 2/24/15 11:19 PM:
---

If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager:::removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 


was (Author: ilganeli):
If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager:::removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle() message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 

> Time to cleanup intermediate shuffle files not included in shuffle write time
> -
>
> Key: SPARK-5845
> URL: https://issues.apache.org/jira/browse/SPARK-5845
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Kay Ousterhout
>Assignee: Ilya Ganelin
>Priority: Minor
>
> When the disk is contended, I've observed cases when it takes as long as 7 
> seconds to clean up all of the intermediate spill files for a shuffle (when 
> using the sort based shuffle, but bypassing merging because there are <=200 
> shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
> written from one of the tasks where I observed this).  This is effectively 
> part of the shuffle write time (because it's a necessary side effect of 
> writing data to disk) so should be added to the shuffle write time to 
> facilitate debugging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335646#comment-14335646
 ] 

Ilya Ganelin commented on SPARK-5845:
-

If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager:::removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle() message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 

> Time to cleanup intermediate shuffle files not included in shuffle write time
> -
>
> Key: SPARK-5845
> URL: https://issues.apache.org/jira/browse/SPARK-5845
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Kay Ousterhout
>Assignee: Ilya Ganelin
>Priority: Minor
>
> When the disk is contended, I've observed cases when it takes as long as 7 
> seconds to clean up all of the intermediate spill files for a shuffle (when 
> using the sort based shuffle, but bypassing merging because there are <=200 
> shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
> written from one of the tasks where I observed this).  This is effectively 
> part of the shuffle write time (because it's a necessary side effect of 
> writing data to disk) so should be added to the shuffle write time to 
> facilitate debugging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333743#comment-14333743
 ] 

Ilya Ganelin commented on SPARK-5079:
-

Hi [~joshrosen] - I'm trying to wrap my head around the unit tests trying to 
find some specific tests where this is a problem as a baseline. If you could 
highlight a couple of examples as a starting point that would help a lot. 
Thanks!

> Detect failed jobs / batches in Spark Streaming unit tests
> --
>
> Key: SPARK-5079
> URL: https://issues.apache.org/jira/browse/SPARK-5079
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> Currently, it is possible to write Spark Streaming unit tests where Spark 
> jobs fail but the streaming tests succeed because we rely on wall-clock time 
> plus output comparison in order to check whether a test has passed, and 
> hence may miss cases where errors occurred if they didn't affect these 
> results.  We should strengthen the tests to check that no job failures 
> occurred while processing batches.
> See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for 
> additional context.
> The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might 
> also fix this.
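
One way to surface such failures, sketched below, is a listener that counts failed 
jobs so a test can assert that none occurred; this is only an illustration of the 
kind of check being proposed, not the eventual patch:

{code}
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

// Counts jobs that did not finish successfully during a test run.
class JobFailureListener extends SparkListener {
  @volatile var failedJobs = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobSucceeded => // nothing to record
    case _            => failedJobs += 1
  }
}

// In a test: register with sc.addSparkListener(listener), run the streaming
// batches, then assert(listener.failedJobs == 0).
{code}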



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333691#comment-14333691
 ] 

Ilya Ganelin commented on SPARK-5845:
-

Hi Kay - I can knock this one out. Thanks. 

> Time to cleanup intermediate shuffle files not included in shuffle write time
> -
>
> Key: SPARK-5845
> URL: https://issues.apache.org/jira/browse/SPARK-5845
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Kay Ousterhout
>Priority: Minor
>
> When the disk is contended, I've observed cases when it takes as long as 7 
> seconds to clean up all of the intermediate spill files for a shuffle (when 
> using the sort based shuffle, but bypassing merging because there are <=200 
> shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
> written from one of the tasks where I observed this).  This is effectively 
> part of the shuffle write time (because it's a necessary side effect of 
> writing data to disk) so should be added to the shuffle write time to 
> facilitate debugging.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333685#comment-14333685
 ] 

Ilya Ganelin commented on SPARK-5750:
-

Hi Josh - I can knock this out. Thanks.

> Document that ordering of elements in shuffled partitions is not 
> deterministic across runs
> --
>
> Key: SPARK-5750
> URL: https://issues.apache.org/jira/browse/SPARK-5750
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>
> The ordering of elements in shuffled partitions is not deterministic across 
> runs.  For instance, consider the following example:
> {code}
> val largeFiles = sc.textFile(...)
> val airlines = largeFiles.repartition(2000).cache()
> println(airlines.first)
> {code}
> If this code is run twice, then each run will output a different result.  
> There is non-determinism in the shuffle read code that accounts for this:
> Spark's shuffle read path processes blocks as soon as they are fetched.  Spark 
> uses 
> [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
>  to fetch shuffle data from mappers.  In this code, requests for multiple 
> blocks from the same host are batched together, so nondeterminism in where 
> tasks are run means that the set of requests can vary across runs.  In 
> addition, there's an [explicit 
> call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
>  to randomize the order of the batched fetch requests.  As a result, shuffle 
> operations cannot be guaranteed to produce the same ordering of the elements 
> in their partitions.
> Therefore, Spark should update its docs to clarify that the ordering of 
> elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
> that the _set_ of elements in each partition will be deterministic: if we 
> used {{mapPartitions}} to sort each partition, then the {{first()}} call 
> above would produce a deterministic result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 2:40 AM:
--

Edit:
Upon further consideration I think what is really at play here is simply a 
need to explain closures in local vs. cluster modes. I'd like to add a section 
on this to the Spark Programming Guide and then this could be referenced 
within the shorter description for foreach, map, mapPartitions, 
mapPartitionsWithIndex, and flatMap or some other set of operators we care 
about.





was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.


Edit:
Upon further consideration I've realized that the above doesn't quite address 
the spirit of the issue. I think what is really at play here is simply a need 
to explain closures in local vs. cluster modes.  


> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).
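
As a concrete illustration of the pitfall described above, assuming a 
{{spark-shell}} style {{sc}}: the counter version only appears to work in {{local}} 
mode, while the accumulator version behaves the same everywhere:

{code}
val data = sc.parallelize(1 to 100)

// Unsafe: mutates a driver-side variable captured in the closure. In cluster
// mode each executor updates its own copy, so the driver still sees 0.
var counter = 0
data.foreach(x => counter += x)

// Safe: accumulators are aggregated back to the driver in both modes.
val acc = sc.accumulator(0)
data.foreach(x => acc += x)
println(acc.value) // 5050
{code}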



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 2:39 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.


Edit:
Upon further consideration I've realized that the above doesn't quite address 
the spirit of the issue. I think what is really at play here is simply a need 
to explain closures in local vs. cluster modes.  



was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). Specifically, I'd 
like to discuss where the actual data is that the executors are operating on. 
This also becomes useful during performance tuning - for example using 
mapPartitions to avoid shuffle operations, since it ties in with data 
aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). Specifically, I'd 
like to discuss where the actual data is that the executors are operating on. 
This also becomes useful during performance tuning - for example using 
mapPartitions to avoid shuffle operations, since it ties in with data 
aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. This also becomes useful during performance tuning (for example using 
mapPartitions to avoid shuffle operations). 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:43 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. This also becomes useful during performance tuning (for example using 
mapPartitions to avoid shuffle operations). 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in ```local``` mode) versus the division of labor 
between the driver and the executors (in ```cluster``` mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin commented on SPARK-4423:
-

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in ```local``` mode) versus the division of labor 
between the driver and the executors (in ```cluster``` mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>Assignee: Ilya Ganelin
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4655) Split Stage into ShuffleMapStage and ResultStage subclasses

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312458#comment-14312458
 ] 

Ilya Ganelin commented on SPARK-4655:
-

Hi [~joshrosen], I'd be happy to work on this. Thanks!

> Split Stage into ShuffleMapStage and ResultStage subclasses
> ---
>
> Key: SPARK-4655
> URL: https://issues.apache.org/jira/browse/SPARK-4655
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Josh Rosen
>Assignee: Josh Rosen
>
> The scheduler's {{Stage}} class has many fields which are only applicable to 
> result stages or shuffle map stages.  As a result, I think that it makes 
> sense to make {{Stage}} into an abstract base class with two subclasses, 
> {{ResultStage}} and {{ShuffleMapStage}}.  This would improve the 
> understandability of the DAGScheduler code. 
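
A rough sketch of the shape such a split implies; the fields shown are illustrative 
placeholders, not the actual Stage members:

{code}
// Illustrative only: shared bookkeeping on the base class, map- and
// result-specific state on the subclasses.
abstract class Stage(val id: Int, val numTasks: Int)

class ShuffleMapStage(id: Int, numTasks: Int, val shuffleId: Int)
    extends Stage(id, numTasks) {
  var numAvailableOutputs: Int = 0      // map outputs registered so far
}

class ResultStage(id: Int, numTasks: Int) extends Stage(id, numTasks) {
  var activeJobId: Option[Int] = None   // the job whose result this stage computes
}
{code}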



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312424#comment-14312424
 ] 

Ilya Ganelin commented on SPARK-4423:
-

I'll be happy to update this. Thank you.

> Improve foreach() documentation to avoid confusion between local- and 
> cluster-mode behavior
> ---
>
> Key: SPARK-4423
> URL: https://issues.apache.org/jira/browse/SPARK-4423
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Josh Rosen
>
> {{foreach}} seems to be a common source of confusion for new users: in 
> {{local}} mode, {{foreach}} can be used to update local variables on the 
> driver, but programs that do this will not work properly when executed on 
> clusters, since the {{foreach}} will update per-executor variables (note that 
> this _will_ work correctly for accumulators, but not for other types of 
> mutable objects).
> Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
> print to the driver's standard output.
> At a minimum, we should improve the documentation to warn users against 
> unsafe uses of {{foreach}} that won't work properly when transitioning from 
> local mode to a real cluster.
> We might also consider changes to local mode so that its behavior more 
> closely matches the cluster modes; this will require some discussion, though, 
> since any change of behavior here would technically be a user-visible 
> backwards-incompatible change (I don't think that we made any explicit 
> guarantees about the current local-mode behavior, but someone might be 
> relying on the current implicit behavior).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312416#comment-14312416
 ] 

Ilya Ganelin edited comment on SPARK-5570 at 2/9/15 4:27 PM:
-

I would be happy to fix this. Thank you. 


was (Author: ilganeli):
I'll fix this, can you please assign it to me? Thanks.

> No docs stating that `new SparkConf().set("spark.driver.memory", ...) will 
> not work
> ---
>
> Key: SPARK-5570
> URL: https://issues.apache.org/jira/browse/SPARK-5570
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 1.2.0
>Reporter: Tathagata Das
>Assignee: Andrew Or
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312416#comment-14312416
 ] 

Ilya Ganelin commented on SPARK-5570:
-

I'll fix this, can you please assign it to me? Thanks.

> No docs stating that `new SparkConf().set("spark.driver.memory", ...) will 
> not work
> ---
>
> Key: SPARK-5570
> URL: https://issues.apache.org/jira/browse/SPARK-5570
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Spark Core
>Affects Versions: 1.2.0
>Reporter: Tathagata Das
>Assignee: Andrew Or
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312415#comment-14312415
 ] 

Ilya Ganelin commented on SPARK-5079:
-

I can work on this - can you please assign it to me? Thank you. 

> Detect failed jobs / batches in Spark Streaming unit tests
> --
>
> Key: SPARK-5079
> URL: https://issues.apache.org/jira/browse/SPARK-5079
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Josh Rosen
>
> Currently, it is possible to write Spark Streaming unit tests where Spark 
> jobs fail but the streaming tests succeed because we rely on wall-clock time 
> plus output comparison in order to check whether a test has passed, and 
> hence may miss cases where errors occurred if they didn't affect these 
> results.  We should strengthen the tests to check that no job failures 
> occurred while processing batches.
> See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for 
> additional context.
> The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might 
> also fix this.
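
One possible way to surface such failures (a sketch, assuming that asserting on
a failed-job count is acceptable for these tests; this is not the
StreamingTestWaiter from the linked PR) is to register a SparkListener and
check it at the end of each test:

{code}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

// Counts jobs whose result was anything other than JobSucceeded.
class FailedJobCounter extends SparkListener {
  val failedJobs = new AtomicInteger(0)
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (jobEnd.jobResult != JobSucceeded) {
      failedJobs.incrementAndGet()
    }
  }
}

// Hypothetical usage inside a streaming test (ssc is the test's StreamingContext):
//   val counter = new FailedJobCounter
//   ssc.sparkContext.addSparkListener(counter)
//   ... run the batches and the existing output checks ...
//   assert(counter.failedJobs.get() == 0, "a Spark job failed while processing batches")
{code}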



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-823) spark.default.parallelism's default is inconsistent across scheduler backends

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312382#comment-14312382
 ] 

Ilya Ganelin commented on SPARK-823:


Hi [~joshrosen], I believe the documentation is up to date; I reviewed all 
usages of spark.default.parallelism and found no inconsistencies with the 
documentation. The only thing that is undocumented about the usage of 
spark.default.parallelism is how it's used within the Partitioner class in 
both Spark and Python: if it is defined, the default number of partitions 
created is equal to spark.default.parallelism; otherwise, the number of 
partitions of the parent RDDs is used. I think this issue can be closed - I 
don't think that particular case needs to be publicly documented (it's evident 
in the code what is going on). 
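
For reference, a rough sketch of the behaviour described above (a paraphrase of
what Partitioner.defaultPartitioner does, not the actual implementation):

{code}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// If spark.default.parallelism is set, it drives the size of the default
// HashPartitioner; otherwise the largest partition count among the parent RDDs wins.
def defaultNumPartitions(sc: SparkContext, parents: Seq[RDD[_]]): Int = {
  if (sc.getConf.contains("spark.default.parallelism")) {
    sc.defaultParallelism
  } else {
    parents.map(_.partitions.length).max
  }
}
{code}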

> spark.default.parallelism's default is inconsistent across scheduler backends
> -
>
> Key: SPARK-823
> URL: https://issues.apache.org/jira/browse/SPARK-823
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, PySpark, Scheduler
>Affects Versions: 0.8.0, 0.7.3, 0.9.1
>Reporter: Josh Rosen
>Priority: Minor
>
> The [0.7.3 configuration 
> guide|http://spark-project.org/docs/latest/configuration.html] says that 
> {{spark.default.parallelism}}'s default is 8, but the default is actually 
> max(totalCoreCount, 2) for the standalone scheduler backend, 8 for the Mesos 
> scheduler, and {{threads}} for the local scheduler:
> https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L157
> https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala#L317
> https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala#L150
> Should this be clarified in the documentation?  Should the Mesos scheduler 
> backend's default be revised?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273877#comment-14273877
 ] 

Ilya Ganelin edited comment on SPARK-2584 at 1/12/15 7:08 PM:
--

Understood, I am able to recreate this issue in 1.1. I'll work on a fix to 
clarify what's going on. Thanks.



was (Author: ilganeli):
Understood, I was looking at the UI for Spark 1.1 and did not see the block 
storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented 
as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this 
problem in the newer version of Spark but wanted to know if you've seen it 
since 1.0.1. 

> Do not mutate block storage level on the UI
> ---
>
> Key: SPARK-2584
> URL: https://issues.apache.org/jira/browse/SPARK-2584
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 1.0.1
>Reporter: Andrew Or
>
> If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
> DISK_ONLY on the UI. We should preserve the original storage level proposed 
> by the user, in addition to the change in actual storage level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273877#comment-14273877
 ] 

Ilya Ganelin commented on SPARK-2584:
-

Understood, I was looking at the UI for Spark 1.1 and did not see the block 
storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented 
as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this 
problem in the newer version of Spark but wanted to know if you've seen it 
since 1.0.1. 

> Do not mutate block storage level on the UI
> ---
>
> Key: SPARK-2584
> URL: https://issues.apache.org/jira/browse/SPARK-2584
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 1.0.1
>Reporter: Andrew Or
>
> If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
> DISK_ONLY on the UI. We should preserve the original storage level proposed 
> by the user, in addition to the change in actual storage level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273762#comment-14273762
 ] 

Ilya Ganelin commented on SPARK-2584:
-

Hi Andrew, a question about this: when you say "we drop it from memory", what 
mechanism are you referring to? It's illegal to change the persistence level of 
an already-persisted RDD, and if you call unpersist() the RDD is dropped from 
both memory and disk storage. How would an RDD be "dropped" from memory? 
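
For reference, a minimal sketch of the two mechanisms mentioned above (the data
set is hypothetical and used only for illustration):

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("PersistExample").setMaster("local[2]"))
val data = sc.parallelize(1 to 1000)

val cached = data.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()  // materializes the blocks in memory, spilling to disk if needed

// Calling persist again with a different level throws UnsupportedOperationException;
// unpersist() removes the cached blocks from both memory and disk.
cached.unpersist()
{code}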

> Do not mutate block storage level on the UI
> ---
>
> Key: SPARK-2584
> URL: https://issues.apache.org/jira/browse/SPARK-2584
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 1.0.1
>Reporter: Andrew Or
>
> If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
> DISK_ONLY on the UI. We should preserve the original storage level proposed 
> by the user, in addition to the change in actual storage level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


