[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2021-11-23 Thread Sergei Lebedev (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447859#comment-17447859
 ] 

Sergei Lebedev commented on SPARK-22674:


Hi [~hyukjin.kwon], very happy to see you working on this again! I've noticed 
that you've marked this issue as a duplicate of SPARK-32079, which is newer. 
Shouldn't it be the other way round, i.e. SPARK-32079 marked as a duplicate of 
this issue?

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.3.0, 3.1.1
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, 
> any subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
> def sum(self):
> return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code that is 
> not related to Spark or distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit the serialization hack to 
> direct namedtuple subclasses, as in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204
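
A minimal sketch of the gating idea behind that workaround (illustrative, not 
PySpark's actual code): classes created directly by namedtuple subclass tuple 
alone and carry the generated {{_fields}} attribute, so the hijack could check 
for that and leave subclasses untouched.

{code}
from collections import namedtuple

def _is_plain_namedtuple(cls):
    # Classes produced directly by namedtuple() inherit from tuple alone and
    # expose the generated _fields attribute; a user-defined subclass such as
    # PointSubclass has different __bases__ and would be left untouched.
    return (isinstance(cls, type)
            and cls.__bases__ == (tuple,)
            and hasattr(cls, "_fields"))

Point = namedtuple("Point", "x y")

class PointSubclass(Point):
    def sum(self):
        return self.x + self.y

assert _is_plain_namedtuple(Point)
assert not _is_plain_namedtuple(PointSubclass)
{code}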






[jira] [Commented] (SPARK-22338) namedtuple serialization is inefficient

2018-04-29 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458128#comment-16458128
 ] 

Sergei Lebedev commented on SPARK-22338:


[This|https://github.com/apache/spark/pull/21180] PR fixes the issue for 
namedtuples defined in modules (as opposed to the ones defined inside functions 
or in the REPL).

> namedtuple serialization is inefficient
> ---
>
> Key: SPARK-22338
> URL: https://issues.apache.org/jira/browse/SPARK-22338
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Joel Croteau
>Priority: Minor
>
> I greatly appreciate the level of hack that PySpark contains in order to make 
> namedtuples serializable, but I feel like it could be done a little better. 
> In particular, say I create a namedtuple class with a few long argument names 
> like this:
> {code:JiraShouldReallySupportPython}
> MyTuple = namedtuple('MyTuple', ('longarga', 'longargb', 'longargc'))
> {code}
> If I create an instance of this, here is how PySpark serializes it:
> {code:JiraShouldReallySupportPython}
> mytuple = MyTuple(1, 2, 3)
> pickle.dumps(mytuple, pickle.HIGHEST_PROTOCOL)
> b'\x80\x04\x95]\x00\x00\x00\x00\x00\x00\x00\x8c\x13pyspark.serializers\x94\x8c\x08_restore\x94\x93\x94\x8c\x07MyTuple\x94\x8c\x08longarga\x94\x8c\x08longargb\x94\x8c\x08longargc\x94\x87\x94K\x01K\x02K\x03\x87\x94\x87\x94R\x94.'
> {code}
> This serialization includes the name of the namedtuple class, the names of 
> each of its members, as well as references to internal functions in 
> pyspark.serializers. By comparison, this is what I get if I serialize the 
> bare tuple:
> {code:JiraShouldReallySupportPython}
> shorttuple = (1,2,3)
> pickle.dumps(shorttuple, pickle.HIGHEST_PROTOCOL)
> b'\x80\x04\x95\t\x00\x00\x00\x00\x00\x00\x00K\x01K\x02K\x03\x87\x94.'
> {code}
> Much shorter. For another comparison, here is what it looks like if I build a 
> dict with the same data and element names:
> {code:JiraShouldReallySupportPython}
> mydict = {'longarga':1, 'longargb':2, 'longargc':3}
> pickle.dumps(mydict, pickle.HIGHEST_PROTOCOL)
> b'\x80\x04\x95,\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x08longarga\x94K\x01\x8c\x08longargb\x94K\x02\x8c\x08longargc\x94K\x03u.'
> {code}
> In other words, even using a dict is substantially shorter than using a 
> namedtuple in its current form. There shouldn't be any need for namedtuples 
> to have this much overhead in their serialization. For one thing, if the 
> class object is being broadcast to the nodes, there should be no need for 
> each namedtuple instance to include all of the field names; the class name 
> should be enough. If you use namedtuples heavily, this can create a lot of 
> overhead in memory and disk use. I am going to try to improve the 
> serialization and submit a patch if I can find the time, but I don't know the 
> PySpark code very well, so if anyone has suggestions for where to start, I 
> would love to hear them.
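
For comparison, a hedged sketch of the un-hijacked baseline (run in a fresh 
interpreter without importing pyspark, so the monkey patch is not installed): 
plain pickle stores a module-qualified class reference plus the values, without 
embedding the field names in every instance.

{code}
import pickle
from collections import namedtuple

MyTuple = namedtuple('MyTuple', ('longarga', 'longargb', 'longargc'))

# Without the hijack, the pickle references __main__.MyTuple by name and
# appends the three values; the field names are not repeated per instance.
plain = pickle.dumps(MyTuple(1, 2, 3), pickle.HIGHEST_PROTOCOL)
print(len(plain))  # substantially shorter than the _restore-based form above
{code}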






[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2018-03-14 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399488#comment-16399488
 ] 

Sergei Lebedev commented on SPARK-22674:


[~jonasamrich] I see your point, but the current implementation has known 
limitations, which make it "breaking" as is.

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, 
> any subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
> def sum(self):
> return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code that is 
> not related to Spark or distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit the serialization hack to 
> direct namedtuple subclasses, as in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204






[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2018-03-13 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396948#comment-16396948
 ] 

Sergei Lebedev commented on SPARK-22674:


Just out of curiosity: what is the original motivation for hacking the 
namedtuple serialization?

I've been exploring the quirks caused by {{_hijack_namedtuple}}, and I feel 
PySpark might be better off without it. Some of the reasons are:
 * it silently incurs a performance cost for namedtuples defined outside of 
__main__ (both user-defined and third-party);
 * it leads to confusing and hard-to-debug error messages in the presence of 
inheritance;
 * it makes namedtuple classes behave differently from normal classes, because 
the latter cannot be serialized if defined in the REPL (see the sketch below).

What do you think?
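
A quick illustration of the last point, as a sketch assuming a REPL session in 
which pyspark has been imported (so the hijack is active):

{code}
import pickle
from collections import namedtuple

Point = namedtuple("Point", "x y")  # hijacked: instances survive the round trip

class Plain:  # an ordinary class defined in the REPL
    pass

pickle.dumps(Point(1, 2))  # fine: the hijack makes the pickle self-contained
pickle.dumps(Plain())      # pickles locally, but unpickling on an executor
                           # fails because __main__ there has no Plain class
{code}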

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, 
> any subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
> def sum(self):
> return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code that is 
> not related to Spark or distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit the serialization hack to 
> direct namedtuple subclasses, as in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-18 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294745#comment-16294745
 ] 

Sergei Lebedev commented on SPARK-22805:


It does, in the sense that a 1.6 history server can no longer read 2.x event logs.

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.
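
A minimal sketch of the suggested encoding (illustrative Python rather than 
Spark's {{JsonProtocol}}; the alias table below is an assumption, abbreviated 
to two of the predefined levels):

{code}
import json

# (useDisk, useMemory, deserialized, replication) -> alias for two of the
# predefined levels; unknown combinations keep the verbose form.
ALIASES = {
    (True, False, False, 1): "DISK_ONLY",
    (False, True, True, 1): "MEMORY_ONLY",
}

def encode_storage_level(use_disk, use_memory, deserialized, replication):
    alias = ALIASES.get((use_disk, use_memory, deserialized, replication))
    if alias is not None:
        return json.dumps(alias)  # e.g. '"DISK_ONLY"' -- 11 bytes
    return json.dumps({"Use Disk": use_disk, "Use Memory": use_memory,
                       "Deserialized": deserialized,
                       "Replication": replication})  # the ~79-byte form

print(encode_storage_level(True, False, False, 1))
{code}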






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-17 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294230#comment-16294230
 ] 

Sergei Lebedev commented on SPARK-22805:


It doesn't save much storage if block-update tracking is turned off (the 
default starting from 2.3). If it is on, it saves ~50%. What do you think? 
Should we kill this ticket?

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Commented] (SPARK-20970) Deprecate TaskMetrics._updatedBlockStatuses

2017-12-16 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293965#comment-16293965
 ] 

Sergei Lebedev commented on SPARK-20970:


Not only does it use a lot of memory on the driver, but it also bloats the 
event logs; see SPARK-22805.

> Deprecate TaskMetrics._updatedBlockStatuses
> ---
>
> Key: SPARK-20970
> URL: https://issues.apache.org/jira/browse/SPARK-20970
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Thomas Graves
>
> TaskMetrics._updatedBlockStatuses isn't used anywhere internally by Spark. It 
> could be used by users, though, since it's exposed by SparkListenerTaskEnd. In 
> https://issues.apache.org/jira/browse/SPARK-20923 we made it configurable to 
> turn off its tracking, since it uses a lot of memory. That config is still 
> true for backwards compatibility. We should turn it to false in the next 
> release and deprecate that API altogether.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-16 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 11:17 PM:
---

I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|Decompressed|157M|
|Decompressed with patch|155M|
-
*Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks 
twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. 
[~andrewor14], [~srowen] do you know if there is a reason for that? It looks 
like a bug to me.

*Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log.

*Update*: the effect of SPARK-20923 is much more noticeable than I thought 
initially. Removing {{"internal.metrics.updatedBlockStatuses"}} from 
{{"Accumulables"}} and {{"Updated Blocks"}} from {{"Task Metrics"}} reduced the 
log size to 160M. The storage level compression now only shaves off a few MB 
(see the updated table).
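
For reference, a sketch of the emulation described above (assuming a JSON-lines 
event log and the key names mentioned in this comment; not the exact script 
used):

{code}
import json

def strip_updated_block_statuses(in_path, out_path):
    with open(in_path) as src, open(out_path, "w") as dst:
        for line in src:
            event = json.loads(line)
            # Drop the per-task block list carried in "Task Metrics" ...
            metrics = event.get("Task Metrics")
            if metrics:
                metrics.pop("Updated Blocks", None)
            # ... and the duplicate carried by the accumulator updates.
            info = event.get("Task Info")
            if info:
                info["Accumulables"] = [
                    a for a in info.get("Accumulables", [])
                    if a.get("Name") != "internal.metrics.updatedBlockStatuses"
                ]
            dst.write(json.dumps(event) + "\n")
{code}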


was (Author: lebedev):
I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|

*Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks 
twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. 
[~andrewor14], [~srowen] do you know if there is a reason for that? It looks 
like a bug to me.

*Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log.

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-16 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 10:54 PM:
---

I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|

*Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks 
twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. 
[~andrewor14], [~srowen] do you know if there is a reason for that? It looks 
like a bug to me.

*Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log.


was (Author: lebedev):
I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|

*Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks 
twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. 
[~andrewor14], [~srowen] do you know if there is a reason for that? It looks 
like a bug to me.


> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-16 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 8:37 PM:
--

I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|

*Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks 
twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. 
[~andrewor14], [~srowen] do you know if there is a reason for that? It looks 
like a bug to me.



was (Author: lebedev):
I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|


> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-16 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836
 ] 

Sergei Lebedev commented on SPARK-22805:


I've emulated the effect of SPARK-20923 by removing all 
{{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event 
log. The table below compares uncompressed/compressed sizes of this log with 
and without the patch proposed in this issue:

||Mode||Size||
|LZ4-compressed|2.3G|
|Decompressed|25G|
|LZ4-compressed with patch|2.3G|
|Decompressed with patch|16G|


> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293369#comment-16293369
 ] 

Sergei Lebedev commented on SPARK-22805:


After some investigation, it turns out that the majority of the space is taken 
by {{SparkListenerTaskEnd}}, which reports block statuses. I suspect the space 
reduction after SPARK-20923 would be less impressive.
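
A sketch of the kind of measurement behind this observation (assuming a 
JSON-lines event log whose {{"Event"}} field names each entry's type):

{code}
import json
from collections import Counter

def bytes_per_event_type(path):
    # Sum the raw line length per event type to see which events dominate.
    sizes = Counter()
    with open(path) as log:
        for line in log:
            sizes[json.loads(line).get("Event", "unknown")] += len(line)
    return sizes.most_common()
{code}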

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 9:44 PM:
--

I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

Also, note that the format is flexible "in theory"; in practice, it always 
contains one of the predefined levels.

*Update*: turns out there's {{StorageLevel.apply}}, so "always" above should be 
read as "almost always".


was (Author: lebedev):
I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

Also, note that the format is flexible "in theory"; in practice, it always 
contains one of the predefined levels.

**Update**: turns out there's {{StorageLevel.apply}}, so "always" above should 
be read as "almost always".

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 9:43 PM:
--

I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

Also, note that the format is flexible "in theory"; in practice, it always 
contains one of the predefined levels.

**Update**: turns out there's {{StorageLevel.apply}}, so "always" above should 
be read as "almost always".


was (Author: lebedev):
I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

Also, note that the format is flexible "in theory"; in practice, it always 
contains one of the predefined levels.

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292848#comment-16292848
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 5:25 PM:
--

Here are the results for a single application with 6K partitions. Admittedly, 
this does not generalize to every application, but it gives an idea of the 
redundancy due to {{StorageLevel}}:

|| Mode || Size||
|LZ4-compressed|8.1G|
|Decompressed|79G|
|LZ4-compressed with patch|7.2G|
|Decompressed with patch|49G|


was (Author: lebedev):
Here are the results for a single application with 6K partitions:

|| Mode || Size||
|LZ4-compressed|8.1G|
|Decompressed|79G|
|LZ4-compressed with patch|7.2G|
|Decompressed with patch|49G|

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292848#comment-16292848
 ] 

Sergei Lebedev commented on SPARK-22805:


Here are the results for a single application with 6K partitions:

|| Mode || Size||
|LZ4-compressed|8.1G|
|Decompressed|79G|
|LZ4-compressed with patch|7.2G|
|Decompressed with patch|49G|

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758
 ] 

Sergei Lebedev commented on SPARK-22805:


I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758
 ] 

Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 4:28 PM:
--

I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

Also, note that the format is flexible "in theory"; in practice, it always 
contains one of the predefined levels.


was (Author: lebedev):
I have a patch which preserves backward compatibility. Will post some numbers a 
bit later.

> Use aliases for StorageLevel in event logs
> --
>
> Key: SPARK-22805
> URL: https://issues.apache.org/jira/browse/SPARK-22805
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.1
>Reporter: Sergei Lebedev
>Priority: Minor
>
> Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
> predefined levels is not extensible (by the users).
> Fact 2: The format of event logs uses a redundant representation for storage 
> levels:
> {code}
> >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
> >>> "Replication": 1}')
> 79
> >>> len('DISK_ONLY')
> 9
> {code}
> Fact 3: This leads to excessive log sizes for workloads with lots of 
> partitions, because every partition carries a storage level field that is 
> 60-70 bytes larger than it needs to be.
> Suggested quick win: use the names of the predefined levels to identify them 
> in the event log.






[jira] [Created] (SPARK-22805) Use aliases for StorageLevel in event logs

2017-12-15 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-22805:
--

 Summary: Use aliases for StorageLevel in event logs
 Key: SPARK-22805
 URL: https://issues.apache.org/jira/browse/SPARK-22805
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.1, 2.1.2
Reporter: Sergei Lebedev
Priority: Minor


Fact 1: {{StorageLevel}} has a private constructor, therefore the list of 
predefined levels is not extensible (by the users).

Fact 2: The format of event logs uses a redundant representation for storage 
levels:

{code}
>>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, 
>>> "Replication": 1}')
79
>>> len('DISK_ONLY')
9
{code}

Fact 3: This leads to excessive log sizes for workloads with lots of 
partitions, because every partition carries a storage level field that is 
60-70 bytes larger than it needs to be.

Suggested quick win: use the names of the predefined levels to identify them in 
the event log.






[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-17 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257809#comment-16257809
 ] 

Sergei Lebedev commented on SPARK-19371:


> Usually the answer is to force a shuffle [...]

[~srowen] we are seeing exactly the same imbalance as a result of a shuffle. 
From the Executors page, it looks like a couple of executors get many more 
"reduce" tasks than the others. Does this sound like a possible scenario?



> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.1
>Reporter: Thunder Stumpges
> Attachments: RDD Block Distribution on two executors.png, Unbalanced 
> RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and 
> resulting task imbalance.png, execution timeline.png
>
>
> Before running an intensive iterative job (in this case, distributed topic 
> model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly 
> across executors (based on the initial scheduling of the reads, which is not 
> data-locality sensitive). The partition sizes are even, just not their 
> distribution over executors. We currently have no way to force the partitions 
> to spread evenly, and as the iterative algorithm begins, tasks are 
> distributed to executors based on this initial load, forcing some very 
> unbalanced work.
> This has been mentioned a 
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
>  of 
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
>  in 
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
>  user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here 
> are examples of things I have tried. All resulted in partitions in memory 
> that were NOT evenly distributed to executors, causing future tasks to be 
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions).
> persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to 
> desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
> persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request 
> that those partitions be stored evenly across executors in preparation for 
> future tasks. 
> I'm not sure if this is a more general issue (i.e. not just involving 
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE 
> difference in the overall running time of the remaining work.






[jira] [Commented] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197275#comment-16197275
 ] 

Sergei Lebedev commented on SPARK-22227:


Sidenote: the trace above is caused by the temporary file created by 
[SortShuffleWriter|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala#L69].
 Sometimes we also saw failures containing {{TempShuffleBlockId}} names.
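
A hedged sketch of a tolerant variant, written in Python for brevity (the real 
code is Scala, and the patterns below cover only two block ID families): skip 
directory entries that do not parse as block IDs instead of raising.

{code}
import re

# Illustrative patterns for RDD blocks and shuffle data/index files only;
# BlockId.apply recognises more families than this.
_BLOCK_ID = re.compile(r"rdd_\d+_\d+|shuffle_\d+_\d+_\d+\.(?:data|index)")

def get_all_blocks(filenames):
    # Temporary files such as 'shuffle_1_2466_0.data.5684dd9e-...' carry a
    # UUID suffix, fail the full match, and are silently skipped.
    return [name for name in filenames if _BLOCK_ID.fullmatch(name)]
{code}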

> DiskBlockManager.getAllBlocks could fail if called during shuffle
> -
>
> Key: SPARK-22227
> URL: https://issues.apache.org/jira/browse/SPARK-22227
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.2.0
>Reporter: Sergei Lebedev
>Priority: Minor
>
> {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
> block manager contain only files corresponding to "valid" block IDs, i.e. 
> those parsable via {{BlockId.apply}}. This is not always the case, as 
> demonstrated by the following snippet:
> {code}
> object GetAllBlocksFailure {
>   def main(args: Array[String]): Unit = {
> val sc = new SparkContext(new SparkConf()
> .setMaster("local[*]")
> .setAppName("demo"))
> new Thread {
>   override def run(): Unit = {
> while (true) {
>   
> println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
>   Thread.sleep(10)
> }
>   }
> }.start()
> val rdd = sc.range(1, 65536, numSlices = 10)
> .map(x => (x % 4096, x))
> .persist(StorageLevel.DISK_ONLY)
> .reduceByKey { _ + _ }
> .collect()
>   }
> }
> {code}
> We have a thread computing the number of bytes occupied by the block manager 
> on disk, and it frequently crashes because this assumption is violated. The 
> relevant part of the stack trace:
> {code}
> 2017-10-06 11:20:14,287 ERROR  
> org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
> java.lang.IllegalStateException: Unrecognized BlockId: 
> shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
> at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
> {code}






[jira] [Created] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle

2017-10-09 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-22227:
--

 Summary: DiskBlockManager.getAllBlocks could fail if called during 
shuffle
 Key: SPARK-22227
 URL: https://issues.apache.org/jira/browse/SPARK-22227
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.2.0
Reporter: Sergei Lebedev
Priority: Minor


{{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the 
block manager contain only files corresponding to "valid" block IDs, i.e. 
those parsable via {{BlockId.apply}}. This is not always the case, as 
demonstrated by the following snippet:

{code}
object GetAllBlocksFailure {
  def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf()
.setMaster("local[*]")
.setAppName("demo"))

new Thread {
  override def run(): Unit = {
while (true) {
  
println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
  Thread.sleep(10)
}
  }
}.start()

val rdd = sc.range(1, 65536, numSlices = 10)
.map(x => (x % 4096, x))
.persist(StorageLevel.DISK_ONLY)
.reduceByKey { _ + _ }
.collect()
  }
}
{code}

We have a thread computing the number of bytes occupied by the block manager 
on disk, and it frequently crashes because this assumption is violated. The 
relevant part of the stack trace:

{code}
2017-10-06 11:20:14,287 ERROR  
org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in 
thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.IllegalStateException: Unrecognized BlockId: 
shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
at 
org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
at 
org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
{code}






[jira] [Created] (SPARK-22147) BlockId.hashCode allocates a StringBuilder/String on each call

2017-09-27 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-22147:
--

 Summary: BlockId.hashCode allocates a StringBuilder/String on each 
call
 Key: SPARK-22147
 URL: https://issues.apache.org/jira/browse/SPARK-22147
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.2.0
Reporter: Sergei Lebedev
Priority: Minor


The base class {{BlockId}} 
[defines|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L44]
 {{hashCode}} and {{equals}} for all its subclasses in terms of {{name}}. This 
makes the definitions of different ID types [very 
concise|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L52].
 The downside, however, is redundant allocations. While I don't think this 
is a major issue, it is still a bit disappointing to increase GC 
pressure on the driver for nothing. For our machine learning workloads, we've 
seen as much as 10% of all allocations on the driver coming from 
{{BlockId.hashCode}} calls done for 
[BlockManagerMasterEndpoint.blockLocations|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L54].
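
A Python analogy of one possible fix (a sketch, not Spark's Scala code, in 
which {{name}} is recomputed on every {{hashCode}} call): hash the identifying 
fields once per instance so that repeated hashing allocates nothing.

{code}
class RDDBlockId:
    __slots__ = ("rdd_id", "split_index", "_hash")

    def __init__(self, rdd_id, split_index):
        self.rdd_id = rdd_id
        self.split_index = split_index
        # Hash the fields once at construction instead of rebuilding an
        # "rdd_<id>_<split>" string on every __hash__ call.
        self._hash = hash((rdd_id, split_index))

    @property
    def name(self):
        return f"rdd_{self.rdd_id}_{self.split_index}"

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        return (isinstance(other, RDDBlockId)
                and self.rdd_id == other.rdd_id
                and self.split_index == other.split_index)
{code}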






[jira] [Commented] (SPARK-792) PairRDDFunctions should expect Product2 instead of Tuple2

2017-09-22 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176268#comment-16176268
 ] 

Sergei Lebedev commented on SPARK-792:
--

It looks like the change in the title was never implemented. Does anyone know 
why?

> PairRDDFunctions should expect Product2 instead of Tuple2
> -
>
> Key: SPARK-792
> URL: https://issues.apache.org/jira/browse/SPARK-792
> Project: Spark
>  Issue Type: Improvement
>Reporter: Reynold Xin
>Assignee: Reynold Xin
> Fix For: 0.8.0
>
>
> PairRDDFunctions expects an RDD of Tuple2 (key, value) pairs. If we change it 
> to expect Product2, we can apply optimizations such as reducing the 
> number of temporary objects allocated and achieving a faster runtime.
> Product2 is just a trait in Scala, and users can define classes that extend 
> Product2. Two use cases in particular I have in mind:
> 1. In MappedRDD or MappedPartitionRDD, the iterator could reuse a concrete 
> Product2 object and always return that; but in every next() call, the 
> iterator updates the content of the Product2.
> 2. In some Spark cases (such as Shark, GraphX), the key field is only used to 
> partition the data. A concrete implementation of Product2 can have the key 
> field (_1) marked as transient, and thus reducing the amount of shuffle data 
> sent across the network between map and reduce stages.
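
A Python analogy of use case 1 (a sketch; Spark's types are Scala, and 
MutablePair below is a hypothetical stand-in for a reusable Product2): the 
iterator updates and returns one pair object per step instead of allocating a 
fresh tuple per element, so consumers must copy anything they retain.

{code}
class MutablePair:
    __slots__ = ("_1", "_2")

def mapped_pairs(source, f):
    # Reuse a single pair across iterations; callers that keep a reference
    # past one step must copy it, because its contents are overwritten.
    pair = MutablePair()
    for item in source:
        pair._1, pair._2 = f(item)
        yield pair

for p in mapped_pairs(range(3), lambda x: (x % 2, x)):
    print(p._1, p._2)
{code}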






[jira] [Created] (SPARK-22062) BlockManager does not account for memory consumed by remote fetches

2017-09-19 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-22062:
--

 Summary: BlockManager does not account for memory consumed by 
remote fetches
 Key: SPARK-22062
 URL: https://issues.apache.org/jira/browse/SPARK-22062
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.2.0
Reporter: Sergei Lebedev
Priority: Minor


We use Spark exclusively with {{StorageLevel.DiskOnly}}, as our workloads are 
very sensitive to memory usage. Recently, we've spotted that the jobs sometimes 
OOM, leaving lots of {{byte[]}} arrays on the heap. Upon further investigation, 
we've found that the arrays come from {{BlockManager.getRemoteBytes}}, which 
[calls|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L638]
 {{BlockTransferService.fetchBlockSync}}, which in turn 
[allocates|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala#L99]
 an on-heap {{ByteBuffer}} of the same size as the block (e.g. a full 
partition), if the block was successfully retrieved over the network.

This memory is not accounted towards Spark storage/execution memory and could 
potentially lead to OOM if {{BlockManager}} fetches too many partitions in 
parallel. I wonder if this is intentional behaviour, or in fact a bug?
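
Back-of-the-envelope arithmetic for the failure mode (the block size and fetch 
count below are assumptions, not measurements):

{code}
block_size = 2 * 1024**3  # one partition fetched into a single on-heap buffer
concurrent_fetches = 16   # simultaneous getRemoteBytes calls across task threads
untracked = block_size * concurrent_fetches
print(untracked / 1024**3, "GiB held outside storage/execution accounting")
{code}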






[jira] [Comment Edited] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException

2017-09-06 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707
 ] 

Sergei Lebedev edited comment on SPARK-17901 at 9/6/17 5:09 PM:


[~srowen], I think this issue could've been closed by mistake: the stack trace 
is different from SPARK-17816. Could you reopen?


was (Author: lebedev):
[~srowen] I think this issue could've been closed by mistake: the stack trace 
is different from SPARK-17816. Could you reopen?

> NettyRpcEndpointRef: Error sending message and Caused by: 
> java.util.ConcurrentModificationException
> ---
>
> Key: SPARK-17901
> URL: https://issues.apache.org/jira/browse/SPARK-17901
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1
>Reporter: Harish
>
> I have two data frames, one with 10K rows and 10,000 columns and another with 
> 4M rows and 50 columns. I joined them and am trying to find the mean of the 
> merged data set; I calculated the mean with a lambda using Python's mean() 
> function. I can't write it in PySpark due to the 64KB code limit issue.
> After calculating the mean, I did rdd.take(2) and it works. But creating the 
> DF from the RDD and DF.show has been in progress for more than 2 hours (I 
> stopped the process) with the message below (102 GB, 6 cores per node -- 
> total 10 nodes + 1 master).
> 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = 
> Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, 
> 35729))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>   at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>   at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>   at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>   at 
> java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
>   at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> 
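
For context, a minimal sketch of the RDD-based workaround described in the 
quoted report: computing a per-row mean in a lambda instead of building a 
10,000-column expression, which can trip the 64KB code-generation limit. It is 
written in Scala rather than the reporter's PySpark, and all names and the toy 
data set below are illustrative, not taken from the report:

{code}
import org.apache.spark.sql.SparkSession

object RowMeanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("row-mean-sketch").getOrCreate()
    // Stand-in for the merged data set; the real one has thousands of columns.
    val df = spark.range(0, 100).selectExpr("id as a", "id * 2 as b", "id * 3 as c")

    // Per-row mean computed in an RDD lambda, bypassing expression codegen.
    val means = df.rdd.map { row =>
      val values = (0 until row.length).map(i => row.getLong(i).toDouble)
      values.sum / values.length
    }

    means.take(2).foreach(println)  // the step the reporter says still works
    spark.stop()
  }
}
{code}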

[jira] [Comment Edited] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException

2017-09-06 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707
 ] 

Sergei Lebedev edited comment on SPARK-17901 at 9/6/17 5:09 PM:


[~srowen] I think this issue could've been closed by mistake: the stack trace 
is different from SPARK-17816. Could you reopen?


was (Author: lebedev):
I think this issue could've been closed by mistake: the stack trace is 
different from SPARK-17816. Could you reopen?

> NettyRpcEndpointRef: Error sending message and Caused by: 
> java.util.ConcurrentModificationException
> ---
>
> Key: SPARK-17901
> URL: https://issues.apache.org/jira/browse/SPARK-17901
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1
>Reporter: Harish
>
> I have two data frames: one with 10K rows and 10,000 columns, and another 
> with 4M rows and 50 columns. I joined them and tried to find the mean of the 
> merged data set. I calculated the mean with a lambda using Python's mean() 
> function; I can't write it in PySpark because of the 64KB code-generation 
> limit issue.
> After calculating the mean I did rdd.take(2), and it works. But creating a 
> DF from the RDD and calling DF.show had been in progress for more than 2 
> hours (I stopped the process) with the message below (102 GB, 6 cores per 
> node -- 10 nodes total + 1 master).
> 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = 
> Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, 
> 35729))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>   at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>   at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>   at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>   at 
> java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
>   at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> 

[jira] [Commented] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException

2017-09-06 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707
 ] 

Sergei Lebedev commented on SPARK-17901:


I think this issue could've been closed by mistake: the stack trace is 
different from SPARK-17816. Could you reopen?

> NettyRpcEndpointRef: Error sending message and Caused by: 
> java.util.ConcurrentModificationException
> ---
>
> Key: SPARK-17901
> URL: https://issues.apache.org/jira/browse/SPARK-17901
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1
>Reporter: Harish
>
> I have two data frames: one with 10K rows and 10,000 columns, and another 
> with 4M rows and 50 columns. I joined them and tried to find the mean of the 
> merged data set. I calculated the mean with a lambda using Python's mean() 
> function; I can't write it in PySpark because of the 64KB code-generation 
> limit issue.
> After calculating the mean I did rdd.take(2), and it works. But creating a 
> DF from the RDD and calling DF.show had been in progress for more than 2 
> hours (I stopped the process) with the message below (102 GB, 6 cores per 
> node -- 10 nodes total + 1 master).
> 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = 
> Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, 
> 35729))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>   at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>   at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>   at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>   at 
> java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
>   at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> 

[jira] [Commented] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable

2017-04-11 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964463#comment-15964463
 ] 

Sergei Lebedev commented on SPARK-20284:


Yes, Scala is a bad citizen in JVM land and comes without any support for 
try-with-resources. IIUC, scala-arm would manage just fine without Closeable 
because it uses structural types. However, I think there is no reason not to 
implement Closeable/AutoCloseable even if Spark/Scala code itself does not 
need it.
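
To make the point concrete, a minimal sketch (the class below is hypothetical, 
not Spark's actual SerializationStream) of what extending Closeable buys on 
Scala 2.13+:

{code}
import java.io.{ByteArrayOutputStream, Closeable, ObjectOutputStream}
import scala.util.Using

// Hypothetical stream that, unlike the current traits, extends Closeable.
class DemoSerializationStream(out: ObjectOutputStream) extends Closeable {
  def writeObject[T](t: T): this.type = { out.writeObject(t); this }
  override def close(): Unit = out.close()
}

object ClosingDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    // Using.resource closes the stream even if a write throws.
    Using.resource(new DemoSerializationStream(new ObjectOutputStream(buf))) { s =>
      s.writeObject("hello").writeObject(42)
    }
    println(s"wrote ${buf.size()} bytes")
  }
}
{code}

Java's try-with-resources and Kotlin's {{use}} accept the same object, since 
Closeable extends AutoCloseable.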

> Make SerializationStream and DeserializationStream extend Closeable
> ---
>
> Key: SPARK-20284
> URL: https://issues.apache.org/jira/browse/SPARK-20284
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.1.0
>Reporter: Sergei Lebedev
>Priority: Trivial
>
> Both {{SerializationStream}} and {{DeserializationStream}} implement 
> {{close}} but do not extend {{Closeable}}. As a result, these streams cannot 
> be used in try-with-resources.
> Was this intentional, or did nobody ever need it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable

2017-04-11 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964257#comment-15964257
 ] 

Sergei Lebedev edited comment on SPARK-20284 at 4/11/17 11:57 AM:
--

It makes the stream well-behaved for any JVM language, e.g. pure-Java or 
[Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. 


was (Author: lebedev):
It makes the stream well-behaved for any JVM user, e.g. pure-Java or 
[Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. 

> Make SerializationStream and DeserializationStream extend Closeable
> ---
>
> Key: SPARK-20284
> URL: https://issues.apache.org/jira/browse/SPARK-20284
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.1.0
>Reporter: Sergei Lebedev
>Priority: Trivial
>
> Both {{SerializationStream}} and {{DeserializationStream}} implement 
> {{close}} but do not extend {{Closeable}}. As a result, these streams cannot 
> be used in try-with-resources.
> Was this intentional, or did nobody ever need it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable

2017-04-11 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964257#comment-15964257
 ] 

Sergei Lebedev commented on SPARK-20284:


It makes the stream well-behaved for any JVM user, e.g. pure-Java or 
[Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. 

> Make SerializationStream and DeserializationStream extend Closeable
> ---
>
> Key: SPARK-20284
> URL: https://issues.apache.org/jira/browse/SPARK-20284
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.3, 2.1.0
>Reporter: Sergei Lebedev
>Priority: Trivial
>
> Both {{SerializationStream}} and {{DeserializationStream}} implement 
> {{close}} but do not extend {{Closeable}}. As a result, these streams cannot 
> be used in try-with-resources.
> Was this intentional, or did nobody ever need it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable

2017-04-10 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-20284:
--

 Summary: Make SerializationStream and DeserializationStream extend 
Closeable
 Key: SPARK-20284
 URL: https://issues.apache.org/jira/browse/SPARK-20284
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0, 1.6.3
Reporter: Sergei Lebedev
Priority: Minor


Both {{SerializationStream}} and {{DeserializationStream}} implement {{close}} 
but do not extend {{Closeable}}. As a result, these streams cannot be used in 
try-with-resources.

Was this intentional, or did nobody ever need it?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19353) Support binary I/O in PipedRDD

2017-02-03 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15852498#comment-15852498
 ] 

Sergei Lebedev commented on SPARK-19353:


For reference: we have a fully backward-compatible 
[implementation|https://github.com/criteo-forks/spark/pull/26] of binary 
PipedRDD in our GitHub fork.

> Support binary I/O in PipedRDD
> --
>
> Key: SPARK-19353
> URL: https://issues.apache.org/jira/browse/SPARK-19353
> Project: Spark
>  Issue Type: Improvement
>Reporter: Sergei Lebedev
>Priority: Minor
>
> The current design of RDD.pipe is very restrictive. 
> It is line-based: each element of the input RDD [gets 
> serialized|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L143]
>  into one or more lines. Similarly, for the output of the child process, one 
> line corresponds to a single element of the output RDD. 
> It allows customizing the output format via {{printRDDElement}}, but not the 
> input format.
> It is not designed for extensibility: the only way to get a "BinaryPipedRDD" 
> is to copy/paste most of it and change the relevant parts.
> These limitations have been discussed on 
> [SO|http://stackoverflow.com/questions/27986830/how-to-pipe-binary-data-in-apache-spark]
>  and the mailing list, but alas no issue has been created.
> A possible solution to at least the first two limitations is to factor out 
> the format into a separate object (or objects), for instance {{InputWriter}} 
> and {{OutputReader}}, following the Hadoop streaming API. 
> {code}
> import java.io.{InputStream, OutputStream}
>
> trait InputWriter[T] {
>   def write(os: OutputStream, elem: T): Unit
> }
>
> trait OutputReader[T] {
>   def read(is: InputStream): T
> }
> {code}
> The default configuration would write and read in the line-based format, but 
> users would also be able to selectively swap in the appropriate 
> implementations.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19353) Support binary I/O in PipedRDD

2017-01-24 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-19353:
--

 Summary: Support binary I/O in PipedRDD
 Key: SPARK-19353
 URL: https://issues.apache.org/jira/browse/SPARK-19353
 Project: Spark
  Issue Type: Improvement
Reporter: Sergei Lebedev
Priority: Minor


The current design of RDD.pipe is very restrictive. 

It is line-based: each element of the input RDD [gets 
serialized|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L143]
 into one or more lines. Similarly, for the output of the child process, one 
line corresponds to a single element of the output RDD. 

It allows customizing the output format via {{printRDDElement}}, but not the 
input format.

It is not designed for extensibility: the only way to get a "BinaryPipedRDD" is 
to copy/paste most of it and change the relevant parts.

These limitations have been discussed on 
[SO|http://stackoverflow.com/questions/27986830/how-to-pipe-binary-data-in-apache-spark]
 and the mailing list, but alas no issue has been created.

A possible solution to at least the first two limitations is to factor out the 
format into a separate object (or objects), for instance {{InputWriter}} and 
{{OutputReader}}, following the Hadoop streaming API. 

{code}
import java.io.{InputStream, OutputStream}

trait InputWriter[T] {
  def write(os: OutputStream, elem: T): Unit
}

trait OutputReader[T] {
  def read(is: InputStream): T
}
{code}

The default configuration would write and read in the line-based format, but 
users would also be able to selectively swap in the appropriate 
implementations.
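
To illustrate the proposal, a hedged sketch of what one pair of implementations 
might look like, using a simple 4-byte length-prefixed framing (the framing and 
all class names below are illustrative, not an actual Spark API):

{code}
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}

trait InputWriter[T] {
  def write(os: OutputStream, elem: T): Unit
}

trait OutputReader[T] {
  def read(is: InputStream): T
}

// Writes each element as <length><bytes> so the child process can frame it.
class LengthPrefixedInputWriter extends InputWriter[Array[Byte]] {
  def write(os: OutputStream, elem: Array[Byte]): Unit = {
    val out = new DataOutputStream(os)
    out.writeInt(elem.length)
    out.write(elem)
    out.flush()
  }
}

// Reads the 4-byte length, then exactly that many bytes, per element.
class LengthPrefixedOutputReader extends OutputReader[Array[Byte]] {
  def read(is: InputStream): Array[Byte] = {
    val in = new DataInputStream(is)
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    buf
  }
}
{code}

The trait names follow the InputWriter/OutputReader classes of Hadoop 
streaming, as the description suggests.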



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19010) Include Kryo exception in case of overflow

2016-12-27 Thread Sergei Lebedev (JIRA)
Sergei Lebedev created SPARK-19010:
--

 Summary: Include Kryo exception in case of overflow
 Key: SPARK-19010
 URL: https://issues.apache.org/jira/browse/SPARK-19010
 Project: Spark
  Issue Type: Improvement
Affects Versions: 1.6.3
Reporter: Sergei Lebedev


SPARK-6087 replaced the Kryo overflow exception with a SparkException giving 
Spark-specific instructions on tackling the issue. However, the implementation 
also 
[suppressed|https://github.com/apache/spark/pull/4947/files#diff-1f81c62dad0e2dfc387a974bb08c497cR165]
 the original Kryo exception, which makes it impossible to trace the operation 
causing the overflow. 
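
A sketch of the kind of change this asks for: keep the Spark-specific message 
but attach the original KryoException as the cause, so its stack trace (and 
hence the operation that overflowed) stays visible. This is illustrative, not 
the actual SPARK-6087 code:

{code}
import com.esotericsoftware.kryo.KryoException
import org.apache.spark.SparkException

object KryoOverflowHandling {
  // Rethrow with the original exception chained instead of suppressed.
  def rethrow(e: KryoException): Nothing =
    throw new SparkException(
      "Kryo serialization failed: Buffer overflow. To avoid this, " +
      "increase spark.kryoserializer.buffer.max value.",
      e)  // the chained cause keeps Kryo's stack trace in the logs
}
{code}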



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org