[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00
[ https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-13268: - Description: There is an issue with how timestamps are displayed/converted to Strings in Spark SQL. The documentation states that the timestamp should be created in the GMT time zone, however, if we do so, we see that the output actually contains a -8 hour offset: {code} new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 {code} This result is confusing, unintuitive, and introduces issues when converting from DataFrames containing timestamps to RDDs which are then saved as text. This has the effect of essentially shifting all dates in a dataset by 1 day. The suggested fix for this is to update the timestamp toString representation to either a) Include timezone or b) Correctly display in GMT. This change may well introduce substantial and insidious bugs so I'm not sure how best to resolve this. was: There is an issue with how timestamps are displayed/converted to Strings in Spark SQL. The documentation states that the timestamp should be created in the GMT time zone, however, if we do so, we see that the output actually contains a -8 hour offset: {code} new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 {code} This result is confusing, unintuitive, and introduces issues when converting from DataFrames containing timestamps to RDDs which are then saved as text. This has the effect of essentially shifting all dates in a dataset by 1 day. > SQL Timestamp stored as GMT but toString returns GMT-08:00 > -- > > Key: SPARK-13268 > URL: https://issues.apache.org/jira/browse/SPARK-13268 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Ilya Ganelin > > There is an issue with how timestamps are displayed/converted to Strings in > Spark SQL. The documentation states that the timestamp should be created in > the GMT time zone, however, if we do so, we see that the output actually > contains a -8 hour offset: > {code} > new > Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) > res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 > new > Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) > res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 > {code} > This result is confusing, unintuitive, and introduces issues when converting > from DataFrames containing timestamps to RDDs which are then saved as text. > This has the effect of essentially shifting all dates in a dataset by 1 day. > The suggested fix for this is to update the timestamp toString representation > to either a) Include timezone or b) Correctly display in GMT. > This change may well introduce substantial and insidious bugs so I'm not sure > how best to resolve this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
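A minimal workaround sketch for the display problem described above, assuming a JVM whose default time zone is GMT-08:00: Timestamp.toString always renders in the JVM's default zone, so formatting the same instant through a formatter pinned to GMT produces the expected string. The epoch value below is assumed to correspond to 2015-01-01T00:00:00Z; this only illustrates the behaviour and is not the proposed Spark-side fix.
{code}
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone

// The same instant rendered via Timestamp.toString (JVM default zone) versus a
// formatter explicitly pinned to GMT.
val ts = new Timestamp(1420070400000L) // assumed to be 2015-01-01T00:00:00Z

val gmtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
gmtFormat.setTimeZone(TimeZone.getTimeZone("GMT"))

println(ts)                   // 2014-12-31 16:00:00.0 on a GMT-08:00 machine
println(gmtFormat.format(ts)) // 2015-01-01 00:00:00.0 regardless of default zone
{code}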
[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00
[ https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-13268: - Description: There is an issue with how timestamps are displayed/converted to Strings in Spark SQL. The documentation states that the timestamp should be created in the GMT time zone, however, if we do so, we see that the output actually contains a -8 hour offset: {code} new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 {code} This result is confusing, unintuitive, and introduces issues when converting from DataFrames containing timestamps to RDDs which are then saved as text. This has the effect of essentially shifting all dates in a dataset by 1 day. was: There is an issue with how timestamps are displayed/converted to Strings in Spark SQL. The documentation states that the timestamp should be created in the GMT time zone, however, if we do so, we see that the output actually contains a -8 hour offset: {{ new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 }} This result is confusing, unintuitive, and introduces issues when converting from DataFrames containing timestamps to RDDs which are then saved as text. This has the effect of essentially shifting all dates in a dataset by 1 day. > SQL Timestamp stored as GMT but toString returns GMT-08:00 > -- > > Key: SPARK-13268 > URL: https://issues.apache.org/jira/browse/SPARK-13268 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Ilya Ganelin > > There is an issue with how timestamps are displayed/converted to Strings in > Spark SQL. The documentation states that the timestamp should be created in > the GMT time zone, however, if we do so, we see that the output actually > contains a -8 hour offset: > {code} > new > Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) > res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 > new > Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) > res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 > {code} > This result is confusing, unintuitive, and introduces issues when converting > from DataFrames containing timestamps to RDDs which are then saved as text. > This has the effect of essentially shifting all dates in a dataset by 1 day. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00
Ilya Ganelin created SPARK-13268: Summary: SQL Timestamp stored as GMT but toString returns GMT-08:00 Key: SPARK-13268 URL: https://issues.apache.org/jira/browse/SPARK-13268 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.0 Reporter: Ilya Ganelin There is an issue with how timestamps are displayed/converted to Strings in Spark SQL. The documentation states that the timestamp should be created in the GMT time zone, however, if we do so, we see that the output actually contains a -8 hour offset: {{ new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli) res144: java.sql.Timestamp = 2014-12-31 16:00:00.0 new Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli) res145: java.sql.Timestamp = 2015-01-01 00:00:00.0 }} This result is confusing, unintuitive, and introduces issues when converting from DataFrames containing timestamps to RDDs which are then saved as text. This has the effect of essentially shifting all dates in a dataset by 1 day. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15073236#comment-15073236 ] Ilya Ganelin commented on SPARK-12488: -- I'll submit a dataset that causes this when I have a moment. Thanks! Thank you, Ilya Ganelin > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generates 10 topics on a data set with a vocabulary of 685. > {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, > -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, > -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, > -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, > -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, > 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, > 38, 39, 40, 41, 42, 43, 44, 45, 4... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068827#comment-15068827 ] Ilya Ganelin commented on SPARK-12488: -- Further investigation identifies the issue as stemming from the docTermVector containing zero-vectors (as in no words from the vocabulary present in the document). > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generates 10 topics on a data set with a vocabulary of 685. > {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, > -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, > -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, > -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, > -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, > 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, > 38, 39, 40, 41, 42, 43, 44, 45, 4... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
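If zero-vectors are indeed the trigger, filtering them out of the corpus before training is a possible workaround. The sketch below is hypothetical (dropEmptyDocs is not a Spark API) and assumes docTermVector is the RDD[(Long, Vector)] corpus from the report above.
{code}
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical workaround: drop documents whose term-count vectors have no
// non-zero entries before running LDA.
def dropEmptyDocs(corpus: RDD[(Long, Vector)]): RDD[(Long, Vector)] =
  corpus.filter { case (_, termCounts) => termCounts.numNonzeros > 0 }

// Usage, with docTermVector as in the report:
// val ldaModel = new LDA().setK(10).setMaxIterations(10).run(dropEmptyDocs(docTermVector))
{code}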
[jira] [Comment Edited] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068761#comment-15068761 ] Ilya Ganelin edited comment on SPARK-12488 at 12/22/15 9:32 PM: [~josephkb] Would love your feedback here. Thanks! was (Author: ilganeli): @jkbradley Would love your feedback here. Thanks! > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generates 10 topics on a data set with a vocabulary of 685. > {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, > -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, > -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, > -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, > -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, > 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, > 38, 39, 40, 41, 42, 43, 44, 45, 4... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068761#comment-15068761 ] Ilya Ganelin commented on SPARK-12488: -- @jkbradley Would love your feedback here. Thanks! > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generates 10 topics on a data set with a vocabulary of 685. > {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, > -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, > -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, > -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, > -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, > 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, > 38, 39, 40, 41, 42, 43, 44, 45, 4... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-12488: - Description: When running the LDA model, and using the describeTopics function, invalid values appear in the termID list that is returned: The below example generates 10 topics on a data set with a vocabulary of 685. {code} // Set LDA parameters val numTopics = 10 val lda = new LDA().setK(numTopics).setMaxIterations(10) val ldaModel = lda.run(docTermVector) val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted.reverse res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 4... {code} was: When running the LDA model, and using the describeTopics function, invalid values appear in the termID list that is returned: The below example generated 10 topics on a data set with a vocabulary of 685. 
{code} // Set LDA parameters val numTopics = 10 val lda = new LDA().setK(numTopics).setMaxIterations(10) val ldaModel = lda.run(docTermVector) val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted.reverse res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 4... {code} > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2
[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-12488: - Summary: LDA describeTopics() Generates Invalid Term IDs (was: LDA Describe Topics Generates Invalid Term IDs) > LDA describeTopics() Generates Invalid Term IDs > --- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generated 10 topics on a data set with a vocabulary of 685. > {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, > -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, > -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, > -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, > -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, > 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, > 38, 39, 40, 41, 42, 43, 44, 45, 4... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs
[ https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-12488: - Description: When running the LDA model, and using the describeTopics function, invalid values appear in the termID list that is returned: The below example generated 10 topics on a data set with a vocabulary of 685. {code} // Set LDA parameters val numTopics = 10 val lda = new LDA().setK(numTopics).setMaxIterations(10) val ldaModel = lda.run(docTermVector) val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted.reverse res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... {code} {code} scala> ldaModel.describeTopics()(0)._1.sorted res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 4... {code} was: When running the LDA model, and using the describeTopics function, invalid values appear in the termID list that is returned: {code} // Set LDA parameters val numTopics = 10 val lda = new LDA().setK(numTopics).setMaxIterations(10) val ldaModel = lda.run(docTermVector) val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] {code} > LDA Describe Topics Generates Invalid Term IDs > -- > > Key: SPARK-12488 > URL: https://issues.apache.org/jira/browse/SPARK-12488 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 1.5.2 >Reporter: Ilya Ganelin > > When running the LDA model, and using the describeTopics function, invalid > values appear in the termID list that is returned: > The below example generated 10 topics on a data set with a vocabulary of 685. 
> {code} > // Set LDA parameters > val numTopics = 10 > val lda = new LDA().setK(numTopics).setMaxIterations(10) > val ldaModel = lda.run(docTermVector) > val distModel = > ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted.reverse > res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, > 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, > 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, > 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, > 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, > 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, > 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, > 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, > 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, > 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, > 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53... > {code} > {code} > scala> ldaModel.describeTopics()(0)._1.sorted > res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, > -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, > -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, > -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, > -1397991169, -
[jira] [Created] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs
Ilya Ganelin created SPARK-12488: Summary: LDA Describe Topics Generates Invalid Term IDs Key: SPARK-12488 URL: https://issues.apache.org/jira/browse/SPARK-12488 Project: Spark Issue Type: Bug Components: MLlib Affects Versions: 1.5.2 Reporter: Ilya Ganelin When running the LDA model, and using the describeTopics function, invalid values appear in the termID list that is returned: {code} // Set LDA parameters val numTopics = 10 val lda = new LDA().setK(numTopics).setMaxIterations(10) val ldaModel = lda.run(docTermVector) val distModel = ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel] {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620 ] Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM: -- [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with {{key=1}}, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new {{outputWriter}} for each possible {{key}} 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by {{key}} (and persist this sorted set of {{InternalRow}} objects in memory). 4) One key at a time, create an {{outputWriter}} and write all rows associated with that key 5) Close outputWriter for that {{key}} and open a new {{outputWriter}}, continue from step 4. was (Author: ilganeli): [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {{InternalRow}} objects in memory). 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
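A simplified model of that five-step flow, with plain strings and java.io.Writer standing in for the real InternalRow and OutputWriter machinery; none of the names below are Spark APIs, and a real implementation would sort and spill the buffered rows rather than group them in memory.
{code}
import java.io.Writer
import scala.collection.mutable

// Hypothetical model: write through per-key writers until the cap is reached
// (steps 1-2), buffer everything else, then emit the buffered rows one key at
// a time so only a single extra writer is ever open (steps 3-5).
def writeWithWriterCap[K: Ordering](
    rows: Iterator[(K, String)],
    maxOpenWriters: Int,
    openWriter: K => Writer): Unit = {
  val writers = mutable.Map.empty[K, Writer]
  val overflow = mutable.ArrayBuffer.empty[(K, String)]

  rows.foreach { case (key, value) =>
    writers.get(key) match {
      case Some(w) => w.write(value)
      case None if writers.size < maxOpenWriters =>
        val w = openWriter(key)
        writers(key) = w
        w.write(value)
      case None => overflow += ((key, value))
    }
  }
  writers.values.foreach(_.close())

  // Group the buffered rows by key and process the keys in sorted order,
  // opening and closing exactly one writer per remaining key.
  overflow.groupBy(_._1).toSeq.sortBy(_._1).foreach { case (key, keyed) =>
    val w = openWriter(key)
    keyed.foreach { case (_, value) => w.write(value) }
    w.close()
  }
}
{code}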
[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620 ] Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM: -- [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {{InternalRow}} objects in memory). 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. was (Author: ilganeli): [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of InternalRow objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620 ] Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM: -- [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {InternalRow} objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. was (Author: ilganeli): [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {code}InternalRow{code} objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620 ] Ilya Ganelin commented on SPARK-8890: - [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {code}InternalRow{code} objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620 ] Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM: -- [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of InternalRow objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. was (Author: ilganeli): [~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete output buffers for keys that have been completely written? Thus, would we, for example, write all values associated with key=1, then close that output buffer, write the next one etc. Operational flow would become: 1) Attempt to create new outputWriter for each possible key 2) When maximum is exceeded, stop outputting rows. 3) Sort all remaining data by key (and persist this sorted set of {InternalRow} objects in memory. 4) One key at a time, create an outputWriter and write all rows associated with that key 5) Close outputWriter for that key and open a new outputWriter, continue from step 4. > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533 ] Ilya Ganelin commented on SPARK-8890: - Once data is sorted, is the number of partitions guaranteed to be under that limit? When we're talking about sorting, are we talking about which columns are in which partition? I want to make sure I understand what is happening. When we ingest a data frame, we consume a set of data organized by columns (the schema). When this data is partitioned, does all data under a certain column go to the same partition? If not, what happens in this stage? We create a new ```outputWriter``` for each row based on the columns within that row (from the projected columns). New ```outputWriters``` become necessary when the columns within a row are different. However, given that the schema is fixed, where does this variability come from and what does it mean to "sort" in this context? > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533 ] Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 6:46 PM: -- Once data is sorted, is the number of partitions guaranteed to be under that limit? When we're talking about sorting, are we talking about which columns are in which partition? I want to make sure I understand what is happening. When we ingest a data frame, we consume a set of data organized by columns (the schema). When this data is partitioned, does all data under a certain column go to the same partition? If not, what happens in this stage? We create a new outputWriter for each row based on the columns within that row (from the projected columns). New outputWriters become necessary when the columns within a row are different. However, given that the schema is fixed, where does this variability come from and what does it mean to "sort" in this context? was (Author: ilganeli): Once data is sorted, is the number of partitions guaranteed to be under that limit? When we're talking about sorting, are we talking about which columns are in which partition? I want to make sure I understand what is happening. When we ingest a data frame, we consume a set of data organized by columns (the schema). When this data is partitioned, does all data under a certain column go to the same partition? If not, what happens in this stage? We create a new ```outputWriter``` for each row based on the columns within that row (from the projected columns). New ```outputWriters``` become necessary when the columns within a row are different. However, given that the schema is fixed, where does this variability come from and what does it mean to "sort" in this context? > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert
[ https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628488#comment-14628488 ] Ilya Ganelin commented on SPARK-8890: - [~rxin] I want to make sure I correctly understand your solution. Are you proposing that if the number of active partitions is beyond 50 we repartition the data into 50 partitions? I think we could approach this differently by creating a pool of OutputWriters (of size 50) and only create new OutputWriters once the previous partition has been written. This could be handled by blocking within the outputWriterForRow call when the new outputWriter is created. Does that seem reasonable? Please let me know, thanks! > Reduce memory consumption for dynamic partition insert > -- > > Key: SPARK-8890 > URL: https://issues.apache.org/jira/browse/SPARK-8890 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Critical > > Currently, InsertIntoHadoopFsRelation can run out of memory if the number of > table partitions is large. The problem is that we open one output writer for > each partition, and when data are randomized and when the number of > partitions is large, we open a large number of output writers, leading to OOM. > The solution here is to inject a sorting operation once the number of active > partitions is beyond a certain point (e.g. 50?) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8907) Speed up path construction in DynamicPartitionWriterContainer.outputWriterForRow
[ https://issues.apache.org/jira/browse/SPARK-8907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625150#comment-14625150 ] Ilya Ganelin commented on SPARK-8907: - [~rxin] The code for this in master has eliminated usage of zip and map as of [SPARK-8961|https://github.com/apache/spark/commit/33630883685eafcc3ee4521ea8363be342f6e6b4]. Do you think this can be further optimized and if so, how? There doesn't seem to be much within the existing catalyst expressions that would facilitate this, but I could be wrong. The relevant code fragment is below: {code} val partitionPath = { val partitionPathBuilder = new StringBuilder var i = 0 while (i < partitionColumns.length) { val col = partitionColumns(i) val partitionValueString = { val string = row.getString(i) if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string) } if (i > 0) { partitionPathBuilder.append(Path.SEPARATOR_CHAR) } partitionPathBuilder.append(s"$col=$partitionValueString") i += 1 } partitionPathBuilder.toString() } {code} > Speed up path construction in > DynamicPartitionWriterContainer.outputWriterForRow > > > Key: SPARK-8907 > URL: https://issues.apache.org/jira/browse/SPARK-8907 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin > > Don't use zip and scala collection methods to avoid garbage collection > {code} > val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, > rawValue) => > val string = if (rawValue == null) null else String.valueOf(rawValue) > val valueString = if (string == null || string.isEmpty) { > defaultPartitionName > } else { > PartitioningUtils.escapePathName(string) > } > s"/$col=$valueString" > }.mkString.stripPrefix(Path.SEPARATOR) > {code} > We can probably use catalyst expressions themselves to construct the path, > and then we can leverage code generation to do this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
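One further micro-optimization that stays within the shape of the fragment above, on the assumption that the per-row string interpolation of column names is measurable: the "column=" prefixes depend only on the partition schema, so they can be built once per task and reused for every row. The class below is a self-contained sketch; in Spark the escape function and separator would be PartitioningUtils.escapePathName and Path.SEPARATOR_CHAR, and the values would come from the row rather than a Seq[String].
{code}
// Hypothetical sketch: precompute the "<column>=" prefixes once, since they
// depend only on the partition schema, and build each row's path with a plain
// while loop as in the fragment above.
class PartitionPathBuilder(
    partitionColumns: Seq[String],
    defaultPartitionName: String,
    escape: String => String,
    separator: Char) {

  private val colPrefixes: Array[String] = partitionColumns.map(_ + "=").toArray

  def pathFor(partitionValues: Seq[String]): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < colPrefixes.length) {
      val raw = partitionValues(i)
      val value = if (raw == null) defaultPartitionName else escape(raw)
      if (i > 0) sb.append(separator)
      sb.append(colPrefixes(i)).append(value)
      i += 1
    }
    sb.toString()
  }
}
{code}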
[jira] [Commented] (SPARK-3153) shuffle will run out of space when disks have different free space
[ https://issues.apache.org/jira/browse/SPARK-3153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608993#comment-14608993 ] Ilya Ganelin commented on SPARK-3153: - cc [~davies] I believe this issue can be closed as a duplicate of SPARK-5418, no? > shuffle will run out of space when disks have different free space > -- > > Key: SPARK-3153 > URL: https://issues.apache.org/jira/browse/SPARK-3153 > Project: Spark > Issue Type: Bug > Components: Shuffle >Reporter: Davies Liu > > If we have several disks in SPARK_LOCAL_DIRS, and one of them is much smaller > than the others (perhaps added by mistake, or a special disk such as an SSD), then the > shuffle can run out of space on the smaller disk. > PySpark also has this issue during spilling. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8464) Consider separating aggregator and non-aggregator paths in ExternalSorter
[ https://issues.apache.org/jira/browse/SPARK-8464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608292#comment-14608292 ] Ilya Ganelin commented on SPARK-8464: - Josh - I'd be happy to look into this; I'll submit a PR shortly. > Consider separating aggregator and non-aggregator paths in ExternalSorter > - > > Key: SPARK-8464 > URL: https://issues.apache.org/jira/browse/SPARK-8464 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Reporter: Josh Rosen > > ExternalSorter is still really complicated and hard to understand. We should > investigate whether separating the aggregator and non-aggregator paths into > separate files would make the code easier to understand without introducing > significant duplication. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7996) Deprecate the developer api SparkEnv.actorSystem
[ https://issues.apache.org/jira/browse/SPARK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579643#comment-14579643 ] Ilya Ganelin commented on SPARK-7996: - This seems to be mutually exclusive with https://issues.apache.org/jira/browse/SPARK-7997. Is the latter a placeholder? > Deprecate the developer api SparkEnv.actorSystem > > > Key: SPARK-7996 > URL: https://issues.apache.org/jira/browse/SPARK-7996 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Shixiong Zhu > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-7894) Graph Union Operator
[ https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283 ] Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:38 PM: - How is this functionality different from the existing {{union}} functions within {{VertexRDD}} and {{EdgeRDD}} ? was (Author: ilganeli): How is this functionality different from the existing `union` functions within `VertexRDD` and `EdgeRDD` ? > Graph Union Operator > > > Key: SPARK-7894 > URL: https://issues.apache.org/jira/browse/SPARK-7894 > Project: Spark > Issue Type: Sub-task > Components: GraphX >Reporter: Andy Huang > Labels: graph, union > Attachments: union_operator.png > > > This operator aims to union two graphs and generate a new graph directly. The > union of two graphs is the union of their vertex sets and their edge > families.Vertexes and edges which are included in either graph will be part > of the new graph. > bq. G ∪ H = (VG ∪ VH, EG ∪ EH). > The below image shows a union of graph G and graph H > !union_operator.png|width=600px,align=center! > A Simple interface would be: > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED] > However, inevitably vertexes and edges overlapping will happen between > borders of graphs. For vertex, it's quite nature to just make a union and > remove those duplicate ones. But for edges, a mergeEdges function seems to be > more reasonable. > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: > (ED, ED) => ED): Graph[VD, ED] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-7894) Graph Union Operator
[ https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283 ] Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:37 PM: - How is this functionality different from the existing `union` functions within `VertexRDD` and `EdgeRDD` ? was (Author: ilganeli): How is this functionality different from the existing {union} functions within {VertexRDD} and {EdgeRDD} ? > Graph Union Operator > > > Key: SPARK-7894 > URL: https://issues.apache.org/jira/browse/SPARK-7894 > Project: Spark > Issue Type: Sub-task > Components: GraphX >Reporter: Andy Huang > Labels: graph, union > Attachments: union_operator.png > > > This operator aims to union two graphs and generate a new graph directly. The > union of two graphs is the union of their vertex sets and their edge > families.Vertexes and edges which are included in either graph will be part > of the new graph. > bq. G ∪ H = (VG ∪ VH, EG ∪ EH). > The below image shows a union of graph G and graph H > !union_operator.png|width=600px,align=center! > A Simple interface would be: > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED] > However, inevitably vertexes and edges overlapping will happen between > borders of graphs. For vertex, it's quite nature to just make a union and > remove those duplicate ones. But for edges, a mergeEdges function seems to be > more reasonable. > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: > (ED, ED) => ED): Graph[VD, ED] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-7894) Graph Union Operator
[ https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283 ] Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM: - How is this functionality different from the existing {union} functions within {VertexRDD} and {EdgeRDD} ? was (Author: ilganeli): How is this functionality different from the existing {code}union{code} functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ? > Graph Union Operator > > > Key: SPARK-7894 > URL: https://issues.apache.org/jira/browse/SPARK-7894 > Project: Spark > Issue Type: Sub-task > Components: GraphX >Reporter: Andy Huang > Labels: graph, union > Attachments: union_operator.png > > > This operator aims to union two graphs and generate a new graph directly. The > union of two graphs is the union of their vertex sets and their edge > families.Vertexes and edges which are included in either graph will be part > of the new graph. > bq. G ∪ H = (VG ∪ VH, EG ∪ EH). > The below image shows a union of graph G and graph H > !union_operator.png|width=600px,align=center! > A Simple interface would be: > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED] > However, inevitably vertexes and edges overlapping will happen between > borders of graphs. For vertex, it's quite nature to just make a union and > remove those duplicate ones. But for edges, a mergeEdges function seems to be > more reasonable. > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: > (ED, ED) => ED): Graph[VD, ED] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-7894) Graph Union Operator
[ https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283 ] Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM: - How is this functionality different from the existing {code}union{code} functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ? was (Author: ilganeli): How is this functionality different from the existing ```union``` functions within ```VertexRDD``` and ```EdgeRDD``` ? > Graph Union Operator > > > Key: SPARK-7894 > URL: https://issues.apache.org/jira/browse/SPARK-7894 > Project: Spark > Issue Type: Sub-task > Components: GraphX >Reporter: Andy Huang > Labels: graph, union > Attachments: union_operator.png > > > This operator aims to union two graphs and generate a new graph directly. The > union of two graphs is the union of their vertex sets and their edge > families.Vertexes and edges which are included in either graph will be part > of the new graph. > bq. G ∪ H = (VG ∪ VH, EG ∪ EH). > The below image shows a union of graph G and graph H > !union_operator.png|width=600px,align=center! > A Simple interface would be: > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED] > However, inevitably vertexes and edges overlapping will happen between > borders of graphs. For vertex, it's quite nature to just make a union and > remove those duplicate ones. But for edges, a mergeEdges function seems to be > more reasonable. > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: > (ED, ED) => ED): Graph[VD, ED] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7894) Graph Union Operator
[ https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283 ] Ilya Ganelin commented on SPARK-7894: - How is this functionality different from the existing ```union``` functions within ```VertexRDD``` and ```EdgeRDD``` ? > Graph Union Operator > > > Key: SPARK-7894 > URL: https://issues.apache.org/jira/browse/SPARK-7894 > Project: Spark > Issue Type: Sub-task > Components: GraphX >Reporter: Andy Huang > Labels: graph, union > Attachments: union_operator.png > > > This operator aims to union two graphs and generate a new graph directly. The > union of two graphs is the union of their vertex sets and their edge > families.Vertexes and edges which are included in either graph will be part > of the new graph. > bq. G ∪ H = (VG ∪ VH, EG ∪ EH). > The below image shows a union of graph G and graph H > !union_operator.png|width=600px,align=center! > A Simple interface would be: > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED] > However, inevitably vertexes and edges overlapping will happen between > borders of graphs. For vertex, it's quite nature to just make a union and > remove those duplicate ones. But for edges, a mergeEdges function seems to be > more reasonable. > bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: > (ED, ED) => ED): Graph[VD, ED] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
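To make the question in the comments above concrete, here is a hedged sketch of how the proposed union(other, mergeEdges) could be assembled from the existing public RDD operations on graph.vertices and graph.edges. It is an illustration rather than the operator the issue asks for, and duplicate vertices simply keep an arbitrary attribute here.
{code}
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Edge, Graph}

// Hedged sketch built only from public GraphX/RDD calls; duplicate vertices keep
// an arbitrary attribute, which a real operator would likely let callers control.
object GraphUnionSketch {
  def union[VD: ClassTag, ED: ClassTag](
      g: Graph[VD, ED],
      h: Graph[VD, ED],
      mergeEdges: (ED, ED) => ED): Graph[VD, ED] = {
    // Union of the vertex sets, deduplicated by vertex id.
    val vertices = g.vertices.union(h.vertices).reduceByKey((a, _) => a)
    // Union of the edge sets, merging attributes of edges present in both graphs.
    val edges = g.edges.union(h.edges)
      .map(e => ((e.srcId, e.dstId), e.attr))
      .reduceByKey(mergeEdges)
      .map { case ((src, dst), attr) => Edge(src, dst, attr) }
    Graph(vertices, edges)
  }
}
{code}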
[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847 ] Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:18 PM: - [~rxin] Sounds good :). Where would you suggest adding a test for StructType creation? Not sure where it quite fits in the grand scheme of things. With regards to also supporting a string for simple types, I think it's safer to enforce usage of DataType since I think the intent is for the SQL schema to be strictly typed. Were you suggesting that we allow passing "int" or "long" as the type argument or for us to infer it automatically by parsing the string? That approach seems a little more dangerous. was (Author: ilganeli): [~rxin] Sounds good :). Where would you suggest adding a test for StructType creation? Not sure where it quite fits in the grand scheme of things. With regards to also supporting a string for simple types, I think it's safer to enforce usage of DataType since the SQL schema should be strictly typed. Were you suggesting that we allow passing "int" or "long" as the type argument or for us to infer it automatically by parsing the string? That approach seems a little more dangerous. > Design an easier way to construct schema for both Scala and Python > -- > > Key: SPARK-8056 > URL: https://issues.apache.org/jira/browse/SPARK-8056 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin > > StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847 ] Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:17 PM: - [~rxin] Sounds good :). Where would you suggest adding a test for StructType creation? Not sure where it quite fits in the grand scheme of things. With regards to also supporting a string for simple types, I think it's safer to enforce usage of DataType since the SQL schema should be strictly typed. Were you suggesting that we allow passing "int" or "long" as the type argument or for us to infer it automatically by parsing the string? That approach seems a little more dangerous. was (Author: ilganeli): [~rxin] Sounds good :). Where would you suggest adding a test for StructType creation? Not sure where it quite fits in the grand scheme of things. > Design an easier way to construct schema for both Scala and Python > -- > > Key: SPARK-8056 > URL: https://issues.apache.org/jira/browse/SPARK-8056 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin > > StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847 ] Ilya Ganelin commented on SPARK-8056: - [~rxin] Sounds good :). Where would you suggest adding a test for StructType creation? Not sure where it quite fits in the grand scheme of things. > Design an easier way to construct schema for both Scala and Python > -- > > Key: SPARK-8056 > URL: https://issues.apache.org/jira/browse/SPARK-8056 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin > > StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874 ] Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 12:35 AM: -- [~rxin] Are you actively working on this? I think this could be readily solved by providing an interface to construct StructType the way we construct SparkConf, e.g. new StructType().add("f1","v1").add("f2","v2") etc was (Author: ilganeli): [~rxin] Are you actively working on this? I think this could be readily solved by providing interface to construct StructType the way we construct SparkConf, e.g. new StructType().add("f1","v1").add("f2","v2") etc > Design an easier way to construct schema for both Scala and Python > -- > > Key: SPARK-8056 > URL: https://issues.apache.org/jira/browse/SPARK-8056 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > > StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python
[ https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874 ] Ilya Ganelin commented on SPARK-8056: - [~rxin] Are you actively working on this? I think this could be readily solved by providing interface to construct StructType the way we construct SparkConf, e.g. new StructType().add("f1","v1").add("f2","v2") etc > Design an easier way to construct schema for both Scala and Python > -- > > Key: SPARK-8056 > URL: https://issues.apache.org/jira/browse/SPARK-8056 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > > StructType is fairly hard to construct, especially in Python. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
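To make the SparkConf-style idea in the comments concrete, here is a dependency-free sketch of such a chained builder. SimpleStructType and SimpleField are made-up stand-ins, not the API that eventually shipped in Spark SQL, and plain strings are used for the types only to keep the sketch self-contained (the thread above is precisely about whether strings or DataType values should be accepted).
{code}
// Dependency-free sketch of the chained-add idea; these names are hypothetical
// stand-ins, not the API that eventually shipped in Spark SQL.
case class SimpleField(name: String, dataType: String, nullable: Boolean = true)

case class SimpleStructType(fields: Vector[SimpleField] = Vector.empty) {
  def add(name: String, dataType: String, nullable: Boolean = true): SimpleStructType =
    copy(fields = fields :+ SimpleField(name, dataType, nullable))
}

object StructTypeBuilderDemo extends App {
  // Chained construction, analogous to new SparkConf().set(...).set(...)
  val schema = SimpleStructType()
    .add("f1", "int")
    .add("f2", "string", nullable = false)
  println(schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString(", "))
}
{code}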
[jira] [Closed] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility
[ https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin closed SPARK-6746. --- Resolution: Won't Fix > Refactor large functions in DAGScheduler to improve readibility > --- > > Key: SPARK-6746 > URL: https://issues.apache.org/jira/browse/SPARK-6746 > Project: Spark > Issue Type: Sub-task > Components: Scheduler >Reporter: Ilya Ganelin > > The DAGScheduler class contains two huge functions that make it > very hard to understand what's going on in the code. These are: > 1) The monolithic handleTaskCompletion > 2) The cleanupStateForJobAndIndependentStages function > These can be simply modularized to eliminate some awkward type casting and > improve code readability. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7075) Project Tungsten: Improving Physical Execution and Memory Management
[ https://issues.apache.org/jira/browse/SPARK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521944#comment-14521944 ] Ilya Ganelin commented on SPARK-7075: - This looks like the result of a large internal Databricks effort - are there pieces of this where you could use external help or is this issue in place primarily to document migration of internal code? > Project Tungsten: Improving Physical Execution and Memory Management > > > Key: SPARK-7075 > URL: https://issues.apache.org/jira/browse/SPARK-7075 > Project: Spark > Issue Type: Epic > Components: Block Manager, Shuffle, Spark Core, SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > > Based on our observation, majority of Spark workloads are not bottlenecked by > I/O or network, but rather CPU and memory. This project focuses on 3 areas to > improve the efficiency of memory and CPU for Spark applications, to push > performance closer to the limits of the underlying hardware. > 1. Memory Management and Binary Processing: leveraging application semantics > to manage memory explicitly and eliminate the overhead of JVM object model > and garbage collection > 2. Cache-aware computation: algorithms and data structures to exploit memory > hierarchy > 3. Code generation: using code generation to exploit modern compilers and CPUs > Several parts of project Tungsten leverage the DataFrame model, which gives > us more semantics about the application. We will also retrofit the > improvements onto Spark’s RDD API whenever possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse
[ https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537 ] Ilya Ganelin edited comment on SPARK-4514 at 4/24/15 7:37 PM: -- [~joshrosen] - given your work on SPARK-6629, is this still relevant? I saw that there was a comment there stating that issue may not be a problem. I can knock this one out if it's still necessary. was (Author: ilganeli): [~joshrosen] - given your work on SPARK-6629 is this still relevant - I saw that there was a comment there stating that issue may not be a problem? I can knock this one out if it's still necessary. > SparkContext localProperties does not inherit property updates across thread > reuse > -- > > Key: SPARK-4514 > URL: https://issues.apache.org/jira/browse/SPARK-4514 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0, 1.1.1, 1.2.0 >Reporter: Erik Erlandson >Assignee: Josh Rosen >Priority: Critical > > The current job group id of a Spark context is stored in the > {{localProperties}} member value. This data structure is designed to be > thread local, and its settings are not preserved when {{ComplexFutureAction}} > instantiates a new {{Future}}. > One consequence of this is that {{takeAsync()}} does not behave in the same > way as other async actions, e.g. {{countAsync()}}. For example, this test > (if copied into StatusTrackerSuite.scala), will fail, because > {{"my-job-group2"}} is not propagated to the Future which actually > instantiates the job: > {code:java} > test("getJobIdsForGroup() with takeAsync()") { > sc = new SparkContext("local", "test", new SparkConf(false)) > sc.setJobGroup("my-job-group2", "description") > sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) > val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) > val firstJobId = eventually(timeout(10 seconds)) { > firstJobFuture.jobIds.head > } > eventually(timeout(10 seconds)) { > sc.statusTracker.getJobIdsForGroup("my-job-group2") should be > (Seq(firstJobId)) > } > } > {code} > It also impacts current PR for SPARK-1021, which involves additional uses of > {{ComplexFutureAction}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse
[ https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537 ] Ilya Ganelin commented on SPARK-4514: - [~joshrosen] - given your work on SPARK-6629 is this still relevant - I saw that there was a comment there stating that issue may not be a problem? I can knock this one out if it's still necessary. > SparkContext localProperties does not inherit property updates across thread > reuse > -- > > Key: SPARK-4514 > URL: https://issues.apache.org/jira/browse/SPARK-4514 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0, 1.1.1, 1.2.0 >Reporter: Erik Erlandson >Assignee: Josh Rosen >Priority: Critical > > The current job group id of a Spark context is stored in the > {{localProperties}} member value. This data structure is designed to be > thread local, and its settings are not preserved when {{ComplexFutureAction}} > instantiates a new {{Future}}. > One consequence of this is that {{takeAsync()}} does not behave in the same > way as other async actions, e.g. {{countAsync()}}. For example, this test > (if copied into StatusTrackerSuite.scala), will fail, because > {{"my-job-group2"}} is not propagated to the Future which actually > instantiates the job: > {code:java} > test("getJobIdsForGroup() with takeAsync()") { > sc = new SparkContext("local", "test", new SparkConf(false)) > sc.setJobGroup("my-job-group2", "description") > sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty) > val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1) > val firstJobId = eventually(timeout(10 seconds)) { > firstJobFuture.jobIds.head > } > eventually(timeout(10 seconds)) { > sc.statusTracker.getJobIdsForGroup("my-job-group2") should be > (Seq(firstJobId)) > } > } > {code} > It also impacts current PR for SPARK-1021, which involves additional uses of > {{ComplexFutureAction}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
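The underlying failure mode in this ticket is generic: thread-local state does not follow work that hops to another thread unless it is explicitly captured and re-installed. The sketch below shows that capture-and-restore pattern in plain Scala; the property name and helper are hypothetical, and this is not the actual SparkContext/ComplexFutureAction code.
{code}
import scala.concurrent.{Await, ExecutionContext, Future}

// Spark-independent illustration of the bug class described above: thread-local
// "local properties" are lost when work hops to a Future thread unless they are
// captured on the caller and re-installed on the worker. Names are hypothetical.
object LocalPropsSketch {
  private val props = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  def set(key: String, value: String): Unit = props.set(props.get + (key -> value))
  def get(key: String): Option[String] = props.get.get(key)

  // Capture the caller's properties and restore them inside the Future body.
  def futureWithProps[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val captured = props.get
    Future {
      val saved = props.get
      props.set(captured)
      try body finally props.set(saved)
    }
  }
}

object LocalPropsDemo extends App {
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  LocalPropsSketch.set("spark.jobGroup.id", "my-job-group2")
  val seen = LocalPropsSketch.futureWithProps(LocalPropsSketch.get("spark.jobGroup.id"))
  println(Await.result(seen, 10.seconds)) // Some(my-job-group2)
}
{code}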
[jira] [Commented] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't
[ https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511230#comment-14511230 ] Ilya Ganelin commented on SPARK-1021: - I'd be happy to look into this and https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4514 . > sortByKey() launches a cluster job when it shouldn't > > > Key: SPARK-1021 > URL: https://issues.apache.org/jira/browse/SPARK-1021 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0 >Reporter: Andrew Ash >Assignee: Erik Erlandson > Labels: starter > > The sortByKey() method is listed as a transformation, not an action, in the > documentation. But it launches a cluster job regardless. > http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html > Some discussion on the mailing list suggested that this is a problem with the > rdd.count() call inside Partitioner.scala's rangeBounds method. > https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102 > Josh Rosen suggests that rangeBounds should be made into a lazy variable: > {quote} > I wonder whether making RangePartitoner .rangeBounds into a lazy val would > fix this > (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95). > We'd need to make sure that rangeBounds() is never called before an action > is performed. This could be tricky because it's called in the > RangePartitioner.equals() method. Maybe it's sufficient to just compare the > number of partitions, the ids of the RDDs used to create the > RangePartitioner, and the sort ordering. This still supports the case where > I range-partition one RDD and pass the same partitioner to a different RDD. > It breaks support for the case where two range partitioners created on > different RDDs happened to have the same rangeBounds(), but it seems unlikely > that this would really harm performance since it's probably unlikely that the > range partitioners are equal by chance. > {quote} > Can we please make this happen? I'll send a PR on GitHub to start the > discussion and testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
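The description above quotes the lazy-val suggestion; the sketch below shows that shape in isolation. It is not the real RangePartitioner: sampleKeys stands in for the sampling job, and equals() deliberately compares only cheap identifiers so that it never forces the bounds.
{code}
// Minimal sketch of the lazy-rangeBounds idea quoted above; not the real
// RangePartitioner. Sampling is deferred until the first getPartition call,
// and equality avoids forcing it by comparing only cheap identifiers.
class LazyRangePartitionerSketch[K: Ordering](
    val partitions: Int,
    sampleKeys: () => Seq[K],   // stands in for the rdd.sample(...) job
    val sourceRddId: Int) {

  private val ordering = implicitly[Ordering[K]]

  // Nothing is computed (no "job" runs) until something actually partitions a record.
  private lazy val rangeBounds: IndexedSeq[K] = {
    val sorted = sampleKeys().sorted(ordering).toIndexedSeq
    if (sorted.isEmpty) IndexedSeq.empty
    else (1 until partitions).map { i =>
      sorted(math.min(sorted.length - 1, i * sorted.length / partitions))
    }
  }

  def getPartition(key: K): Int = {
    var p = 0
    while (p < rangeBounds.length && ordering.gt(key, rangeBounds(p))) p += 1
    p
  }

  // Equality that does not force rangeBounds, as suggested in the description.
  override def equals(other: Any): Boolean = other match {
    case o: LazyRangePartitionerSketch[_] =>
      o.partitions == partitions && o.sourceRddId == sourceRddId
    case _ => false
  }
  override def hashCode(): Int = 31 * partitions + sourceRddId
}
{code}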
[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException
[ https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510154#comment-14510154 ] Ilya Ganelin commented on SPARK-5945: - So to recap: a) Move failure count tracking into Stage b) Reset failure count on Stage success, so even if that stage is re-submitted due to failures downstream, we never hit the cap c) Remove config parameter. > Spark should not retry a stage infinitely on a FetchFailedException > --- > > Key: SPARK-5945 > URL: https://issues.apache.org/jira/browse/SPARK-5945 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid >Assignee: Ilya Ganelin > > While investigating SPARK-5928, I noticed some very strange behavior in the > way spark retries stages after a FetchFailedException. It seems that on a > FetchFailedException, instead of simply killing the task and retrying, Spark > aborts the stage and retries. If it just retried the task, the task might > fail 4 times and then trigger the usual job killing mechanism. But by > killing the stage instead, the max retry logic is skipped (it looks to me > like there is no limit for retries on a stage). > After a bit of discussion with Kay Ousterhout, it seems the idea is that if a > fetch fails, we assume that the block manager we are fetching from has > failed, and that it will succeed if we retry the stage w/out that block > manager. In that case, it wouldn't make any sense to retry the task, since > its doomed to fail every time, so we might as well kill the whole stage. But > this raises two questions: > 1) Is it really safe to assume that a FetchFailedException means that the > BlockManager has failed, and ti will work if we just try another one? > SPARK-5928 shows that there are at least some cases where that assumption is > wrong. Even if we fix that case, this logic seems brittle to the next case > we find. I guess the idea is that this behavior is what gives us the "R" in > RDD ... but it seems like its not really that robust and maybe should be > reconsidered. > 2) Should stages only be retried a limited number of times? It would be > pretty easy to put in a limited number of retries per stage. Though again, > we encounter issues with keeping things resilient. Theoretically one stage > could have many retries, but due to failures in different stages further > downstream, so we might need to track the cause of each retry as well to > still have the desired behavior. > In general it just seems there is some flakiness in the retry logic. This is > the only reproducible example I have at the moment, but I vaguely recall > hitting other cases of strange behavior w/ retries when trying to run long > pipelines. Eg., if one executor is stuck in a GC during a fetch, the fetch > fails, but the executor eventually comes back and the stage gets retried > again, but the same GC issues happen the second time around, etc. > Copied from SPARK-5928, here's the example program that can regularly produce > a loop of stage failures. 
Note that it will only fail from a remote fetch, > so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell > --num-executors 2 --executor-memory 4000m}} > {code} > val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore => > val n = 3e3.toInt > val arr = new Array[Byte](n) > //need to make sure the array doesn't compress to something small > scala.util.Random.nextBytes(arr) > arr > } > rdd.map { x => (1, x)}.groupByKey().count() > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
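A minimal sketch of the bookkeeping recapped in the comment above: the counter lives on the stage, only consecutive fetch failures count, and any successful completion resets it so later resubmissions caused by downstream failures never hit the cap. The names and the cap of 4 are assumptions, not what was merged.
{code}
// Sketch of the recap above: failure count tracked per stage, cleared on success,
// aborting only after too many consecutive failures. Names and the cap of 4 are
// assumptions for illustration.
class StageFailureTrackerSketch(val stageId: Int, maxConsecutiveFailures: Int = 4) {
  private var consecutiveFetchFailures = 0

  /** Called when a stage attempt fails with a FetchFailedException. */
  def recordFetchFailure(): Unit = consecutiveFetchFailures += 1

  /** Called when any attempt of this stage succeeds, even if the stage is later
    * resubmitted because of failures further downstream (point b in the recap). */
  def recordSuccess(): Unit = consecutiveFetchFailures = 0

  /** True once the stage has failed too many times in a row and should be aborted. */
  def shouldAbort: Boolean = consecutiveFetchFailures >= maxConsecutiveFailures
}
{code}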
[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException
[ https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508019#comment-14508019 ] Ilya Ganelin commented on SPARK-5945: - [~kayousterhout] - thanks for the review. If I understand correctly, your suggestion would still address [~imranr]'s second comment since the first stage would always (or mostly succeed), e.g. it wouldn't have N consecutive failures so even if subsequent stages fail, those wouldn't count towards the failure count for this particular stage since it would have been reset when it succeeded. Do you have any thoughts on the first comment? Specifically, is retrying a stage likely to succeed at all or is it a waste of effort in the first place? > Spark should not retry a stage infinitely on a FetchFailedException > --- > > Key: SPARK-5945 > URL: https://issues.apache.org/jira/browse/SPARK-5945 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid >Assignee: Ilya Ganelin > > While investigating SPARK-5928, I noticed some very strange behavior in the > way spark retries stages after a FetchFailedException. It seems that on a > FetchFailedException, instead of simply killing the task and retrying, Spark > aborts the stage and retries. If it just retried the task, the task might > fail 4 times and then trigger the usual job killing mechanism. But by > killing the stage instead, the max retry logic is skipped (it looks to me > like there is no limit for retries on a stage). > After a bit of discussion with Kay Ousterhout, it seems the idea is that if a > fetch fails, we assume that the block manager we are fetching from has > failed, and that it will succeed if we retry the stage w/out that block > manager. In that case, it wouldn't make any sense to retry the task, since > its doomed to fail every time, so we might as well kill the whole stage. But > this raises two questions: > 1) Is it really safe to assume that a FetchFailedException means that the > BlockManager has failed, and ti will work if we just try another one? > SPARK-5928 shows that there are at least some cases where that assumption is > wrong. Even if we fix that case, this logic seems brittle to the next case > we find. I guess the idea is that this behavior is what gives us the "R" in > RDD ... but it seems like its not really that robust and maybe should be > reconsidered. > 2) Should stages only be retried a limited number of times? It would be > pretty easy to put in a limited number of retries per stage. Though again, > we encounter issues with keeping things resilient. Theoretically one stage > could have many retries, but due to failures in different stages further > downstream, so we might need to track the cause of each retry as well to > still have the desired behavior. > In general it just seems there is some flakiness in the retry logic. This is > the only reproducible example I have at the moment, but I vaguely recall > hitting other cases of strange behavior w/ retries when trying to run long > pipelines. Eg., if one executor is stuck in a GC during a fetch, the fetch > fails, but the executor eventually comes back and the stage gets retried > again, but the same GC issues happen the second time around, etc. > Copied from SPARK-5928, here's the example program that can regularly produce > a loop of stage failures. 
Note that it will only fail from a remote fetch, > so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell > --num-executors 2 --executor-memory 4000m}} > {code} > val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore => > val n = 3e3.toInt > val arr = new Array[Byte](n) > //need to make sure the array doesn't compress to something small > scala.util.Random.nextBytes(arr) > arr > } > rdd.map { x => (1, x)}.groupByKey().count() > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors
[ https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507886#comment-14507886 ] Ilya Ganelin commented on SPARK-6891: - [~meiyoula] I'm running Spark 1.3 (from the released builds) and I tried your code in the spark shell on yarn as spark-shell --master. I'm able to run it without issue, I can successfully run multiple calls to runSparkPi. Were you seeing this issue when running the trunk? > ExecutorAllocationManager will request negative number executors > > > Key: SPARK-6891 > URL: https://issues.apache.org/jira/browse/SPARK-6891 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: meiyoula >Priority: Critical > Attachments: DynamicExecutorTest.scala > > > Below is the exception: >15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread > spark-dynamic-executor-allocation-0 > java.lang.IllegalArgumentException: Attempted to request a negative > number of executor(s) -1 from the cluster manager. Please specify a positive > number! > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342) > at > org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170) > at > org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) > at > org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) > at > org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723) > at > org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351) > at > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > Below is the configurations I setted: >spark.dynamicAllocation.enabled true >spark.dynamicAllocation.minExecutors 0 >spark.dynamicAllocation.initialExecutors3 >spark.dynamicAllocation.maxExecutors7 >spark.dynamicAllocation.executorIdleTimeout 30 >spark.shuffle.service.enabled true -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6891) ExecutorAllocationManager will request negative number executors
[ https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507886#comment-14507886 ] Ilya Ganelin edited comment on SPARK-6891 at 4/22/15 8:59 PM: -- [~meiyoula] I'm running Spark 1.3 (from the released builds) and I tried your code in the spark shell on yarn as spark-shell --master yarn. I'm able to run it without issue, I can successfully run multiple calls to runSparkPi. Were you seeing this issue when running the trunk? was (Author: ilganeli): [~meiyoula] I'm running Spark 1.3 (from the released builds) and I tried your code in the spark shell on yarn as spark-shell --master. I'm able to run it without issue, I can successfully run multiple calls to runSparkPi. Were you seeing this issue when running the trunk? > ExecutorAllocationManager will request negative number executors > > > Key: SPARK-6891 > URL: https://issues.apache.org/jira/browse/SPARK-6891 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: meiyoula >Priority: Critical > Attachments: DynamicExecutorTest.scala > > > Below is the exception: >15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread > spark-dynamic-executor-allocation-0 > java.lang.IllegalArgumentException: Attempted to request a negative > number of executor(s) -1 from the cluster manager. Please specify a positive > number! > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342) > at > org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170) > at > org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) > at > org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) > at > org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723) > at > org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351) > at > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > Below is the configurations I setted: >spark.dynamicAllocation.enabled true >spark.dynamicAllocation.minExecutors 0 >spark.dynamicAllocation.initialExecutors3 >spark.dynamicAllocation.maxExecutors7 >spark.dynamicAllocation.executorIdleTimeout 30 >spark.shuffle.service.enabled true -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors
[ https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502322#comment-14502322 ] Ilya Ganelin commented on SPARK-6891: - [~meiyoula] Any hints on reproducing this aside from your configuration? E.g. simple test code to execute? > ExecutorAllocationManager will request negative number executors > > > Key: SPARK-6891 > URL: https://issues.apache.org/jira/browse/SPARK-6891 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: meiyoula >Priority: Critical > > Below is the exception: >15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread > spark-dynamic-executor-allocation-0 > java.lang.IllegalArgumentException: Attempted to request a negative > number of executor(s) -1 from the cluster manager. Please specify a positive > number! > at > org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342) > at > org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170) > at > org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) > at > org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) > at > org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723) > at > org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351) > at > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > Below is the configurations I setted: >spark.dynamicAllocation.enabled true >spark.dynamicAllocation.minExecutors 0 >spark.dynamicAllocation.initialExecutors3 >spark.dynamicAllocation.maxExecutors7 >spark.dynamicAllocation.executorIdleTimeout 30 >spark.shuffle.service.enabled true -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
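The stack trace in this issue boils down to a request whose computed executor delta went below zero. Purely as an illustration of the kind of guard that prevents that (hypothetical names, not the ExecutorAllocationManager fix itself):
{code}
// Illustrative guard only (hypothetical names): compute how many executors to add,
// never letting the request go negative even when the current target already
// exceeds the desired total -- the situation behind the stack trace above.
object ExecutorDeltaSketch {
  def executorsToRequest(
      currentTarget: Int,
      desiredTotal: Int,
      minExecutors: Int,
      maxExecutors: Int): Int = {
    val bounded = math.min(math.max(desiredTotal, minExecutors), maxExecutors)
    math.max(0, bounded - currentTarget)
  }
}
{code}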
[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server
[ https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6932: Labels: (was: kjhghbg) > A Prototype of Parameter Server > --- > > Key: SPARK-6932 > URL: https://issues.apache.org/jira/browse/SPARK-6932 > Project: Spark > Issue Type: New Feature > Components: ML, MLlib, Spark Core >Reporter: Qiping Li > > h2. Introduction > As specified in > [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590],it would be > very helpful to integrate parameter server into Spark for machine learning > algorithms, especially for those with ultra high dimensions features. > After carefully studying the design doc of [Parameter > Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing],and > the paper of [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], > we proposed a prototype of Parameter Server on Spark(Ps-on-Spark), with > several key design concerns: > * *User friendly interface* > Careful investigation is done to most existing Parameter Server > systems(including: [petuum|http://petuum.github.io], [parameter > server|http://parameterserver.org], > [paracel|https://github.com/douban/paracel]) and a user friendly interface is > design by absorbing essence from all these system. > * *Prototype of distributed array* > IndexRDD (see > [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem > to be a good option for distributed array, because in most case, the #key > updates/second is not be very high. > So we implement a distributed HashMap to store the parameters, which can > be easily extended to get better performance. > > * *Minimal code change* > Quite a lot of effort in done to avoid code change of Spark core. Tasks > which need parameter server are still created and scheduled by Spark's > scheduler. Tasks communicate with parameter server with a client object, > through *akka* or *netty*. > With all these concerns we propose the following architecture: > h2. Architecture > !https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg! > Data is stored in RDD and is partitioned across workers. During each > iteration, each worker gets parameters from parameter server then computes > new parameters based on old parameters and data in the partition. Finally > each worker updates parameters to parameter server.Worker communicates with > parameter server through a parameter server client,which is initialized in > `TaskContext` of this worker. > The current implementation is based on YARN cluster mode, > but it should not be a problem to transplanted it to other modes. > h3. Interface > We refer to existing parameter server systems(petuum, parameter server, > paracel) when design the interface of parameter server. > *`PSClient` provides the following interface for workers to use:* > {code} > // get parameter indexed by key from parameter server > def get[T](key: String): T > // get multiple parameters from parameter server > def multiGet[T](keys: Array[String]): Array[T] > // add parameter indexed by `key` by `delta`, > // if multiple `delta` to update on the same parameter, > // use `reduceFunc` to reduce these `delta`s frist. > def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit > // update multiple parameters at the same time, use the same `reduceFunc`. 
> def multiUpdate(keys: Array[String], delta: Array[T], reduceFunc: (T, T) => > T: Unit > > // advance clock to indicate that current iteration is finished. > def clock(): Unit > > // block until all workers have reached this line of code. > def sync(): Unit > {code} > *`PSContext` provides following functions to use on driver:* > {code} > // load parameters from existing rdd. > def loadPSModel[T](model: RDD[String, T]) > // fetch parameters from parameter server to construct model. > def fetchPSModel[T](keys: Array[String]): Array[T] > {code} > > *A new function has been add to `RDD` to run parameter server tasks:* > {code} > // run the provided `func` on each partition of this RDD. > // This function can use data of this partition(the first argument) > // and a parameter server client(the second argument). > // See the following Logistic Regression for an example. > def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U] > > {code} > h2. Example > Here is an example of using our prototype to implement logistic regression: > {code:title=LogisticRegression.scala|borderStyle=solid} > def train( > sc: SparkContext, > input: RDD[LabeledPoint], > numIterations: Int, > stepSize: Double, > miniBatchFraction: Double): LogisticRegressionModel = { > > // initialize weights >
[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server
[ https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6932: Description: h2. Introduction As specified in [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590],it would be very helpful to integrate parameter server into Spark for machine learning algorithms, especially for those with ultra high dimensions features. After carefully studying the design doc of [Parameter Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing],and the paper of [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], we proposed a prototype of Parameter Server on Spark(Ps-on-Spark), with several key design concerns: * *User friendly interface* Careful investigation is done to most existing Parameter Server systems(including: [petuum|http://petuum.github.io], [parameter server|http://parameterserver.org], [paracel|https://github.com/douban/paracel]) and a user friendly interface is design by absorbing essence from all these system. * *Prototype of distributed array* IndexRDD (see [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem to be a good option for distributed array, because in most case, the #key updates/second is not be very high. So we implement a distributed HashMap to store the parameters, which can be easily extended to get better performance. * *Minimal code change* Quite a lot of effort in done to avoid code change of Spark core. Tasks which need parameter server are still created and scheduled by Spark's scheduler. Tasks communicate with parameter server with a client object, through *akka* or *netty*. With all these concerns we propose the following architecture: h2. Architecture !https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg! Data is stored in RDD and is partitioned across workers. During each iteration, each worker gets parameters from parameter server then computes new parameters based on old parameters and data in the partition. Finally each worker updates parameters to parameter server.Worker communicates with parameter server through a parameter server client,which is initialized in `TaskContext` of this worker. The current implementation is based on YARN cluster mode, but it should not be a problem to transplanted it to other modes. h3. Interface We refer to existing parameter server systems(petuum, parameter server, paracel) when design the interface of parameter server. *`PSClient` provides the following interface for workers to use:* {code} // get parameter indexed by key from parameter server def get[T](key: String): T // get multiple parameters from parameter server def multiGet[T](keys: Array[String]): Array[T] // add parameter indexed by `key` by `delta`, // if multiple `delta` to update on the same parameter, // use `reduceFunc` to reduce these `delta`s frist. def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit // update multiple parameters at the same time, use the same `reduceFunc`. def multiUpdate(keys: Array[String], delta: Array[T], reduceFunc: (T, T) => T: Unit // advance clock to indicate that current iteration is finished. def clock(): Unit // block until all workers have reached this line of code. def sync(): Unit {code} *`PSContext` provides following functions to use on driver:* {code} // load parameters from existing rdd. def loadPSModel[T](model: RDD[String, T]) // fetch parameters from parameter server to construct model. 
def fetchPSModel[T](keys: Array[String]): Array[T] {code} *A new function has been add to `RDD` to run parameter server tasks:* {code} // run the provided `func` on each partition of this RDD. // This function can use data of this partition(the first argument) // and a parameter server client(the second argument). // See the following Logistic Regression for an example. def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U] {code} h2. Example Here is an example of using our prototype to implement logistic regression: {code:title=LogisticRegression.scala|borderStyle=solid} def train( sc: SparkContext, input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double): LogisticRegressionModel = { // initialize weights val numFeatures = input.map(_.features.size).first() val initialWeights = new Array[Double](numFeatures) // initialize parameter server context val pssc = new PSContext(sc) // load initialized weights into parameter server val initialModelRDD = sc.parallelize(Array(("w", initialWeights)), 1) pssc.loadPSModel(initialModelRDD) // run logistic regression algorithm on input data input.runWithPS((arr, client) => { val sampler = new Bernoull
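The logistic regression listing above is cut off in this digest, so it is not reconstructed here. As a separate illustration, the stub below restates the PSClient interface exactly as listed in the description and shows how a worker-side closure passed to the proposed RDD.runWithPS might use it; the "w" key, the gradient argument, and the element-wise merge are placeholders, and none of this is a shipped API.
{code}
// Stub of the PSClient interface as listed in the description above, just so the
// usage sketch type-checks; these are declarations from the proposal, not an API.
trait PSClient {
  def get[T](key: String): T
  def multiGet[T](keys: Array[String]): Array[T]
  def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit
  def clock(): Unit
  def sync(): Unit
}

object PSUsageSketch {
  // How one worker's closure, passed to the proposed RDD.runWithPS, might use it.
  // "w", the gradient argument, and the element-wise merge are placeholders.
  def workerStep(
      partition: Array[Array[Double]],
      client: PSClient,
      gradient: (Array[Array[Double]], Array[Double]) => Array[Double]): Unit = {
    val weights = client.get[Array[Double]]("w")          // pull current parameters
    val delta = gradient(partition, weights)              // local computation on this partition
    client.update[Array[Double]]("w", delta,              // push delta, merged element-wise
      (a, b) => a.zip(b).map { case (x, y) => x + y })
    client.clock()                                        // this iteration is done
    client.sync()                                         // barrier with the other workers
  }
}
{code}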
[jira] [Comment Edited] (SPARK-6703) Provide a way to discover existing SparkContext's
[ https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492909#comment-14492909 ] Ilya Ganelin edited comment on SPARK-6703 at 4/13/15 11:15 PM: --- Patrick - what's the time line for the 1.4 release? Just want to have a sense for it so I can schedule accordingly. was (Author: ilganeli): Patrick - what's the time line for the 1.4 release? Just want to have a sense for it so I can schedule accordingly. Thank you, Ilya Ganelin The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. > Provide a way to discover existing SparkContext's > - > > Key: SPARK-6703 > URL: https://issues.apache.org/jira/browse/SPARK-6703 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Patrick Wendell >Assignee: Ilya Ganelin >Priority: Critical > > Right now it is difficult to write a Spark application in a way that can be > run independently and also be composed with other Spark applications in an > environment such as the JobServer, notebook servers, etc where there is a > shared SparkContext. > It would be nice to provide a rendez-vous point so that applications can > learn whether an existing SparkContext already exists before creating one. > The most simple/surgical way I see to do this is to have an optional static > SparkContext singleton that people can be retrieved as follows: > {code} > val sc = SparkContext.getOrCreate(conf = new SparkConf()) > {code} > And you could also have a setter where some outer framework/server can set it > for use by multiple downstream applications. > A more advanced version of this would have some named registry or something, > but since we only support a single SparkContext in one JVM at this point > anyways, this seems sufficient and much simpler. Another advanced option > would be to allow plugging in some other notion of configuration you'd pass > when retrieving an existing context. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's
[ https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492909#comment-14492909 ] Ilya Ganelin commented on SPARK-6703: - Patrick - what's the time line for the 1.4 release? Just want to have a sense for it so I can schedule accordingly. Thank you, Ilya Ganelin The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. > Provide a way to discover existing SparkContext's > - > > Key: SPARK-6703 > URL: https://issues.apache.org/jira/browse/SPARK-6703 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Patrick Wendell >Assignee: Ilya Ganelin >Priority: Critical > > Right now it is difficult to write a Spark application in a way that can be > run independently and also be composed with other Spark applications in an > environment such as the JobServer, notebook servers, etc where there is a > shared SparkContext. > It would be nice to provide a rendez-vous point so that applications can > learn whether an existing SparkContext already exists before creating one. > The most simple/surgical way I see to do this is to have an optional static > SparkContext singleton that people can be retrieved as follows: > {code} > val sc = SparkContext.getOrCreate(conf = new SparkConf()) > {code} > And you could also have a setter where some outer framework/server can set it > for use by multiple downstream applications. > A more advanced version of this would have some named registry or something, > but since we only support a single SparkContext in one JVM at this point > anyways, this seems sufficient and much simpler. Another advanced option > would be to allow plugging in some other notion of configuration you'd pass > when retrieving an existing context. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's
[ https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491877#comment-14491877 ] Ilya Ganelin commented on SPARK-6703: - Patrick - I can look into this. Thank you. > Provide a way to discover existing SparkContext's > - > > Key: SPARK-6703 > URL: https://issues.apache.org/jira/browse/SPARK-6703 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Patrick Wendell > > Right now it is difficult to write a Spark application in a way that can be > run independently and also be composed with other Spark applications in an > environment such as the JobServer, notebook servers, etc where there is a > shared SparkContext. > It would be nice to provide a rendez-vous point so that applications can > learn whether an existing SparkContext already exists before creating one. > The most simple/surgical way I see to do this is to have an optional static > SparkContext singleton that people can be retrieved as follows: > {code} > val sc = SparkContext.getOrCreate(conf = new SparkConf()) > {code} > And you could also have a setter where some outer framework/server can set it > for use by multiple downstream applications. > A more advanced version of this would have some named registry or something, > but since we only support a single SparkContext in one JVM at this point > anyways, this seems sufficient and much simpler. Another advanced option > would be to allow plugging in some other notion of configuration you'd pass > when retrieving an existing context. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
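To make the proposed rendez-vous point concrete, here is a minimal sketch of the getOrCreate/setter pair described in the issue. The object name SharedSparkContext and its internals are assumptions for illustration only; this is not the API that shipped in Spark.

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the proposal: a process-wide registration point that lets an
// application reuse a context owned by an outer framework (JobServer, notebook server).
object SharedSparkContext {
  @volatile private var active: Option[SparkContext] = None

  // The outer framework/server registers the context it created.
  def set(sc: SparkContext): Unit = { active = Some(sc) }

  // Applications look up an existing context before creating their own.
  def getOrCreate(conf: SparkConf): SparkContext = synchronized {
    active.getOrElse {
      val sc = new SparkContext(conf)
      active = Some(sc)
      sc
    }
  }
}
{code}

A downstream application would then call SharedSparkContext.getOrCreate(new SparkConf()) and transparently pick up whatever context the hosting server registered, falling back to creating its own when run standalone.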
[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions
[ https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576 ] Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:09 AM: --- The obvious solution won't work. Adding a {{TaskContext}} to {{dataSerialize()}} won't work because it's called from within both {{MemoryStore}} and {{TachyonStore}} which are instantiated within the {{BlockManager}} constructor. The {{TaskContext}} also can't be created within the constructor for {{BlockManager}} since that's created within the {{SparkEnv}} constructor which has no tasks associated with it. The only workable solution that I can see is to assign a {{TaskContext}} to the {{BlockManager}} at run-time but that sounds very sketchy to me since the block manager is a singleton and we may have multiple tasks going at once. Any thoughts on this conundrum? was (Author: ilganeli): The obvious solution won't work. Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work because it's called from within both {code}MemoryStore{code} and {code}TachyonStore{code} which are instantiated within the {code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be created within the constructor for {code}BlockManager{code} since that's created within the {code}SparkEnv{code} constructor which has no tasks associated with it. The only workable solution that I can see is to assign a {code}TaskContext{code} to the {code}BlockManager{code} at run-time but that sounds very sketchy to me since the block manager is a singleton and we may have multiple tasks going at once. Any thoughts on this conundrum? > BlockManager.dataDeserialize leaks resources on user exceptions > --- > > Key: SPARK-6839 > URL: https://issues.apache.org/jira/browse/SPARK-6839 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid > > From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized > that > [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202] > doesn't guarantee the underlying InputStream is properly closed. In > particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time > there is an exception in user code. > The problem is that right now, we convert the input streams to iterators, and > only close the input stream if the end of the iterator is reached. But, we > might never reach the end of the iterator -- the obvious case is if there is > a bug in the user code, so tasks fail part of the way through the iterator. > I think the solution is to give {{BlockManager.dataDeserialize}} a > {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do > the cleanup (as is done in {{ShuffleBlockFetcherIterator}}). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions
[ https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576 ] Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:07 AM: --- The obvious solution won't work. Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work because it's called from within both {code}MemoryStore{code} and {code}TachyonStore{code} which are instantiated within the {code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be created within the constructor for {code}BlockManager{code} since that's created within the {code}SparkEnv{code} constructor which has no tasks associated with it. The only workable solution that I can see is to assign a {code}TaskContext{code} to the {code}BlockManager{code} at run-time but that sounds very sketchy to me since the block manager is a singleton and we may have multiple tasks going at once. Any thoughts on this conundrum? was (Author: ilganeli): The obvious solution won't work. Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's called from within both ```MemoryStore``` and ```TachyonStore``` which are instantiated within the ```BlockManager``` constructor. The ```TaskContext``` also can't be created within the constructor for ```BlockManager``` since that's created within the ```SparkEnv``` constructor which has no tasks associated with it. The only workable solution that I can see is to assign a ```TaskContext``` to the ```BlockManager``` at run-time but that sounds very sketchy to me since the block manager is a singleton and we may have multiple tasks going at once. Any thoughts on this conundrum? > BlockManager.dataDeserialize leaks resources on user exceptions > --- > > Key: SPARK-6839 > URL: https://issues.apache.org/jira/browse/SPARK-6839 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid > > From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized > that > [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202] > doesn't guarantee the underlying InputStream is properly closed. In > particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time > there is an exception in user code. > The problem is that right now, we convert the input streams to iterators, and > only close the input stream if the end of the iterator is reached. But, we > might never reach the end of the iterator -- the obvious case is if there is > a bug in the user code, so tasks fail part of the way through the iterator. > I think the solution is to give {{BlockManager.dataDeserialize}} a > {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do > the cleanup (as is done in {{ShuffleBlockFetcherIterator}}). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions
[ https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490576#comment-14490576 ] Ilya Ganelin commented on SPARK-6839: - The obvious solution won't work. Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's called from within both ```MemoryStore``` and ```TachyonStore``` which are instantiated within the ```BlockManager``` constructor. The ```TaskContext``` also can't be created within the constructor for ```BlockManager``` since that's created within the ```SparkEnv``` constructor which has no tasks associated with it. The only workable solution that I can see is to assign a ```TaskContext``` to the ```BlockManager``` at run-time but that sounds very sketchy to me since the block manager is a singleton and we may have multiple tasks going at once. Any thoughts on this conundrum? > BlockManager.dataDeserialize leaks resources on user exceptions > --- > > Key: SPARK-6839 > URL: https://issues.apache.org/jira/browse/SPARK-6839 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid > > From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized > that > [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202] > doesn't guarantee the underlying InputStream is properly closed. In > particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time > there is an exception in user code. > The problem is that right now, we convert the input streams to iterators, and > only close the input stream if the end of the iterator is reached. But, we > might never reach the end of the iterator -- the obvious case is if there is > a bug in the user code, so tasks fail part of the way through the iterator. > I think the solution is to give {{BlockManager.dataDeserialize}} a > {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do > the cleanup (as is done in {{ShuffleBlockFetcherIterator}}). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
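The fix sketched in the issue description, handing the deserialization path a TaskContext so cleanup is tied to task completion rather than to iterator exhaustion, could look roughly like the helper below. The method name and the () => InputStream factory are assumptions made for illustration; only TaskContext.addTaskCompletionListener is the real hook.

{code}
import java.io.InputStream

import org.apache.spark.TaskContext

// Illustrative helper: the stream is closed when the task finishes, whether the
// iterator was fully consumed, the task failed in user code, or it was cancelled.
def deserializeWithCleanup[T](
    context: TaskContext,
    openStream: () => InputStream)(
    deserialize: InputStream => Iterator[T]): Iterator[T] = {
  val in = openStream()
  context.addTaskCompletionListener(_ => in.close())
  deserialize(in)
}
{code}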
[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions
[ https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490545#comment-14490545 ] Ilya Ganelin commented on SPARK-6839: - Imran - I can knock this out. Thanks! > BlockManager.dataDeserialize leaks resources on user exceptions > --- > > Key: SPARK-6839 > URL: https://issues.apache.org/jira/browse/SPARK-6839 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid > > From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized > that > [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202] > doesn't guarantee the underlying InputStream is properly closed. In > particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time > there is an exception in user code. > The problem is that right now, we convert the input streams to iterators, and > only close the input stream if the end of the iterator is reached. But, we > might never reach the end of the iterator -- the obvious case is if there is > a bug in the user code, so tasks fail part of the way through the iterator. > I think the solution is to give {{BlockManager.dataDeserialize}} a > {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do > the cleanup (as is done in {{ShuffleBlockFetcherIterator}}). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark
[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485586#comment-14485586 ] Ilya Ganelin commented on SPARK-6780: - Matching test code: {code} test("saveAsHadoopFileByKey should generate a text file per key") { val testPairs : JavaRDD[Array[Byte]] = sc.parallelize( Seq( Array(1.toByte,1.toByte), Array(2.toByte,4.toByte), Array(3.toByte,9.toByte), Array(4.toByte,16.toByte), Array(5.toByte,25.toByte)) ).toJavaRDD() val fs = FileSystem.get(new Configuration()) val basePath = sc.conf.get("spark.local.dir", "/tmp") val fullPath = basePath + "/testPath" fs.delete(new Path(fullPath), true) PythonRDD.saveAsHadoopFileByKey( testPairs, false, fullPath, classOf[RDDMultipleTextOutputFormat].toString, classOf[Int].toString, classOf[Int].toString, null, null, new java.util.HashMap(), "") // Test that a file was created for each key (1 to 5).foreach(key => { val testPath = new Path(fullPath + "/" + key) assert(fs.exists(testPath)) // Read the file and test that the contents are the values matching that key split by line val input = fs.open(testPath) val reader = new BufferedReader(new InputStreamReader(input)) val values = new HashSet[Int] val lines = Stream.continually(reader.readLine()).takeWhile(_ != null) lines.foreach(s => values += s.toInt) assert(values.contains(key*key)) }) fs.delete(new Path(fullPath), true) } {code} > Add saveAsTextFileByKey method for PySpark > -- > > Key: SPARK-6780 > URL: https://issues.apache.org/jira/browse/SPARK-6780 > Project: Spark > Issue Type: Improvement > Components: PySpark >Reporter: Ilya Ganelin > > The PySpark API should have a method to allow saving a key-value RDD to > subdirectories organized by key as in : > https://issues.apache.org/jira/browse/SPARK-3533 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark
[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485582#comment-14485582 ] Ilya Ganelin commented on SPARK-6780: - This code was my attempt to implement this within PythonRDD.scala but I ran into run-time reflection issues I could not solve. {code} /** * Output a Python RDD of key-value pairs to any Hadoop file system such that the values within * the rdd are written to sub-directories organized by the associated key. * * Keys and values are converted to suitable output types using either user specified converters * or, if not specified, [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion * types `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of * this RDD. */ def saveAsHadoopFileByKey[K, V, C <: CompressionCodec]( pyRDD: JavaRDD[Array[Byte]], batchSerialized: Boolean, path: String, outputFormatClass: String, keyClass: String, valueClass: String, keyConverterClass: String, valueConverterClass: String, confAsMap: java.util.HashMap[String, String], compressionCodecClass: String) = { val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized) val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse( inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass)) val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration) val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]]) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, new JavaToWritableConverter) converted.saveAsHadoopFile(path, ClassUtils.primitiveToWrapper(kc), ClassUtils.primitiveToWrapper(vc), classOf[RDDMultipleTextOutputFormat[K,V]], new JobConf(mergedConf), codec=codec) } {code} > Add saveAsTextFileByKey method for PySpark > -- > > Key: SPARK-6780 > URL: https://issues.apache.org/jira/browse/SPARK-6780 > Project: Spark > Issue Type: Improvement > Components: PySpark >Reporter: Ilya Ganelin > > The PySpark API should have a method to allow saving a key-value RDD to > subdirectories organized by key as in : > https://issues.apache.org/jira/browse/SPARK-3533 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark
[ https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485577#comment-14485577 ] Ilya Ganelin commented on SPARK-6780: - SPARK-3533 defines matching methods for Scala and Java APIs. > Add saveAsTextFileByKey method for PySpark > -- > > Key: SPARK-6780 > URL: https://issues.apache.org/jira/browse/SPARK-6780 > Project: Spark > Issue Type: Improvement > Components: PySpark >Reporter: Ilya Ganelin > > The PySpark API should have a method to allow saving a key-value RDD to > subdirectories organized by key as in : > https://issues.apache.org/jira/browse/SPARK-3533 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6780) Add saveAsTextFileByKey method for PySpark
Ilya Ganelin created SPARK-6780: --- Summary: Add saveAsTextFileByKey method for PySpark Key: SPARK-6780 URL: https://issues.apache.org/jira/browse/SPARK-6780 Project: Spark Issue Type: Improvement Components: PySpark Reporter: Ilya Ganelin The PySpark API should have a method to allow saving a key-value RDD to subdirectories organized by key as in : https://issues.apache.org/jira/browse/SPARK-3533 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility
[ https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14483561#comment-14483561 ] Ilya Ganelin commented on SPARK-6746: - SPARK-5945 requires updating the logic for handling fetch failures. This will introduce merge conflicts with this patch so this one should be merged first since it cleans up the DAG Scheduler code to help understand what's going on. > Refactor large functions in DAGScheduler to improve readibility > --- > > Key: SPARK-6746 > URL: https://issues.apache.org/jira/browse/SPARK-6746 > Project: Spark > Issue Type: Sub-task > Components: Scheduler >Reporter: Ilya Ganelin > > The DAGScheduler class contains two huge functions that make it > very hard to understand what's going on in the code. These are: > 1) The monolithic handleTaskCompletion > 2) The cleanupStateForJobAndIndependentStages function > These can be simply modularized to eliminate some awkward type casting and > improve code readability. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility
Ilya Ganelin created SPARK-6746: --- Summary: Refactor large functions in DAGScheduler to improve readibility Key: SPARK-6746 URL: https://issues.apache.org/jira/browse/SPARK-6746 Project: Spark Issue Type: Sub-task Reporter: Ilya Ganelin The DAGScheduler class contains two huge functions that make it very hard to understand what's going on in the code. These are: 1) The monolithic handleTaskCompletion 2) The cleanupStateForJobAndIndependentStages function These can be simply modularized to eliminate some awkward type casting and improve code readability. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
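The shape of the refactor being proposed is essentially "one thin dispatcher, many small handlers". The toy example below is not DAGScheduler code; the type and method names are invented purely to illustrate that structure.

{code}
// Invented types standing in for TaskEndReason and its cases.
sealed trait CompletionReason
case object Succeeded extends CompletionReason
case object Resubmitted extends CompletionReason
final case class FetchFailed(mapId: Int) extends CompletionReason

class ToyScheduler {
  // Thin dispatcher: each branch delegates to a small, intention-revealing handler
  // instead of inlining hundreds of lines into a single match.
  def handleTaskCompletion(reason: CompletionReason): Unit = reason match {
    case Succeeded      => handleSuccess()
    case Resubmitted    => handleResubmission()
    case f: FetchFailed => handleFetchFailure(f)
  }

  private def handleSuccess(): Unit = println("mark task finished; maybe complete the stage")
  private def handleResubmission(): Unit = println("put the task back on the pending list")
  private def handleFetchFailure(f: FetchFailed): Unit = println(s"fail the stage that produced map output ${f.mapId}")
}
{code}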
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {code} if (!stopped) { stopped = true ... } {code} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {code}SparkContext.stop() {code}. A cursory examination reveals this in {code}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {code}. was: There are numerous instances throughout the code base of the following: {code} if (!stopped) { stopped = true ... } {code} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop() {{code}}. A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {{code}}. > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {code} > if (!stopped) { > stopped = true > ... > } > {code} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {code}SparkContext.stop() {code}. > A cursory examination reveals this in {code}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop() {code}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {code} if (!stopped) { stopped = true ... } {code} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {code}SparkContext.stop() {code} A cursory examination reveals this in {code}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {code} was: There are numerous instances throughout the code base of the following: {code} if (!stopped) { stopped = true ... } {code} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {code}SparkContext.stop() {code}. A cursory examination reveals this in {code}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {code}. > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {code} > if (!stopped) { > stopped = true > ... > } > {code} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {code}SparkContext.stop() {code} > A cursory examination reveals this in {code}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop() {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {code} if (!stopped) { stopped = true ... } {code} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop() {{code}}. A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {{code}}. was: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop() {{code}}. A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {{code}}. > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {code} > if (!stopped) { > stopped = true > ... > } > {code} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {{code}}SparkContext.stop() {{code}}. > A cursory examination reveals this in {{code}}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop() {{code}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop() {{code}}. A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop() {{code}}. was: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop().{{code}} A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop().{{code}} > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {{code}} > if (!stopped) { > stopped = true > ... > } > {{code}} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {{code}}SparkContext.stop() {{code}}. > A cursory examination reveals this in {{code}}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop() {{code}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop().{{code}} A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop().{{code}} was: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop()```.{{code}} A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop().{{code}} > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {{code}} > if (!stopped) { > stopped = true > ... > } > {{code}} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {{code}}SparkContext.stop().{{code}} > A cursory examination reveals this in {{code}}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop().{{code}} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.
[ https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-6616: Description: There are numerous instances throughout the code base of the following: {{code}} if (!stopped) { stopped = true ... } {{code}} In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with {{code}}SparkContext.stop()```.{{code}} A cursory examination reveals this in {{code}}SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop().{{code}} was: There are numerous instances throughout the code base of the following: ``` if (!stopped) { stopped = true ... } ``` In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with ```SparkContext.stop()```. A cursory examination reveals this in ```SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop()```. > IsStopped set to true in before stop() is complete. > --- > > Key: SPARK-6616 > URL: https://issues.apache.org/jira/browse/SPARK-6616 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.3.0 >Reporter: Ilya Ganelin > > There are numerous instances throughout the code base of the following: > {{code}} > if (!stopped) { > stopped = true > ... > } > {{code}} > In general, this is bad practice since it can cause an incomplete cleanup if > there is an error during shutdown and not all code executes. Incomplete > cleanup is harder to track down than a double cleanup that triggers some > error. I propose fixing this throughout the code, starting with the cleanup > sequence with {{code}}SparkContext.stop()```.{{code}} > A cursory examination reveals this in {{code}}SparkContext.stop(), > SparkEnv.stop(), and ContextCleaner.stop().{{code}} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6616) IsStopped set to true in before stop() is complete.
Ilya Ganelin created SPARK-6616: --- Summary: IsStopped set to true in before stop() is complete. Key: SPARK-6616 URL: https://issues.apache.org/jira/browse/SPARK-6616 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.3.0 Reporter: Ilya Ganelin There are numerous instances throughout the code base of the following: ``` if (!stopped) { stopped = true ... } ``` In general, this is bad practice since it can cause an incomplete cleanup if there is an error during shutdown and not all code executes. Incomplete cleanup is harder to track down than a double cleanup that triggers some error. I propose fixing this throughout the code, starting with the cleanup sequence with ```SparkContext.stop()```. A cursory examination reveals this in ```SparkContext.stop(), SparkEnv.stop(), and ContextCleaner.stop()```. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
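One safer shape for the stop() sequence discussed above: flip the guard only to prevent double entry, and chain the cleanup steps with try/finally so an exception in one step cannot silently skip the rest. Class and method names below are invented for illustration; this is a sketch of the pattern, not Spark's actual shutdown code.

{code}
import java.util.concurrent.atomic.AtomicBoolean

// Sketch: the flag only guards against double entry; every cleanup step still
// gets a chance to run even if an earlier step throws.
class StoppableService {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = {
    if (!stopped.compareAndSet(false, true)) return  // already stopping/stopped
    try {
      closeNetwork()
    } finally {
      try {
        flushMetrics()
      } finally {
        releaseLocalDirs()
      }
    }
  }

  // Placeholder cleanup steps for illustration only.
  private def closeNetwork(): Unit = ()
  private def flushMetrics(): Unit = ()
  private def releaseLocalDirs(): Unit = ()
}
{code}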
[jira] [Comment Edited] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
[ https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387499#comment-14387499 ] Ilya Ganelin edited comment on SPARK-6492 at 3/30/15 10:03 PM: --- Would it be reasonable to fix this by adding some timeout/retry logic in the SparkContext shutdown code? If so, I can take care of this. Thanks. was (Author: ilganeli): Would it be reasonable to fix this by adding some timeout/retry logic in the DAGScheduler shutdown code? If so, I can take care of this. Thanks. > SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies > --- > > Key: SPARK-6492 > URL: https://issues.apache.org/jira/browse/SPARK-6492 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.0, 1.4.0 >Reporter: Josh Rosen >Priority: Critical > > A deadlock can occur when DAGScheduler death causes a SparkContext to be shut > down while user code is concurrently racing to stop the SparkContext in a > finally block. > For example: > {code} > try { > sc = new SparkContext("local", "test") > // start running a job that causes the DAGSchedulerEventProcessor to > crash > someRDD.doStuff() > } > } finally { > sc.stop() // stop the sparkcontext once the failure in DAGScheduler causes > the above job to fail with an exception > } > {code} > This leads to a deadlock. The event processor thread tries to lock on the > {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because > the thread that holds that lock is waiting for the event processor thread to > join: > {code} > "dag-scheduler-event-loop" daemon prio=5 tid=0x7ffa69456000 nid=0x9403 > waiting for monitor entry [0x0001223ad000] >java.lang.Thread.State: BLOCKED (on object monitor) > at org.apache.spark.SparkContext.stop(SparkContext.scala:1398) > - waiting to lock <0x0007f5037b08> (a java.lang.Object) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52) > {code} > {code} > "pool-1-thread-1-ScalaTest-running-SparkContextSuite" prio=5 > tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0007f4b28000> (a > org.apache.spark.util.EventLoop$$anon$1) > at java.lang.Thread.join(Thread.java:1281) > - locked <0x0007f4b28000> (a > org.apache.spark.util.EventLoop$$anon$1) > at java.lang.Thread.join(Thread.java:1355) > at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79) > at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352) > at org.apache.spark.SparkContext.stop(SparkContext.scala:1405) > - locked <0x0007f5037b08> (a java.lang.Object) > [...] > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
[ https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387499#comment-14387499 ] Ilya Ganelin commented on SPARK-6492: - Would it be reasonable to fix this by adding some timeout/retry logic in the DAGScheduler shutdown code? If so, I can take care of this. Thanks. > SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies > --- > > Key: SPARK-6492 > URL: https://issues.apache.org/jira/browse/SPARK-6492 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.3.0, 1.4.0 >Reporter: Josh Rosen >Priority: Critical > > A deadlock can occur when DAGScheduler death causes a SparkContext to be shut > down while user code is concurrently racing to stop the SparkContext in a > finally block. > For example: > {code} > try { > sc = new SparkContext("local", "test") > // start running a job that causes the DAGSchedulerEventProcessor to > crash > someRDD.doStuff() > } > } finally { > sc.stop() // stop the sparkcontext once the failure in DAGScheduler causes > the above job to fail with an exception > } > {code} > This leads to a deadlock. The event processor thread tries to lock on the > {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because > the thread that holds that lock is waiting for the event processor thread to > join: > {code} > "dag-scheduler-event-loop" daemon prio=5 tid=0x7ffa69456000 nid=0x9403 > waiting for monitor entry [0x0001223ad000] >java.lang.Thread.State: BLOCKED (on object monitor) > at org.apache.spark.SparkContext.stop(SparkContext.scala:1398) > - waiting to lock <0x0007f5037b08> (a java.lang.Object) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52) > {code} > {code} > "pool-1-thread-1-ScalaTest-running-SparkContextSuite" prio=5 > tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0007f4b28000> (a > org.apache.spark.util.EventLoop$$anon$1) > at java.lang.Thread.join(Thread.java:1281) > - locked <0x0007f4b28000> (a > org.apache.spark.util.EventLoop$$anon$1) > at java.lang.Thread.join(Thread.java:1355) > at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79) > at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352) > at org.apache.spark.SparkContext.stop(SparkContext.scala:1405) > - locked <0x0007f5037b08> (a java.lang.Object) > [...] > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
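A rough sketch of the timeout idea raised in the comment, plus the guard that avoids the self-join at the heart of the deadlock (the event-loop thread must never wait on itself). The 10-second bound and the method name are assumptions, not the change that was merged.

{code}
// Illustrative shutdown helper for an event-loop thread.
def stopEventLoop(eventThread: Thread, timeoutMs: Long = 10000L): Unit = {
  if (Thread.currentThread() eq eventThread) {
    // Called from inside the event loop (the deadlock scenario): do not join ourselves.
    return
  }
  eventThread.interrupt()
  eventThread.join(timeoutMs)
  if (eventThread.isAlive) {
    System.err.println(s"Event loop did not terminate within ${timeoutMs}ms; continuing shutdown")
  }
}
{code}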
[jira] [Comment Edited] (SPARK-5931) Use consistent naming for time properties
[ https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367915#comment-14367915 ] Ilya Ganelin edited comment on SPARK-5931 at 3/18/15 9:09 PM: -- [~andrewor14] - I can take this out. Thanks. was (Author: ilganeli): @andrewor - I can take this out. Thanks. > Use consistent naming for time properties > - > > Key: SPARK-5931 > URL: https://issues.apache.org/jira/browse/SPARK-5931 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Andrew Or >Assignee: Andrew Or > > This is SPARK-5932's sister issue. > The naming of existing time configs is inconsistent. We currently have the > following throughout the code base: > {code} > spark.network.timeout // seconds > spark.executor.heartbeatInterval // milliseconds > spark.storage.blockManagerSlaveTimeoutMs // milliseconds > spark.yarn.scheduler.heartbeat.interval-ms // milliseconds > {code} > Instead, my proposal is to simplify the config name itself and make > everything accept time using the following format: 5s, 2ms, 100us. For > instance: > {code} > spark.network.timeout = 5s > spark.executor.heartbeatInterval = 500ms > spark.storage.blockManagerSlaveTimeout = 100ms > spark.yarn.scheduler.heartbeatInterval = 400ms > {code} > All existing configs that are relevant will be deprecated in favor of the new > ones. We should do this soon before we keep introducing more time configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5931) Use consistent naming for time properties
[ https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367915#comment-14367915 ] Ilya Ganelin commented on SPARK-5931: - @andrewor - I can take this out. Thanks. > Use consistent naming for time properties > - > > Key: SPARK-5931 > URL: https://issues.apache.org/jira/browse/SPARK-5931 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Andrew Or >Assignee: Andrew Or > > This is SPARK-5932's sister issue. > The naming of existing time configs is inconsistent. We currently have the > following throughout the code base: > {code} > spark.network.timeout // seconds > spark.executor.heartbeatInterval // milliseconds > spark.storage.blockManagerSlaveTimeoutMs // milliseconds > spark.yarn.scheduler.heartbeat.interval-ms // milliseconds > {code} > Instead, my proposal is to simplify the config name itself and make > everything accept time using the following format: 5s, 2ms, 100us. For > instance: > {code} > spark.network.timeout = 5s > spark.executor.heartbeatInterval = 500ms > spark.storage.blockManagerSlaveTimeout = 100ms > spark.yarn.scheduler.heartbeatInterval = 400ms > {code} > All existing configs that are relevant will be deprecated in favor of the new > ones. We should do this soon before we keep introducing more time configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
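To illustrate the proposed convention, a small parser that normalises suffixed time strings ("5s", "500ms", "100us") to milliseconds might look like the sketch below. It shows the idea only; the accepted suffixes and the behaviour for a bare number are assumptions, not the utility that was eventually merged.

{code}
import java.util.concurrent.TimeUnit

// Sketch: split the leading digits from the suffix and normalise to milliseconds.
def timeStringAsMs(s: String): Long = {
  val trimmed = s.trim.toLowerCase
  val (num, unit) = trimmed.span(_.isDigit)
  require(num.nonEmpty, s"No numeric value in '$s'")
  val value = num.toLong
  unit match {
    case "us"        => TimeUnit.MICROSECONDS.toMillis(value)
    case "ms" | ""   => value                       // assume bare numbers mean milliseconds
    case "s"         => TimeUnit.SECONDS.toMillis(value)
    case "m" | "min" => TimeUnit.MINUTES.toMillis(value)
    case "h"         => TimeUnit.HOURS.toMillis(value)
    case other       => throw new NumberFormatException(s"Unknown time suffix: $other")
  }
}
{code}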
[jira] [Commented] (SPARK-5932) Use consistent naming for byte properties
[ https://issues.apache.org/jira/browse/SPARK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367919#comment-14367919 ] Ilya Ganelin commented on SPARK-5932: - [~andrewor14] - I can take this out. Thanks. > Use consistent naming for byte properties > - > > Key: SPARK-5932 > URL: https://issues.apache.org/jira/browse/SPARK-5932 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Andrew Or >Assignee: Andrew Or > > This is SPARK-5931's sister issue. > The naming of existing byte configs is inconsistent. We currently have the > following throughout the code base: > {code} > spark.reducer.maxMbInFlight // megabytes > spark.kryoserializer.buffer.mb // megabytes > spark.shuffle.file.buffer.kb // kilobytes > spark.broadcast.blockSize // kilobytes > spark.executor.logs.rolling.size.maxBytes // bytes > spark.io.compression.snappy.block.size // bytes > {code} > Instead, my proposal is to simplify the config name itself and make > everything accept time using the following format: 500b, 2k, 100m, 46g, > similar to what we currently use for our memory settings. For instance: > {code} > spark.reducer.maxSizeInFlight = 10m > spark.kryoserializer.buffer = 2m > spark.shuffle.file.buffer = 10k > spark.broadcast.blockSize = 20k > spark.executor.logs.rolling.maxSize = 500b > spark.io.compression.snappy.blockSize = 200b > {code} > All existing configs that are relevant will be deprecated in favor of the new > ones. We should do this soon before we keep introducing more time configs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
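The sister convention for sizes can be parsed the same way as the time sketch under SPARK-5931 above; again, a sketch under the assumption that the suffixes are the single letters from the description and that k/m/g are binary multiples.

{code}
// Sketch: "500b", "10k", "100m", "46g" normalised to a byte count.
def byteStringAsBytes(s: String): Long = {
  val trimmed = s.trim.toLowerCase
  val (num, unit) = trimmed.span(_.isDigit)
  require(num.nonEmpty, s"No numeric value in '$s'")
  val value = num.toLong
  unit match {
    case "b" | "" => value
    case "k"      => value * 1024L
    case "m"      => value * 1024L * 1024L
    case "g"      => value * 1024L * 1024L * 1024L
    case other    => throw new NumberFormatException(s"Unknown size suffix: $other")
  }
}
{code}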
[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException
[ https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367891#comment-14367891 ] Ilya Ganelin commented on SPARK-5945: - Hi Imran - I'd be happy to tackle this. Could you please assign it to me? Thank you. > Spark should not retry a stage infinitely on a FetchFailedException > --- > > Key: SPARK-5945 > URL: https://issues.apache.org/jira/browse/SPARK-5945 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Imran Rashid > > While investigating SPARK-5928, I noticed some very strange behavior in the > way spark retries stages after a FetchFailedException. It seems that on a > FetchFailedException, instead of simply killing the task and retrying, Spark > aborts the stage and retries. If it just retried the task, the task might > fail 4 times and then trigger the usual job killing mechanism. But by > killing the stage instead, the max retry logic is skipped (it looks to me > like there is no limit for retries on a stage). > After a bit of discussion with Kay Ousterhout, it seems the idea is that if a > fetch fails, we assume that the block manager we are fetching from has > failed, and that it will succeed if we retry the stage w/out that block > manager. In that case, it wouldn't make any sense to retry the task, since > its doomed to fail every time, so we might as well kill the whole stage. But > this raises two questions: > 1) Is it really safe to assume that a FetchFailedException means that the > BlockManager has failed, and ti will work if we just try another one? > SPARK-5928 shows that there are at least some cases where that assumption is > wrong. Even if we fix that case, this logic seems brittle to the next case > we find. I guess the idea is that this behavior is what gives us the "R" in > RDD ... but it seems like its not really that robust and maybe should be > reconsidered. > 2) Should stages only be retried a limited number of times? It would be > pretty easy to put in a limited number of retries per stage. Though again, > we encounter issues with keeping things resilient. Theoretically one stage > could have many retries, but due to failures in different stages further > downstream, so we might need to track the cause of each retry as well to > still have the desired behavior. > In general it just seems there is some flakiness in the retry logic. This is > the only reproducible example I have at the moment, but I vaguely recall > hitting other cases of strange behavior w/ retries when trying to run long > pipelines. Eg., if one executor is stuck in a GC during a fetch, the fetch > fails, but the executor eventually comes back and the stage gets retried > again, but the same GC issues happen the second time around, etc. > Copied from SPARK-5928, here's the example program that can regularly produce > a loop of stage failures. Note that it will only fail from a remote fetch, > so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell > --num-executors 2 --executor-memory 4000m}} > {code} > val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore => > val n = 3e3.toInt > val arr = new Array[Byte](n) > //need to make sure the array doesn't compress to something small > scala.util.Random.nextBytes(arr) > arr > } > rdd.map { x => (1, x)}.groupByKey().count() > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
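A per-stage retry cap of the kind discussed in point 2 could be tracked with something as small as the sketch below. The limit of 4 and the method names are assumptions for illustration, not Spark's actual behaviour.

{code}
import scala.collection.mutable

// Sketch: count consecutive resubmissions per stage and stop retrying past a cap.
class StageRetryTracker(maxStageAttempts: Int = 4) {
  private val attempts = mutable.Map.empty[Int, Int].withDefaultValue(0)

  /** Record a fetch-failure-driven retry; returns true if the stage may be resubmitted. */
  def recordFailureAndCheck(stageId: Int): Boolean = {
    attempts(stageId) += 1
    attempts(stageId) <= maxStageAttempts
  }

  /** Forget a stage once it completes successfully. */
  def clear(stageId: Int): Unit = attempts.remove(stageId)
}
{code}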
[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14359294#comment-14359294 ] Ilya Ganelin commented on SPARK-4927: - Are you running over yarn? My theory is that the memory usage has to do with data movement between nodes. Sent with Good (www.good.com) > Spark does not clean up properly during long jobs. > --- > > Key: SPARK-4927 > URL: https://issues.apache.org/jira/browse/SPARK-4927 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Ilya Ganelin > > On a long running Spark job, Spark will eventually run out of memory on the > driver node due to metadata overhead from the shuffle operation. Spark will > continue to operate, however with drastically decreased performance (since > swapping now occurs with every operation). > The spark.cleanup.tll parameter allows a user to configure when cleanup > happens but the issue with doing this is that it isn’t done safely, e.g. If > this clears a cached RDD or active task in the middle of processing a stage, > this ultimately causes a KeyNotFoundException when the next stage attempts to > reference the cleared RDD or task. > There should be a sustainable mechanism for cleaning up stale metadata that > allows the program to continue running. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358958#comment-14358958 ] Ilya Ganelin edited comment on SPARK-4927 at 3/12/15 6:50 PM: -- Hi Sean - I have a code snippet that reproduced this. Let me send it to you in a bit - I don't have the means to run 1.3 in a cluster. Realized that I already had that code snippet posted. Running the above code doesn't reproduce the issue? was (Author: ilganeli): Hi Sean - I have a code snippet that reproduced this. Let me send it to you in a bit - I don't have the means to run 1.3 in a cluster. Sent with Good (www.good.com) > Spark does not clean up properly during long jobs. > --- > > Key: SPARK-4927 > URL: https://issues.apache.org/jira/browse/SPARK-4927 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Ilya Ganelin > > On a long running Spark job, Spark will eventually run out of memory on the > driver node due to metadata overhead from the shuffle operation. Spark will > continue to operate, however with drastically decreased performance (since > swapping now occurs with every operation). > The spark.cleanup.tll parameter allows a user to configure when cleanup > happens but the issue with doing this is that it isn’t done safely, e.g. If > this clears a cached RDD or active task in the middle of processing a stage, > this ultimately causes a KeyNotFoundException when the next stage attempts to > reference the cleared RDD or task. > There should be a sustainable mechanism for cleaning up stale metadata that > allows the program to continue running. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358958#comment-14358958 ] Ilya Ganelin commented on SPARK-4927: - Hi Sean - I have a code snippet that reproduced this. Let me send it to you in a bit - I don't have the means to run 1.3 in a cluster. Sent with Good (www.good.com) > Spark does not clean up properly during long jobs. > --- > > Key: SPARK-4927 > URL: https://issues.apache.org/jira/browse/SPARK-4927 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Ilya Ganelin > > On a long running Spark job, Spark will eventually run out of memory on the > driver node due to metadata overhead from the shuffle operation. Spark will > continue to operate, however with drastically decreased performance (since > swapping now occurs with every operation). > The spark.cleanup.tll parameter allows a user to configure when cleanup > happens but the issue with doing this is that it isn’t done safely, e.g. If > this clears a cached RDD or active task in the middle of processing a stage, > this ultimately causes a KeyNotFoundException when the next stage attempts to > reference the cleared RDD or task. > There should be a sustainable mechanism for cleaning up stale metadata that > allows the program to continue running. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
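Independent of any framework-side fix, the usual user-side mitigation for driver metadata growth in long iterative jobs is to materialise each step, explicitly release the previous one, and periodically truncate lineage. This is a hedged sketch of that pattern, not the resolution of this ticket; the iteration counts and storage level are arbitrary, and checkpointing assumes sc.setCheckpointDir has been called.

{code}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

// Illustrative long-running loop that bounds lineage and cached state as it goes.
def iterate(sc: SparkContext, iterations: Int): Unit = {
  var current = sc.parallelize(1 to 1000000).map(i => (i % 100, i.toLong))
  for (i <- 1 to iterations) {
    val next = current.reduceByKey(_ + _).persist(StorageLevel.MEMORY_AND_DISK)
    if (i % 10 == 0) {
      next.checkpoint()               // truncate lineage every few iterations
    }
    next.count()                      // force evaluation before dropping the parent
    current.unpersist(blocking = false)
    current = next
  }
}
{code}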
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347216#comment-14347216 ] Ilya Ganelin commented on SPARK-3533: - [~aaronjosephs] - Let me see if that's it. Thanks! > Add saveAsTextFileByKey() method to RDDs > > > Key: SPARK-3533 > URL: https://issues.apache.org/jira/browse/SPARK-3533 > Project: Spark > Issue Type: Improvement > Components: PySpark, Spark Core >Affects Versions: 1.1.0 >Reporter: Nicholas Chammas > > Users often have a single RDD of key-value pairs that they want to save to > multiple locations based on the keys. > For example, say I have an RDD like this: > {code} > >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', > >>> 'Frankie']).keyBy(lambda x: x[0]) > >>> a.collect() > [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')] > >>> a.keys().distinct().collect() > ['B', 'F', 'N'] > {code} > Now I want to write the RDD out to different paths depending on the keys, so > that I have one output directory per distinct key. Each output directory > could potentially have multiple {{part-}} files, one per RDD partition. > So the output would look something like: > {code} > /path/prefix/B [/part-1, /part-2, etc] > /path/prefix/F [/part-1, /part-2, etc] > /path/prefix/N [/part-1, /part-2, etc] > {code} > Though it may be possible to do this with some combination of > {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the > {{MultipleTextOutputFormat}} output format class, it isn't straightforward. > It's not clear if it's even possible at all in PySpark. > Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs > that makes it easy to save RDDs out to multiple locations at once. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343387#comment-14343387 ] Ilya Ganelin commented on SPARK-3533: - Hey [~aaronjosephs], please feel free. I'm out of ideas for this one. You can see my code changes at the github link and the issue I ran into here. Thanks. > Add saveAsTextFileByKey() method to RDDs > > > Key: SPARK-3533 > URL: https://issues.apache.org/jira/browse/SPARK-3533 > Project: Spark > Issue Type: Improvement > Components: PySpark, Spark Core >Affects Versions: 1.1.0 >Reporter: Nicholas Chammas > > Users often have a single RDD of key-value pairs that they want to save to > multiple locations based on the keys. > For example, say I have an RDD like this: > {code} > >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', > >>> 'Frankie']).keyBy(lambda x: x[0]) > >>> a.collect() > [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')] > >>> a.keys().distinct().collect() > ['B', 'F', 'N'] > {code} > Now I want to write the RDD out to different paths depending on the keys, so > that I have one output directory per distinct key. Each output directory > could potentially have multiple {{part-}} files, one per RDD partition. > So the output would look something like: > {code} > /path/prefix/B [/part-1, /part-2, etc] > /path/prefix/F [/part-1, /part-2, etc] > /path/prefix/N [/part-1, /part-2, etc] > {code} > Though it may be possible to do this with some combination of > {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the > {{MultipleTextOutputFormat}} output format class, it isn't straightforward. > It's not clear if it's even possible at all in PySpark. > Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs > that makes it easy to save RDDs out to multiple locations at once. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time
[ https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14339794#comment-14339794 ] Ilya Ganelin commented on SPARK-5845: - I'm code complete on this, will submit a PR shortly. > Time to cleanup spilled shuffle files not included in shuffle write time > > > Key: SPARK-5845 > URL: https://issues.apache.org/jira/browse/SPARK-5845 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0, 1.2.1 >Reporter: Kay Ousterhout >Assignee: Ilya Ganelin >Priority: Minor > > When the disk is contended, I've observed cases when it takes as long as 7 > seconds to clean up all of the intermediate spill files for a shuffle (when > using the sort based shuffle, but bypassing merging because there are <=200 > shuffle partitions). This is even when the shuffle data is non-huge (152MB > written from one of the tasks where I observed this). This is effectively > part of the shuffle write time (because it's a necessary side effect of > writing data to disk) so should be added to the shuffle write time to > facilitate debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs
[ https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337104#comment-14337104 ] Ilya Ganelin commented on SPARK-5750: - I'd be happy to pull those in. Is it fine to submit the PR against this issue? > Document that ordering of elements in shuffled partitions is not > deterministic across runs > -- > > Key: SPARK-5750 > URL: https://issues.apache.org/jira/browse/SPARK-5750 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen > > The ordering of elements in shuffled partitions is not deterministic across > runs. For instance, consider the following example: > {code} > val largeFiles = sc.textFile(...) > val airlines = largeFiles.repartition(2000).cache() > println(airlines.first) > {code} > If this code is run twice, then each run will output a different result. > There is non-determinism in the shuffle read code that accounts for this: > Spark's shuffle read path processes blocks as soon as they are fetched Spark > uses > [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala] > to fetch shuffle data from mappers. In this code, requests for multiple > blocks from the same host are batched together, so nondeterminism in where > tasks are run means that the set of requests can vary across runs. In > addition, there's an [explicit > call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256] > to randomize the order of the batched fetch requests. As a result, shuffle > operations cannot be guaranteed to produce the same ordering of the elements > in their partitions. > Therefore, Spark should update its docs to clarify that the ordering of > elements in shuffle RDDs' partitions is non-deterministic. Note, however, > that the _set_ of elements in each partition will be deterministic: if we > used {{mapPartitions}} to sort each partition, then the {{first()}} call > above would produce a deterministic result. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
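To illustrate the point in the description that the set of elements per partition is deterministic even though their order is not, here is a sketch of the mapPartitions-based workaround mentioned there (the input path is illustrative):
{code}
// Sorting inside each partition makes first() return the same value on every
// run, because only the per-partition ordering, not the membership, varies.
val largeFiles = sc.textFile("/path/to/input")
val airlines = largeFiles
  .repartition(2000)
  .mapPartitions(iter => iter.toArray.sorted.iterator)
  .cache()
println(airlines.first())
{code}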
[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs
[ https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336795#comment-14336795 ] Ilya Ganelin commented on SPARK-5750: - Did you have a particular doc in mind to update? I feel like this sort of comment should go in the programming guide but there's not really a good spot for it. One glaring omission in the guide is a general writeup of the shuffle operation and the role that it plays internally. Understanding shuffles is key to writing stable Spark applications yet there isn't really any mention of it outside of the tech talks and presentations from the Spark folks. My suggestion would be to create a section providing an overview of shuffle, what parameters influence its behavior and stability, and then add this comment to that section. > Document that ordering of elements in shuffled partitions is not > deterministic across runs > -- > > Key: SPARK-5750 > URL: https://issues.apache.org/jira/browse/SPARK-5750 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen > > The ordering of elements in shuffled partitions is not deterministic across > runs. For instance, consider the following example: > {code} > val largeFiles = sc.textFile(...) > val airlines = largeFiles.repartition(2000).cache() > println(airlines.first) > {code} > If this code is run twice, then each run will output a different result. > There is non-determinism in the shuffle read code that accounts for this: > Spark's shuffle read path processes blocks as soon as they are fetched Spark > uses > [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala] > to fetch shuffle data from mappers. In this code, requests for multiple > blocks from the same host are batched together, so nondeterminism in where > tasks are run means that the set of requests can vary across runs. In > addition, there's an [explicit > call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256] > to randomize the order of the batched fetch requests. As a result, shuffle > operations cannot be guaranteed to produce the same ordering of the elements > in their partitions. > Therefore, Spark should update its docs to clarify that the ordering of > elements in shuffle RDDs' partitions is non-deterministic. Note, however, > that the _set_ of elements in each partition will be deterministic: if we > used {{mapPartitions}} to sort each partition, then the {{first()}} call > above would produce a deterministic result. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time
[ https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335751#comment-14335751 ] Ilya Ganelin commented on SPARK-5845: - My mistake - missed your comment about the spill files in the detailed description. Given that we're interested in cleaning up the spill files which appear to be cleaned up in ExternalSorter.stop() (please correct me if I'm wrong), I would like to either a) Pass the context to the stop() method - this is possible since the SortShuffleWriter has visibility of the TaskContext (which in turn stores the metrics we're interested in). b) (My preference since it won't break the existing interface) Surround sorter.stop() on line 91 of SortShuffleWriter.scala with a timer. The only downside to this second approach is that it will also include the cleanup of the partition writers. I'm not sure whether that should be included in this time computation. > Time to cleanup spilled shuffle files not included in shuffle write time > > > Key: SPARK-5845 > URL: https://issues.apache.org/jira/browse/SPARK-5845 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0, 1.2.1 >Reporter: Kay Ousterhout >Assignee: Ilya Ganelin >Priority: Minor > > When the disk is contended, I've observed cases when it takes as long as 7 > seconds to clean up all of the intermediate spill files for a shuffle (when > using the sort based shuffle, but bypassing merging because there are <=200 > shuffle partitions). This is even when the shuffle data is non-huge (152MB > written from one of the tasks where I observed this). This is effectively > part of the shuffle write time (because it's a necessary side effect of > writing data to disk) so should be added to the shuffle write time to > facilitate debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
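To make option (b) concrete, the change might look roughly like the sketch below. The metrics object and method names (writeMetrics, incShuffleWriteTime) are assumptions for illustration and may not match the actual ShuffleWriteMetrics API.
{code}
// Rough sketch of option (b): wrap the sorter cleanup in a timer and fold the
// elapsed time into the shuffle write metrics. Names below are assumptions.
val cleanupStart = System.nanoTime()
sorter.stop()
writeMetrics.incShuffleWriteTime(System.nanoTime() - cleanupStart)
{code}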
[jira] [Comment Edited] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time
[ https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335646#comment-14335646 ] Ilya Ganelin edited comment on SPARK-5845 at 2/24/15 11:19 PM: --- If I understand correctly, the file cleanup happens in IndexShuffleBlockManager:::removeDataByMap(), which is called from either the SortShuffleManager or the SortShuffleWriter. The problem is that these classes do not have any knowledge of the currently collected metrics. Furthermore, the disk cleanup is, unless configured in the SparkConf, triggered asynchronously via the RemoveShuffle message so there doesn't appear to be a straightforward way to provide a set of metrics to be updated. Do you have any suggestions for getting around this? Please let me know, thank you. was (Author: ilganeli): If I understand correctly, the file cleanup happens in IndexShuffleBlockManager:::removeDataByMap(), which is called from either the SortShuffleManager or the SortShuffleWriter. The problem is that these classes do not have any knowledge of the currently collected metrics. Furthermore, the disk cleanup is, unless configured in the SparkConf, triggered asynchronously via the RemoveShuffle() message so there doesn't appear to be a straightforward way to provide a set of metrics to be updated. Do you have any suggestions for getting around this? Please let me know, thank you. > Time to cleanup intermediate shuffle files not included in shuffle write time > - > > Key: SPARK-5845 > URL: https://issues.apache.org/jira/browse/SPARK-5845 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0, 1.2.1 >Reporter: Kay Ousterhout >Assignee: Ilya Ganelin >Priority: Minor > > When the disk is contended, I've observed cases when it takes as long as 7 > seconds to clean up all of the intermediate spill files for a shuffle (when > using the sort based shuffle, but bypassing merging because there are <=200 > shuffle partitions). This is even when the shuffle data is non-huge (152MB > written from one of the tasks where I observed this). This is effectively > part of the shuffle write time (because it's a necessary side effect of > writing data to disk) so should be added to the shuffle write time to > facilitate debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time
[ https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335646#comment-14335646 ] Ilya Ganelin commented on SPARK-5845: - If I understand correctly, the file cleanup happens in IndexShuffleBlockManager.removeDataByMap(), which is called from either the SortShuffleManager or the SortShuffleWriter. The problem is that these classes do not have any knowledge of the currently collected metrics. Furthermore, the disk cleanup is, unless configured in the SparkConf, triggered asynchronously via the RemoveShuffle() message, so there doesn't appear to be a straightforward way to provide a set of metrics to be updated. Do you have any suggestions for getting around this? Please let me know, thank you. > Time to cleanup intermediate shuffle files not included in shuffle write time > - > > Key: SPARK-5845 > URL: https://issues.apache.org/jira/browse/SPARK-5845 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0, 1.2.1 >Reporter: Kay Ousterhout >Assignee: Ilya Ganelin >Priority: Minor > > When the disk is contended, I've observed cases when it takes as long as 7 > seconds to clean up all of the intermediate spill files for a shuffle (when > using the sort based shuffle, but bypassing merging because there are <=200 > shuffle partitions). This is even when the shuffle data is non-huge (152MB > written from one of the tasks where I observed this). This is effectively > part of the shuffle write time (because it's a necessary side effect of > writing data to disk) so should be added to the shuffle write time to > facilitate debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests
[ https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333743#comment-14333743 ] Ilya Ganelin commented on SPARK-5079: - Hi [~joshrosen] - I'm trying to wrap my head around the unit tests trying to find some specific tests where this is a problem as a baseline. If you could highlight a couple of examples as a starting point that would help a lot. Thanks! > Detect failed jobs / batches in Spark Streaming unit tests > -- > > Key: SPARK-5079 > URL: https://issues.apache.org/jira/browse/SPARK-5079 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > Currently, it is possible to write Spark Streaming unit tests where Spark > jobs fail but the streaming tests succeed because we rely on wall-clock time > plus output comparision in order to check whether a test has passed, and > hence may miss cases where errors occurred if they didn't affect these > results. We should strengthen the tests to check that no job failures > occurred while processing batches. > See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for > additional context. > The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might > also fix this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
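One possible shape for such a check is sketched below: register a SparkListener for the duration of a streaming test and assert afterwards that no Spark job failed. This is only an illustration, not the StreamingTestWaiter approach referenced in the issue.
{code}
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}

// Count failed Spark jobs while a streaming test runs, so the test can assert
// that none occurred in addition to comparing batch outputs.
class JobFailureCounter extends SparkListener {
  @volatile var failedJobs = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(_) => failedJobs += 1
    case _            => // job succeeded; nothing to record
  }
}

// In a test: ssc.sparkContext.addSparkListener(new JobFailureCounter), run the
// batches, then assert(counter.failedJobs == 0).
{code}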
[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time
[ https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333691#comment-14333691 ] Ilya Ganelin commented on SPARK-5845: - Hi Kay - I can knock this one out. Thanks. > Time to cleanup intermediate shuffle files not included in shuffle write time > - > > Key: SPARK-5845 > URL: https://issues.apache.org/jira/browse/SPARK-5845 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 1.3.0, 1.2.1 >Reporter: Kay Ousterhout >Priority: Minor > > When the disk is contended, I've observed cases when it takes as long as 7 > seconds to clean up all of the intermediate spill files for a shuffle (when > using the sort based shuffle, but bypassing merging because there are <=200 > shuffle partitions). This is even when the shuffle data is non-huge (152MB > written from one of the tasks where I observed this). This is effectively > part of the shuffle write time (because it's a necessary side effect of > writing data to disk) so should be added to the shuffle write time to > facilitate debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs
[ https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333685#comment-14333685 ] Ilya Ganelin commented on SPARK-5750: - Hi Josh - I can knock this out. Thanks. > Document that ordering of elements in shuffled partitions is not > deterministic across runs > -- > > Key: SPARK-5750 > URL: https://issues.apache.org/jira/browse/SPARK-5750 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen > > The ordering of elements in shuffled partitions is not deterministic across > runs. For instance, consider the following example: > {code} > val largeFiles = sc.textFile(...) > val airlines = largeFiles.repartition(2000).cache() > println(airlines.first) > {code} > If this code is run twice, then each run will output a different result. > There is non-determinism in the shuffle read code that accounts for this: > Spark's shuffle read path processes blocks as soon as they are fetched Spark > uses > [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala] > to fetch shuffle data from mappers. In this code, requests for multiple > blocks from the same host are batched together, so nondeterminism in where > tasks are run means that the set of requests can vary across runs. In > addition, there's an [explicit > call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256] > to randomize the order of the batched fetch requests. As a result, shuffle > operations cannot be guaranteed to produce the same ordering of the elements > in their partitions. > Therefore, Spark should update its docs to clarify that the ordering of > elements in shuffle RDDs' partitions is non-deterministic. Note, however, > that the _set_ of elements in each partition will be deterministic: if we > used {{mapPartitions}} to sort each partition, then the {{first()}} call > above would produce a deterministic result. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 2:40 AM: -- Edit: Upon further consideration I think what is really at play here is simply a need to explain closures in local vs. cluster modes. I'd like to add a section on this to the Spark programming guide and then this could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. was (Author: ilganeli): Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. Edit: Upon further consideration I've realized that the above doesn't quite address the spirit of the issue. I think what is really at play here is simply a need to explain closures in local vs. cluster modes. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
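The kind of example such a closures section might use is sketched below (illustrative only, not taken from the programming guide):
{code}
// In local mode this may appear to work, but on a cluster each executor
// mutates its own deserialized copy of the closure, so the driver's counter
// does not end up holding the sum.
var counter = 0
val data = sc.parallelize(1 to 100)
data.foreach(x => counter += x)
println("counter = " + counter)   // not 5050 when run on a cluster

// Accumulators are the supported way to aggregate from inside foreach.
val acc = sc.accumulator(0)
data.foreach(x => acc += x)
println("acc = " + acc.value)     // 5050
{code}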
[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 2:39 AM: -- Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. Edit: Upon further consideration I've realized that the above doesn't quite address the spirit of the issue. I think what is really at play here is simply a need to explain closures in local vs. cluster modes. was (Author: ilganeli): Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM: -- Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. was (Author: ilganeli): Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM: -- Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). Specifically, I'd like to discuss where the actual data is that the executors are operating on. This also becomes useful during performance tuning - for example using mapPartitions to avoid shuffle operations, since it ties in with data aggregation for executors. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. was (Author: ilganeli): Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). This is something that's a little un-intuitive and understanding it is vital to understanding Spark. This also becomes useful during performance tuning (for example using mapPartitions to avoid shuffle operations). This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:43 AM: -- Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in {{local}} mode) versus the division of labor between the driver and the executors (in {{cluster}} mode). This is something that's a little un-intuitive and understanding it is vital to understanding Spark. This also becomes useful during performance tuning (for example using mapPartitions to avoid shuffle operations). This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. was (Author: ilganeli): Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in ```local``` mode) versus the division of labor between the driver and the executors (in ```cluster``` mode). This is something that's a little un-intuitive and understanding it is vital to understanding Spark. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317334#comment-14317334 ] Ilya Ganelin commented on SPARK-4423: - Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section to the Spark Programming Guide that discusses this issue specifically - local execution on the driver (in ```local``` mode) versus the division of labor between the driver and the executors (in ```cluster``` mode). This is something that's a little un-intuitive and understanding it is vital to understanding Spark. This section could be referenced within the shorter description for foreach, map, mapPartitions, mapPartitionsWIthIndex, and flatMap or some other set of operators we care about. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen >Assignee: Ilya Ganelin > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4655) Split Stage into ShuffleMapStage and ResultStage subclasses
[ https://issues.apache.org/jira/browse/SPARK-4655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312458#comment-14312458 ] Ilya Ganelin commented on SPARK-4655: - Hi [~joshrosen], I'd be happy to work on this. Thanks! > Split Stage into ShuffleMapStage and ResultStage subclasses > --- > > Key: SPARK-4655 > URL: https://issues.apache.org/jira/browse/SPARK-4655 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Josh Rosen >Assignee: Josh Rosen > > The scheduler's {{Stage}} class has many fields which are only applicable to > result stages or shuffle map stages. As a result, I think that it makes > sense to make {{Stage}} into an abstract base class with two subclasses, > {{ResultStage}} and {{ShuffleMapStage}}. This would improve the > understandability of the DAGScheduler code. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
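A schematic of the proposed split, for orientation only; the constructor fields are illustrative and this is not the actual DAGScheduler code.
{code}
import org.apache.spark.{ShuffleDependency, TaskContext}
import org.apache.spark.rdd.RDD

// Shared state stays in the abstract base class; shuffle- and result-specific
// state moves into the two subclasses.
abstract class Stage(val id: Int, val rdd: RDD[_], val numTasks: Int, val parents: List[Stage])

class ShuffleMapStage(
    id: Int, rdd: RDD[_], numTasks: Int, parents: List[Stage],
    val shuffleDep: ShuffleDependency[_, _, _]) extends Stage(id, rdd, numTasks, parents)

class ResultStage(
    id: Int, rdd: RDD[_], numTasks: Int, parents: List[Stage],
    val func: (TaskContext, Iterator[_]) => _) extends Stage(id, rdd, numTasks, parents)
{code}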
[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
[ https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312424#comment-14312424 ] Ilya Ganelin commented on SPARK-4423: - I'll be happy to update this. Thank you. > Improve foreach() documentation to avoid confusion between local- and > cluster-mode behavior > --- > > Key: SPARK-4423 > URL: https://issues.apache.org/jira/browse/SPARK-4423 > Project: Spark > Issue Type: Improvement > Components: Documentation >Reporter: Josh Rosen > > {{foreach}} seems to be a common source of confusion for new users: in > {{local}} mode, {{foreach}} can be used to update local variables on the > driver, but programs that do this will not work properly when executed on > clusters, since the {{foreach}} will update per-executor variables (note that > this _will_ work correctly for accumulators, but not for other types of > mutable objects). > Similarly, I've seen users become confused when {{.foreach(println)}} doesn't > print to the driver's standard output. > At a minimum, we should improve the documentation to warn users against > unsafe uses of {{foreach}} that won't work properly when transitioning from > local mode to a real cluster. > We might also consider changes to local mode so that its behavior more > closely matches the cluster modes; this will require some discussion, though, > since any change of behavior here would technically be a user-visible > backwards-incompatible change (I don't think that we made any explicit > guarantees about the current local-mode behavior, but someone might be > relying on the current implicit behavior). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work
[ https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312416#comment-14312416 ] Ilya Ganelin edited comment on SPARK-5570 at 2/9/15 4:27 PM: - I would be happy to fix this. Thank you. was (Author: ilganeli): I'll fix this, can you please assign it to me? Thanks. > No docs stating that `new SparkConf().set("spark.driver.memory", ...) will > not work > --- > > Key: SPARK-5570 > URL: https://issues.apache.org/jira/browse/SPARK-5570 > Project: Spark > Issue Type: Bug > Components: Documentation, Spark Core >Affects Versions: 1.2.0 >Reporter: Tathagata Das >Assignee: Andrew Or > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...) will not work
[ https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312416#comment-14312416 ] Ilya Ganelin commented on SPARK-5570: - I'll fix this, can you please assign it to me? Thanks. > No docs stating that `new SparkConf().set("spark.driver.memory", ...) will > not work > --- > > Key: SPARK-5570 > URL: https://issues.apache.org/jira/browse/SPARK-5570 > Project: Spark > Issue Type: Bug > Components: Documentation, Spark Core >Affects Versions: 1.2.0 >Reporter: Tathagata Das >Assignee: Andrew Or > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
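For context, the pitfall the documentation needs to call out is roughly the following (illustrative snippet):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// In client mode the driver JVM has already started by the time this code
// runs, so the setting below cannot change the driver's heap size.
val conf = new SparkConf()
  .setAppName("driver-memory-pitfall")
  .set("spark.driver.memory", "4g")   // too late to take effect
val sc = new SparkContext(conf)

// The value has to be supplied before the driver JVM launches, e.g.
//   spark-submit --driver-memory 4g ...
// or via spark.driver.memory in conf/spark-defaults.conf.
{code}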
[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests
[ https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312415#comment-14312415 ] Ilya Ganelin commented on SPARK-5079: - I can work on this - can you please assign it to me? Thank you. > Detect failed jobs / batches in Spark Streaming unit tests > -- > > Key: SPARK-5079 > URL: https://issues.apache.org/jira/browse/SPARK-5079 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Josh Rosen > > Currently, it is possible to write Spark Streaming unit tests where Spark > jobs fail but the streaming tests succeed because we rely on wall-clock time > plus output comparision in order to check whether a test has passed, and > hence may miss cases where errors occurred if they didn't affect these > results. We should strengthen the tests to check that no job failures > occurred while processing batches. > See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for > additional context. > The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might > also fix this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-823) spark.default.parallelism's default is inconsistent across scheduler backends
[ https://issues.apache.org/jira/browse/SPARK-823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312382#comment-14312382 ] Ilya Ganelin commented on SPARK-823: Hi [~joshrosen] I believe the documentation is up to date and I reviewed all usages of spark.default.parallelism and found no inconsistencies with the documentation. The only thing that is un-documented with regards to the usage of spark.default.parallelism is how it's used within the Partitioner class in both Spark and Python. If defined, the default number of partitions created is equal to spark.default.parallelism - otherwise, it's the local number of partitions. I think this issue can be closed - I don't think that particular case needs to be publicly documented (it's clearly evident in the code what is going on). > spark.default.parallelism's default is inconsistent across scheduler backends > - > > Key: SPARK-823 > URL: https://issues.apache.org/jira/browse/SPARK-823 > Project: Spark > Issue Type: Bug > Components: Documentation, PySpark, Scheduler >Affects Versions: 0.8.0, 0.7.3, 0.9.1 >Reporter: Josh Rosen >Priority: Minor > > The [0.7.3 configuration > guide|http://spark-project.org/docs/latest/configuration.html] says that > {{spark.default.parallelism}}'s default is 8, but the default is actually > max(totalCoreCount, 2) for the standalone scheduler backend, 8 for the Mesos > scheduler, and {{threads}} for the local scheduler: > https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L157 > https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala#L317 > https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala#L150 > Should this be clarified in the documentation? Should the Mesos scheduler > backend's default be revised? -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
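For the record, the Partitioner behavior referred to above can be illustrated with a small sketch (master, app name, and sizes are illustrative):
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// With spark.default.parallelism set, operations such as groupByKey that are
// called without an explicit partitioner use that value; otherwise they fall
// back to the partitioning of the upstream RDD.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("default-parallelism-demo")
  .set("spark.default.parallelism", "16")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 1000).map(x => (x % 10, x))
println(pairs.groupByKey().partitions.length)   // 16, because the property is set
{code}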
[jira] [Comment Edited] (SPARK-2584) Do not mutate block storage level on the UI
[ https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273877#comment-14273877 ] Ilya Ganelin edited comment on SPARK-2584 at 1/12/15 7:08 PM: -- Understood, I am able to recreate this issue in 1.1. I'll work on a fix to clarify what's going on. Thanks. was (Author: ilganeli): Understood, I was looking at the UI for Spark 1.1 and did not see the block storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this problem in the newer version of Spark but wanted to know if you've seen it since 1.0.1. > Do not mutate block storage level on the UI > --- > > Key: SPARK-2584 > URL: https://issues.apache.org/jira/browse/SPARK-2584 > Project: Spark > Issue Type: Bug > Components: Spark Core, Web UI >Affects Versions: 1.0.1 >Reporter: Andrew Or > > If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes > DISK_ONLY on the UI. We should preserve the original storage level proposed > by the user, in addition to the change in actual storage level. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI
[ https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273877#comment-14273877 ] Ilya Ganelin commented on SPARK-2584: - Understood, I was looking at the UI for Spark 1.1 and did not see the block storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this problem in the newer version of Spark but wanted to know if you've seen it since 1.0.1. > Do not mutate block storage level on the UI > --- > > Key: SPARK-2584 > URL: https://issues.apache.org/jira/browse/SPARK-2584 > Project: Spark > Issue Type: Bug > Components: Spark Core, Web UI >Affects Versions: 1.0.1 >Reporter: Andrew Or > > If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes > DISK_ONLY on the UI. We should preserve the original storage level proposed > by the user, in addition to the change in actual storage level. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI
[ https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273762#comment-14273762 ] Ilya Ganelin commented on SPARK-2584: - Hi Andrew, question about this. When you say "we drop it from memory" what mechanism are you talking about? It's illegal to change the persistence level of an already persisted RDD and if you call unpersist() it's dropped from both memory and disk storage. How would an RDD be "dropped" from memory? > Do not mutate block storage level on the UI > --- > > Key: SPARK-2584 > URL: https://issues.apache.org/jira/browse/SPARK-2584 > Project: Spark > Issue Type: Bug > Components: Spark Core, Web UI >Affects Versions: 1.0.1 >Reporter: Andrew Or > > If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes > DISK_ONLY on the UI. We should preserve the original storage level proposed > by the user, in addition to the change in actual storage level. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
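For readers following the thread, the "drop it from memory" in the description appears to refer to block eviction under memory pressure rather than an explicit unpersist(); a minimal way to provoke it is sketched below (input path and sizes are illustrative):
{code}
import org.apache.spark.storage.StorageLevel

// A block persisted MEMORY_AND_DISK can be dropped from memory by the block
// manager when memory fills up and kept only on disk; that per-block state is
// what the UI then reports.
val big = sc.textFile("/path/to/large/input").persist(StorageLevel.MEMORY_AND_DISK)
big.count()   // materialize; blocks that no longer fit in memory spill to disk
{code}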