[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses
[ https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447859#comment-17447859 ] Sergei Lebedev commented on SPARK-22674: Hi [~hyukjin.kwon], very happy to see you working on this again! I've noticed that you've marked this issue as a duplicate of SPARK-32079, which is newer. Shouldn't it be the other way round, i.e. SPARK-32079 marked as a duplicate of this issue? > PySpark breaks serialization of namedtuple subclasses > - > > Key: SPARK-22674 > URL: https://issues.apache.org/jira/browse/SPARK-22674 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0, 2.3.0, 3.1.1 >Reporter: Jonas Amrich >Priority: Major > > PySpark monkey patches the namedtuple class to make it serializable, however > this breaks serialization of its subclasses. With the current implementation, any > subclass will be serialized (and deserialized) as its parent namedtuple. > Consider this code, which will fail with {{AttributeError: 'Point' object has > no attribute 'sum'}}: > {code} > from collections import namedtuple > Point = namedtuple("Point", "x y") > class PointSubclass(Point): > def sum(self): > return self.x + self.y > rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]]) > rdd.collect()[0][0].sum() > {code} > Moreover, as PySpark hijacks all namedtuples in the main module, importing > pyspark breaks serialization of namedtuple subclasses even in code which is > not related to spark / distributed execution. I don't see any clean solution > to this; a possible workaround may be to limit the serialization hack only to > direct namedtuple subclasses like in > https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204 -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
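The failure above reproduces without a cluster. The sketch below re-implements the idea behind PySpark's monkey patch in plain Python; {{_restore}} and {{_hack_namedtuple}} only loosely mirror {{pyspark.serializers}} and are illustrative, not the actual implementation:

{code}
# Minimal, self-contained simulation of the hijack: serialize a namedtuple
# as (_restore, (class name, field names, values)).
import pickle
from collections import namedtuple

def _restore(name, fields, values):
    # Rebuilds a *plain* namedtuple class -- any subclass identity is lost.
    cls = namedtuple(name, fields)
    return cls(*values)

def _hack_namedtuple(cls):
    name, fields = cls.__name__, cls._fields

    def __reduce__(self):
        return (_restore, (name, fields, tuple(self)))

    cls.__reduce__ = __reduce__
    return cls

Point = _hack_namedtuple(namedtuple("Point", "x y"))

class PointSubclass(Point):
    def sum(self):
        return self.x + self.y

# The subclass inherits Point's patched __reduce__, so the round trip
# reconstructs a plain Point and the subclass method disappears.
p = pickle.loads(pickle.dumps(PointSubclass(1, 1)))
print(type(p).__name__)  # Point
try:
    p.sum()
except AttributeError as e:
    print(e)  # 'Point' object has no attribute 'sum'
{code}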
[jira] [Commented] (SPARK-22338) namedtuple serialization is inefficient
[ https://issues.apache.org/jira/browse/SPARK-22338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458128#comment-16458128 ] Sergei Lebedev commented on SPARK-22338: [This|https://github.com/apache/spark/pull/21180] PR fixes the issue for namedtuples defined in modules (as opposed to the ones defined inside functions or in the REPL). > namedtuple serialization is inefficient > --- > > Key: SPARK-22338 > URL: https://issues.apache.org/jira/browse/SPARK-22338 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Joel Croteau >Priority: Minor > > I greatly appreciate the level of hack that PySpark contains in order to make > namedtuples serializable, but I feel like it could be done a little better. > In particular, say I create a namedtuple class with a few long argument names > like this: > {code:JiraShouldReallySupportPython} > MyTuple = namedtuple('MyTuple', ('longarga', 'longargb', 'longargc')) > {code} > If I create an instance of this, here is how PySpark serializes it: > {code:JiraShouldReallySupportPython} > mytuple = MyTuple(1, 2, 3) > pickle.dumps(mytuple, pickle.HIGHEST_PROTOCOL) > b'\x80\x04\x95]\x00\x00\x00\x00\x00\x00\x00\x8c\x13pyspark.serializers\x94\x8c\x08_restore\x94\x93\x94\x8c\x07MyTuple\x94\x8c\x08longarga\x94\x8c\x08longargb\x94\x8c\x08longargc\x94\x87\x94K\x01K\x02K\x03\x87\x94\x87\x94R\x94.' > {code} > This serialization includes the name of the namedtuple class, the names of > each of its members, as well as references to internal functions in > pyspark.serializers. By comparison, this is what I get if I serialize the > bare tuple: > {code:JiraShouldReallySupportPython} > shorttuple = (1,2,3) > pickle.dumps(shorttuple, pickle.HIGHEST_PROTOCOL) > b'\x80\x04\x95\t\x00\x00\x00\x00\x00\x00\x00K\x01K\x02K\x03\x87\x94.' > {code} > Much shorter. For another comparison, here is what it looks like if I build a > dict with the same data and element names: > {code:JiraShouldReallySupportPython} > mydict = {'longarga':1, 'longargb':2, 'longargc':3} > pickle.dumps(mydict, pickle.HIGHEST_PROTOCOL) > b'\x80\x04\x95,\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x08longarga\x94K\x01\x8c\x08longargb\x94K\x02\x8c\x08longargc\x94K\x03u.' > {code} > In other words, even using a dict is substantially shorter than using a > namedtuple in its current form. There shouldn't be any need for namedtuples > to have this much overhead in their serialization. For one thing, if the > class object is being broadcast to the nodes, there should be no need for > each namedtuple instance to include all of the field names; the class name > should be enough. If you use namedtuples heavily, this can create a lot of > overhead in memory and disk use. I am going to try and improve the > serialization and submit a patch if I can find the time, but I don't know the > pyspark code too well, so if anyone has suggestions for where to start, I > would love to hear them. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
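For context on why restricting the hack to classes that are not importable (which is how I read the PR above) also fixes the size problem: plain pickle already serializes a module-level namedtuple class by reference, so the field names never travel with each instance. A quick check, assuming {{MyTuple}} is defined in an importable module rather than in the REPL:

{code}
# Without the hijack, a module-level namedtuple pickles by reference
# ("module.ClassName" + the values), with no per-instance field names.
# The flip side: unpickling requires the module to be importable on the
# other end, which is exactly why PySpark patches REPL-defined classes.
import pickle
from collections import namedtuple

MyTuple = namedtuple('MyTuple', ('longarga', 'longargb', 'longargc'))

plain = pickle.dumps(MyTuple(1, 2, 3), pickle.HIGHEST_PROTOCOL)
print(len(plain))  # noticeably shorter than the _restore-based form above
{code}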
[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses
[ https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399488#comment-16399488 ] Sergei Lebedev commented on SPARK-22674: [~jonasamrich] I see your point, but the current implementation has known limitations, which make it "breaking" as is. > PySpark breaks serialization of namedtuple subclasses > - > > Key: SPARK-22674 > URL: https://issues.apache.org/jira/browse/SPARK-22674 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0, 2.3.0 >Reporter: Jonas Amrich >Priority: Major > > PySpark monkey patches the namedtuple class to make it serializable, however > this breaks serialization of its subclasses. With the current implementation, any > subclass will be serialized (and deserialized) as its parent namedtuple. > Consider this code, which will fail with {{AttributeError: 'Point' object has > no attribute 'sum'}}: > {code} > from collections import namedtuple > Point = namedtuple("Point", "x y") > class PointSubclass(Point): > def sum(self): > return self.x + self.y > rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]]) > rdd.collect()[0][0].sum() > {code} > Moreover, as PySpark hijacks all namedtuples in the main module, importing > pyspark breaks serialization of namedtuple subclasses even in code which is > not related to spark / distributed execution. I don't see any clean solution > to this; a possible workaround may be to limit the serialization hack only to > direct namedtuple subclasses like in > https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses
[ https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396948#comment-16396948 ] Sergei Lebedev commented on SPARK-22674: Just out of curiosity: what is the original motivation for hacking the namedtuple serialization? I've been exploring the quirks caused by {{_hijack_namedtuple}}, and I feel PySpark might be better off without it. Some of the reasons are: * it silently incurs a performance cost for namedtuples defined outside of __main__ (both user-defined and third-party); * it leads to confusing and hard-to-debug error messages in the presence of inheritance; * it makes namedtuple classes different from normal classes, because the latter cannot be serialized if defined in the REPL. What do you think? > PySpark breaks serialization of namedtuple subclasses > - > > Key: SPARK-22674 > URL: https://issues.apache.org/jira/browse/SPARK-22674 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Jonas Amrich >Priority: Major > > PySpark monkey patches the namedtuple class to make it serializable, however > this breaks serialization of its subclasses. With the current implementation, any > subclass will be serialized (and deserialized) as its parent namedtuple. > Consider this code, which will fail with {{AttributeError: 'Point' object has > no attribute 'sum'}}: > {code} > from collections import namedtuple > Point = namedtuple("Point", "x y") > class PointSubclass(Point): > def sum(self): > return self.x + self.y > rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]]) > rdd.collect()[0][0].sum() > {code} > Moreover, as PySpark hijacks all namedtuples in the main module, importing > pyspark breaks serialization of namedtuple subclasses even in code which is > not related to spark / distributed execution. I don't see any clean solution > to this; a possible workaround may be to limit the serialization hack only to > direct namedtuple subclasses like in > https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294745#comment-16294745 ] Sergei Lebedev commented on SPARK-22805: It does, in the sense that a 1.6 history server can no longer read 2.X event logs. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294230#comment-16294230 ] Sergei Lebedev commented on SPARK-22805: It doesn't save much storage if block-update tracking is turned off (the default starting from 2.3). If it is on, it saves ~50%. What do you think? Should we kill this ticket? > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20970) Deprecate TaskMetrics._updatedBlockStatuses
[ https://issues.apache.org/jira/browse/SPARK-20970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293965#comment-16293965 ] Sergei Lebedev commented on SPARK-20970: Not only does it use a lot of memory on the driver, but it also bloats the event logs, see SPARK-22805. > Deprecate TaskMetrics._updatedBlockStatuses > --- > > Key: SPARK-20970 > URL: https://issues.apache.org/jira/browse/SPARK-20970 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Thomas Graves > > TaskMetrics._updatedBlockStatuses isn't used anywhere internally by Spark. It > could be used by users though since it's exposed by SparkListenerTaskEnd. We > made it configurable to turn off the tracking of it since it uses a lot of > memory in https://issues.apache.org/jira/browse/SPARK-20923. That config is > still true for backwards compatibility. We should turn that to false in the next > release and deprecate that API altogether. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836 ] Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 11:17 PM: --- I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |Decompressed|157M| |Decompressed with patch|155M| - *Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. [~andrewor14], [~srowen] do you know if there is a reason for that? It looks like a bug to me. *Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log. *Update*: the effect of SPARK-20923 is much more noticeable than I thought initially. Removing {{"internal.metrics.updatedBlockStatuses"}} from {{"Accumulables"}} and {{"Updated Blocks"}} from {{"Task Metrics"}} reduced the log size to 160M. The storage level compression now just shaves off a few MB (see updated table). was (Author: lebedev): I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| *Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. [~andrewor14], [~srowen] do you know if there is a reason for that? It looks like a bug to me. *Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
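For reference, the emulation above boils down to a short script over the uncompressed event log. The field names follow what {{JsonProtocol}} emits in 2.x ({{"Task Metrics"}}/{{"Updated Blocks"}} and {{"Task Info"}}/{{"Accumulables"}}); this is a sketch of the measurement, not a supported tool:

{code}
# Drop both copies of the updated-block statuses from every event.
import json

STATUS_ACC = "internal.metrics.updatedBlockStatuses"

def strip_block_updates(in_path, out_path):
    with open(in_path) as src, open(out_path, "w") as dst:
        for line in src:
            event = json.loads(line)
            metrics = event.get("Task Metrics")
            if isinstance(metrics, dict):
                # First copy: "Updated Blocks" inside the task metrics.
                metrics.pop("Updated Blocks", None)
            task_info = event.get("Task Info")
            if isinstance(task_info, dict):
                # Second copy: the accumulator carrying the same data.
                task_info["Accumulables"] = [
                    acc for acc in task_info.get("Accumulables", [])
                    if acc.get("Name") != STATUS_ACC
                ]
            dst.write(json.dumps(event) + "\n")
{code}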
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836 ] Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 10:54 PM: --- I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| *Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. [~andrewor14], [~srowen] do you know if there is a reason for that? It looks like a bug to me. *Update*: I'm recomputing the numbers for a fully updatedBlockStatuses-free log. was (Author: lebedev): I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| *Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. [~andrewor14], [~srowen] do you know if there is a reason for that? It looks like a bug to me. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836 ] Sergei Lebedev edited comment on SPARK-22805 at 12/16/17 8:37 PM: -- I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| *Update*: turns out {{SparkTaskEndEvent}} carries the list of updated blocks twice (!): as part of the {{"Accumulables"}} and in {{"Task Metrics"}}. [~andrewor14], [~srowen] do you know if there is a reason for that? It looks like a bug to me. was (Author: lebedev): I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293836#comment-16293836 ] Sergei Lebedev commented on SPARK-22805: I've emulated the effect of SPARK-20923 by removing all {{"internal.metrics.updatedBlockStatuses"}} entries from the original 79G event log. The table below compares uncompressed/compressed sizes of this log with and without the patch proposed in this issue: ||Mode||Size|| |LZ4-compressed|2.3G| |Decompressed|25G| |LZ4-compressed with patch|2.3G| |Decompressed with patch|16G| > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293369#comment-16293369 ] Sergei Lebedev commented on SPARK-22805: After some investigation, it turns out that the majority of the space is taken by {{SparkListenerTaskEnd}}, which reports block statuses. I suspect the space reduction after SPARK-20923 would be less impressive. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758 ] Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 9:44 PM: -- I have a patch which preserves backward compatibility. Will post some numbers a bit later. Also, note that the format is flexible "in theory"; in practice, it always contains one of the predefined levels. *Update*: turns out there's {{StorageLevel.apply}}, so "always" above should be read as "almost always". was (Author: lebedev): I have a patch which preserves backward compatibility. Will post some numbers a bit later. Also, note that the format is flexible "in theory"; in practice, it always contains one of the predefined levels. **Update**: turns out there's {{StorageLevel.apply}}, so "always" above should be read as "almost always". > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758 ] Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 9:43 PM: -- I have a patch which preserves backward compatibility. Will post some numbers a bit later. Also, note that the format is flexible "in theory"; in practice, it always contains one of the predefined levels. **Update**: turns out there's {{StorageLevel.apply}}, so "always" above should be read as "almost always". was (Author: lebedev): I have a patch which preserves backward compatibility. Will post some numbers a bit later. Also, note that the format is flexible "in theory"; in practice, it always contains one of the predefined levels. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292848#comment-16292848 ] Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 5:25 PM: -- Here are the results for a single application with 6K partitions. Admittedly, this is not generalizable to any application, but it gives an idea of the redundancy due to {{StorageLevel}}: || Mode || Size|| |LZ4-compressed|8.1G| |Decompressed|79G| |LZ4-compressed with patch|7.2G| |Decompressed with patch|49G| was (Author: lebedev): Here are the results for a single application with 6K partitions: || Mode || Size|| |LZ4-compressed|8.1G| |Decompressed|79G| |LZ4-compressed with patch|7.2G| |Decompressed with patch|49G| > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292848#comment-16292848 ] Sergei Lebedev commented on SPARK-22805: Here are the results for a single application with 6K partitions: || Mode || Size|| |LZ4-compressed|8.1G| |Decompressed|79G| |LZ4-compressed with patch|7.2G| |Decompressed with patch|49G| > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758 ] Sergei Lebedev commented on SPARK-22805: I have a patch which preserves backward compatibility. Will post some numbers a bit later. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22805) Use aliases for StorageLevel in event logs
[ https://issues.apache.org/jira/browse/SPARK-22805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292758#comment-16292758 ] Sergei Lebedev edited comment on SPARK-22805 at 12/15/17 4:28 PM: -- I have a patch which preserves backward compatibility. Will post some numbers a bit later. Also, note that the format is flexible "in theory"; in practice, it always contains one of the predefined levels. was (Author: lebedev): I have a patch which preserves backward compatibility. Will post some numbers a bit later. > Use aliases for StorageLevel in event logs > -- > > Key: SPARK-22805 > URL: https://issues.apache.org/jira/browse/SPARK-22805 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.2, 2.2.1 >Reporter: Sergei Lebedev >Priority: Minor > > Fact 1: {{StorageLevel}} has a private constructor, therefore a list of > predefined levels is not extendable (by the users). > Fact 2: The format of event logs uses redundant representation for storage > levels > {code} > >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, > >>> "Replication": 1}') > 79 > >>> len('DISK_ONLY') > 9 > {code} > Fact 3: This leads to excessive log sizes for workloads with lots of > partitions, because every partition would have the storage level field which > is 60-70 bytes more than it should be. > Suggested quick win: use the names of the predefined levels to identify them > in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22805) Use aliases for StorageLevel in event logs
Sergei Lebedev created SPARK-22805: -- Summary: Use aliases for StorageLevel in event logs Key: SPARK-22805 URL: https://issues.apache.org/jira/browse/SPARK-22805 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.2.1, 2.1.2 Reporter: Sergei Lebedev Priority: Minor Fact 1: {{StorageLevel}} has a private constructor, therefore a list of predefined levels is not extendable (by the users). Fact 2: The format of event logs uses redundant representation for storage levels {code} >>> len('{"Use Disk": true, "Use Memory": false, "Deserialized": true, >>> "Replication": 1}') 79 >>> len('DISK_ONLY') 9 {code} Fact 3: This leads to excessive log sizes for workloads with lots of partitions, because every partition would have the storage level field which is 60-70 bytes more than it should be. Suggested quick win: use the names of the predefined levels to identify them in the event log. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
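A sketch of what the suggested quick win could look like on the writing side, in Python for brevity (the real change would live in {{JsonProtocol}}). The alias table below covers the common predefined levels keyed by (useDisk, useMemory, deserialized, replication); off-heap variants are omitted, and the authoritative set should come from {{StorageLevel}} itself:

{code}
import json

# (useDisk, useMemory, deserialized, replication) -> canonical name.
PREDEFINED_LEVELS = {
    (True,  False, False, 1): "DISK_ONLY",
    (True,  False, False, 2): "DISK_ONLY_2",
    (False, True,  True,  1): "MEMORY_ONLY",
    (False, True,  True,  2): "MEMORY_ONLY_2",
    (False, True,  False, 1): "MEMORY_ONLY_SER",
    (True,  True,  True,  1): "MEMORY_AND_DISK",
    (True,  True,  False, 1): "MEMORY_AND_DISK_SER",
}

def storage_level_to_json(use_disk, use_memory, deserialized, replication):
    alias = PREDEFINED_LEVELS.get(
        (use_disk, use_memory, deserialized, replication))
    if alias is not None:
        return json.dumps(alias)  # e.g. "DISK_ONLY": ~11 bytes vs ~79
    # Custom levels built via StorageLevel.apply keep the verbose form,
    # so nothing is lost for non-predefined combinations.
    return json.dumps({"Use Disk": use_disk, "Use Memory": use_memory,
                       "Deserialized": deserialized,
                       "Replication": replication})
{code}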
[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors
[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257809#comment-16257809 ] Sergei Lebedev commented on SPARK-19371: > Usually the answer is to force a shuffle [...] [~srowen] we are seeing exactly the same imbalance as a result of a shuffle. From the Executors page, it looks like a couple of executors get many more "reduce" tasks than the others. Does this sound like a possible scenario? > Cannot spread cached partitions evenly across executors > --- > > Key: SPARK-19371 > URL: https://issues.apache.org/jira/browse/SPARK-19371 > Project: Spark > Issue Type: Bug >Affects Versions: 1.6.1 >Reporter: Thunder Stumpges > Attachments: RDD Block Distribution on two executors.png, Unbalanced > RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and > resulting task imbalance.png, execution timeline.png > > > Before running an intensive iterative job (in this case a distributed topic > model training), we need to load a dataset and persist it across executors. > After loading from HDFS and persisting, the partitions are spread unevenly > across executors (based on the initial scheduling of the reads, which are not > data-locality sensitive). The partition sizes are even, just not their > distribution over executors. We currently have no way to force the partitions > to spread evenly, and as the iterative algorithm begins, tasks are > distributed to executors based on this initial load, forcing some very > unbalanced work. > This has been mentioned a > [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059] > of > [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html] > in > [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html] > user/dev group threads. > None of the discussions I could find had solutions that worked for me. Here > are examples of things I have tried. All resulted in partitions in memory > that were NOT evenly distributed to executors, causing future tasks to be > imbalanced across executors as well. > *Reduce Locality* > {code}spark.shuffle.reduceLocality.enabled=false/true{code} > *"Legacy" memory mode* > {code}spark.memory.useLegacyMode = true/false{code} > *Basic load and repartition* > {code} > val numPartitions = 48*16 > val df = sqlContext.read. > parquet("/data/folder_to_load"). > repartition(numPartitions). > persist > df.count > {code} > *Load and repartition to 2x partitions, then shuffle repartition down to > desired partitions* > {code} > val numPartitions = 48*16 > val df2 = sqlContext.read. > parquet("/data/folder_to_load"). > repartition(numPartitions*2) > val df = df2.repartition(numPartitions). > persist > df.count > {code} > It would be great if, when persisting an RDD/DataFrame, we could request > that those partitions be stored evenly across executors in preparation for > future tasks. > I'm not sure if this is a more general issue (i.e. not just involving > persisting RDDs), but for the persisted in-memory case, it can make a HUGE > difference in the over-all running time of the remaining work. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle
[ https://issues.apache.org/jira/browse/SPARK-22227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197275#comment-16197275 ] Sergei Lebedev commented on SPARK-22227: Sidenote: the trace above is caused by the temporary file created by [SortShuffleWriter|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala#L69]. Sometimes we also saw failures containing {{TempShuffleBlockId}} names. > DiskBlockManager.getAllBlocks could fail if called during shuffle > - > > Key: SPARK-22227 > URL: https://issues.apache.org/jira/browse/SPARK-22227 > Project: Spark > Issue Type: Bug > Components: Block Manager >Affects Versions: 2.2.0 >Reporter: Sergei Lebedev >Priority: Minor > > {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the > block manager only contain files corresponding to "valid" block IDs, i.e. > those parsable via {{BlockId.apply}}. This is not always the case, as > demonstrated by the following snippet > {code} > object GetAllBlocksFailure { > def main(args: Array[String]): Unit = { > val sc = new SparkContext(new SparkConf() > .setMaster("local[*]") > .setAppName("demo")) > new Thread { > override def run(): Unit = { > while (true) { > > println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length) > Thread.sleep(10) > } > } > }.start() > val rdd = sc.range(1, 65536, numSlices = 10) > .map(x => (x % 4096, x)) > .persist(StorageLevel.DISK_ONLY) > .reduceByKey { _ + _ } > .collect() > } > } > {code} > We have a thread computing the number of bytes occupied by the block manager > on disk, and it frequently crashes due to this assumption being violated. > Relevant part of the stacktrace > {code} > 2017-10-06 11:20:14,287 ERROR > org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in > thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main] > java.lang.IllegalStateException: Unrecognized BlockId: > shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be > at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133) > at > org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103) > at > org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22227) DiskBlockManager.getAllBlocks could fail if called during shuffle
Sergei Lebedev created SPARK-22227: -- Summary: DiskBlockManager.getAllBlocks could fail if called during shuffle Key: SPARK-22227 URL: https://issues.apache.org/jira/browse/SPARK-22227 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 2.2.0 Reporter: Sergei Lebedev Priority: Minor {{DiskBlockManager.getAllBlocks}} assumes that the directories managed by the block manager only contain files corresponding to "valid" block IDs, i.e. those parsable via {{BlockId.apply}}. This is not always the case, as demonstrated by the following snippet {code} object GetAllBlocksFailure { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf() .setMaster("local[*]") .setAppName("demo")) new Thread { override def run(): Unit = { while (true) { println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length) Thread.sleep(10) } } }.start() val rdd = sc.range(1, 65536, numSlices = 10) .map(x => (x % 4096, x)) .persist(StorageLevel.DISK_ONLY) .reduceByKey { _ + _ } .collect() } } {code} We have a thread computing the number of bytes occupied by the block manager on disk, and it frequently crashes due to this assumption being violated. Relevant part of the stacktrace {code} 2017-10-06 11:20:14,287 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main] java.lang.IllegalStateException: Unrecognized BlockId: shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133) at org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103) at org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
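The defensive fix is to treat unparsable names -- in-flight temporary files such as {{shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be}} above -- as non-blocks instead of raising. A language-neutral illustration in Python; the pattern below mirrors only a subset of the grammar accepted by {{BlockId.apply}}:

{code}
import os
import re

# Approximates a subset of BlockId.apply; temp files such as
# "shuffle_1_2466_0.data.<uuid>" deliberately do not match.
VALID_BLOCK = re.compile(
    r"^(rdd_\d+_\d+"
    r"|shuffle_\d+_\d+_\d+(\.data|\.index)?"
    r"|broadcast_\d+(_[A-Za-z0-9]+)*"
    r"|taskresult_\d+)$")

def get_all_blocks(local_dirs):
    """Yields recognized block files, skipping in-flight temp files."""
    for root in local_dirs:
        for dirpath, _dirs, files in os.walk(root):
            for name in files:
                if VALID_BLOCK.match(name):
                    yield os.path.join(dirpath, name)
                # else: not a parsable BlockId -- skip instead of raising
{code}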
[jira] [Created] (SPARK-22147) BlockId.hashCode allocates a StringBuilder/String on each call
Sergei Lebedev created SPARK-22147: -- Summary: BlockId.hashCode allocates a StringBuilder/String on each call Key: SPARK-22147 URL: https://issues.apache.org/jira/browse/SPARK-22147 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 2.2.0 Reporter: Sergei Lebedev Priority: Minor The base class {{BlockId}} [defines|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L44] {{hashCode}} and {{equals}} for all its subclasses in terms of {{name}}. This makes the definitions of different ID types [very concise|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala#L52]. The downside, however, is redundant allocations. While I don't think this could be the major issue, it is still a bit disappointing to increase GC pressure on the driver for nothing. For our machine learning workloads, we've seen as much as 10% of all allocations on the driver coming from {{BlockId.hashCode}} calls done for [BlockManagerMasterEndpoint.blockLocations|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala#L54]. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
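The usual remedy for this pattern is to hash the constituent fields once, cache the result, and materialize the {{name}} string only on demand. A sketch of the idea in Python; {{RDDBlockId}} here is a stand-in for illustration, not Spark's class:

{code}
# Hash the fields, not the formatted name, and cache the result.
class RDDBlockId:
    __slots__ = ("rdd_id", "split_index", "_hash")

    def __init__(self, rdd_id, split_index):
        self.rdd_id = rdd_id
        self.split_index = split_index
        # Computed once; no "rdd_%d_%d" string is built here.
        self._hash = hash(("rdd", rdd_id, split_index))

    @property
    def name(self):
        # Only materialized when a caller actually needs the string.
        return "rdd_%d_%d" % (self.rdd_id, self.split_index)

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        return (isinstance(other, RDDBlockId)
                and self.rdd_id == other.rdd_id
                and self.split_index == other.split_index)
{code}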
[jira] [Commented] (SPARK-792) PairRDDFunctions should expect Product2 instead of Tuple2
[ https://issues.apache.org/jira/browse/SPARK-792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176268#comment-16176268 ] Sergei Lebedev commented on SPARK-792: -- It looks like the change in the title was never implemented. Does anyone know why? > PairRDDFunctions should expect Product2 instead of Tuple2 > - > > Key: SPARK-792 > URL: https://issues.apache.org/jira/browse/SPARK-792 > Project: Spark > Issue Type: Improvement >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 0.8.0 > > > PairRDDFunctions expect an RDD of Tuple2 (key, value) pair. If we change it > to expect Product2, we can apply for optimizations such as reducing the > number of temporary objects allocated and a faster runtime. > Product2 is just a trait in Scala, and users can define classes that extend > Product2. Two use cases in particular I have in mind: > 1. In MappedRDD or MappedPartitionRDD, the iterator could reuse a concrete > Product2 object and always return that; but in every next() call, the > iterator updates the content of the Product2. > 2. In some Spark cases (such as Shark, GraphX), the key field is only used to > partition the data. A concrete implementation of Product2 can have the key > field (_1) marked as transient, and thus reducing the amount of shuffle data > sent across the network between map and reduce stages. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22062) BlockManager does not account for memory consumed by remote fetches
Sergei Lebedev created SPARK-22062: -- Summary: BlockManager does not account for memory consumed by remote fetches Key: SPARK-22062 URL: https://issues.apache.org/jira/browse/SPARK-22062 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 2.2.0 Reporter: Sergei Lebedev Priority: Minor We use Spark exclusively with {{StorageLevel.DiskOnly}} as our workloads are very sensitive to memory usage. Recently, we've spotted that the jobs sometimes OOM, leaving lots of byte[] arrays on the heap. Upon further investigation, we've found that the arrays come from {{BlockManager.getRemoteBytes}}, which [calls|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L638] {{BlockTransferService.fetchBlockSync}}, which in turn would [allocate|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala#L99] an on-heap {{ByteBuffer}} of the same size as the block (e.g. full partition), if the block was successfully retrieved over the network. This memory is not accounted towards Spark storage/execution memory and could potentially lead to OOM if {{BlockManager}} fetches too many partitions in parallel. I wonder if this is intentional behaviour, or in fact a bug? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
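One mitigation pattern for this class of problem is to spill large remote fetches to a local file instead of buffering them on-heap. A hedged sketch of the shape such a fix could take; the threshold and names are illustrative, not an actual Spark config:

{code}
# Stream a remote block to a temp file once it crosses a size threshold,
# instead of allocating a block-sized on-heap buffer.
import tempfile

FETCH_TO_DISK_THRESHOLD = 64 << 20  # 64 MiB; illustrative

def fetch_block(chunks, block_size):
    """chunks: an iterable of bytes objects streamed off the wire."""
    if block_size < FETCH_TO_DISK_THRESHOLD:
        return b"".join(chunks)        # small block: keep in memory
    spill = tempfile.NamedTemporaryFile(delete=False)
    with spill:
        for chunk in chunks:           # large block: stream to disk
            spill.write(chunk)
    return spill.name                  # caller reads it back lazily
{code}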
[jira] [Comment Edited] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException
[ https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707 ] Sergei Lebedev edited comment on SPARK-17901 at 9/6/17 5:09 PM: [~srowen], I think this issue could've been closed by mistake: the stack trace is different from SPARK-17816. Could you reopen? was (Author: lebedev): [~srowen] I think this issue could've been closed by mistake: the stack trace is different from SPARK-17816. Could you reopen? > NettyRpcEndpointRef: Error sending message and Caused by: > java.util.ConcurrentModificationException > --- > > Key: SPARK-17901 > URL: https://issues.apache.org/jira/browse/SPARK-17901 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.0.1 >Reporter: Harish > > I have 2 data frames, one with 10K rows and 10,000 columns and another with 4M > rows with 50 columns. I joined these and am trying to find the mean of the merged data > set. > I calculated the mean using a lambda with the Python mean() function. I can't write > it in PySpark due to the 64KB code limit issue. > After calculating the mean I did rdd.take(2). It works. But creating the DF > from RDD and DF.show has been in progress for more than 2 hours (I stopped the > process) with the below message (102 GB, 6 cores per node -- total 10 nodes + > 1 master) > 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = > Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, > 35729))] in 1 attempts > org.apache.spark.SparkException: Exception thrown in awaitResult > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) > at > org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) > at > org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877) > at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.ConcurrentModificationException > at java.util.ArrayList.writeObject(ArrayList.java:766) > at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[jira] [Comment Edited] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException
[ https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707 ] Sergei Lebedev edited comment on SPARK-17901 at 9/6/17 5:09 PM: [~srowen] I think this issue could've been closed by mistake: the stack trace is different from SPARK-17816. Could you reopen? was (Author: lebedev): I think this issue could've been closed by mistake: the stack trace is different from SPARK-17816. Could you reopen? > NettyRpcEndpointRef: Error sending message and Caused by: > java.util.ConcurrentModificationException > --- > > Key: SPARK-17901 > URL: https://issues.apache.org/jira/browse/SPARK-17901 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.0.1 >Reporter: Harish > > I have 2 data frames, one with 10K rows and 10,000 columns and another with 4M > rows with 50 columns. I joined these and am trying to find the mean of the merged data > set. > I calculated the mean using a lambda with the Python mean() function. I can't write > it in PySpark due to the 64KB code limit issue. > After calculating the mean I did rdd.take(2). It works. But creating the DF > from RDD and DF.show has been in progress for more than 2 hours (I stopped the > process) with the below message (102 GB, 6 cores per node -- total 10 nodes + > 1 master) > 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = > Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, > 35729))] in 1 attempts > org.apache.spark.SparkException: Exception thrown in awaitResult > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) > at > org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) > at > org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877) > at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.ConcurrentModificationException > at java.util.ArrayList.writeObject(ArrayList.java:766) > at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[jira] [Commented] (SPARK-17901) NettyRpcEndpointRef: Error sending message and Caused by: java.util.ConcurrentModificationException
[ https://issues.apache.org/jira/browse/SPARK-17901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155707#comment-16155707 ] Sergei Lebedev commented on SPARK-17901: I think this issue could've been closed by mistake: the stack trace is different from SPARK-17816. Could you reopen? > NettyRpcEndpointRef: Error sending message and Caused by: > java.util.ConcurrentModificationException > --- > > Key: SPARK-17901 > URL: https://issues.apache.org/jira/browse/SPARK-17901 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.0.1 >Reporter: Harish > > I have two data frames: one with 10K rows and 10,000 columns, and another with 4M > rows and 50 columns. I joined them and tried to find the mean of the merged data set. > I calculated the mean with a lambda using Python's mean() function, since I can't express it > in PySpark due to the 64KB code-size limit issue. > After calculating the mean, rdd.take(2) works, but creating a DF > from the RDD and calling DF.show made no progress for more than 2 hours (I stopped the > process), with the message below (102 GB, 6 cores per node; 10 nodes + > 1 master in total): > 16/10/13 04:36:31 WARN NettyRpcEndpointRef: Error sending message [message = > Heartbeat(9,[Lscala.Tuple2;@68eedafb,BlockManagerId(9, 10.63.136.103, > 35729))] in 1 attempts > org.apache.spark.SparkException: Exception thrown in awaitResult > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) > at > org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) > at > org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at > org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877) > at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.ConcurrentModificationException > at java.util.ArrayList.writeObject(ArrayList.java:766) > at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081) > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at >
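The {{Caused by: java.util.ConcurrentModificationException}} at {{ArrayList.writeObject}} in the trace above is the signature of one thread serializing a collection while another thread mutates it, here apparently while the heartbeat thread serializes the {{Heartbeat}} message. A minimal standalone Scala sketch of that failure mode (plain JVM code, not Spark's heartbeat path; the object name is made up):
{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// ArrayList.writeObject records modCount before writing the elements and
// throws ConcurrentModificationException if it has changed by the end.
object CmeRepro {
  def main(args: Array[String]): Unit = {
    val list = new java.util.ArrayList[Integer]()
    (1 to 1000000).foreach(i => list.add(i))

    // Keep mutating the list while the main thread serializes it.
    val mutator = new Thread(() => while (true) list.add(0))
    mutator.setDaemon(true)
    mutator.start()

    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(list) // typically fails with ConcurrentModificationException
  }
}
{code}
Whether the exception actually fires depends on timing, hence "typically": the mutator has to touch the list inside the serialization window, which the large element count makes very likely.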
[jira] [Commented] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable
[ https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964463#comment-15964463 ] Sergei Lebedev commented on SPARK-20284: Yes, Scala is a bad citizen in JVM land and comes w/o any support for try-with-resources. IIUC, scala-arm would manage just fine without Closeable because it uses structural types. However, I think there is no reason not to implement Closeable/AutoCloseable even if Spark/Scala code does not need it. > Make SerializationStream and DeserializationStream extend Closeable > --- > > Key: SPARK-20284 > URL: https://issues.apache.org/jira/browse/SPARK-20284 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.3, 2.1.0 >Reporter: Sergei Lebedev >Priority: Trivial > > Both {{SerializationStream}} and {{DeserializationStream}} implement > {{close}} but do not extend {{Closeable}}. As a result, these streams cannot > be used in try-with-resources. > Was this intentional, or has nobody ever needed it? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
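For reference, a small Scala sketch of what extending {{Closeable}} would buy callers; {{SerializationStreamLike}} is a made-up stand-in for the real class, not Spark's actual hierarchy:
{code}
import java.io.Closeable
import scala.util.Using // Scala 2.13+; on 2.11/2.12, scala-arm or try/finally

// Stand-in for SerializationStream: extending Closeable (and therefore
// AutoCloseable) makes every JVM resource-management idiom applicable.
class SerializationStreamLike extends Closeable {
  override def close(): Unit = println("stream closed")
}

object ClosingDemo {
  def main(args: Array[String]): Unit = {
    // Scala: Using closes the stream even if the body throws.
    Using(new SerializationStreamLike) { stream =>
      // ... write objects ...
    }
    // Java gets try-with-resources:
    //   try (SerializationStreamLike s = new SerializationStreamLike()) { ... }
    // Kotlin gets `use`:
    //   SerializationStreamLike().use { ... }
  }
}
{code}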
[jira] [Comment Edited] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable
[ https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964257#comment-15964257 ] Sergei Lebedev edited comment on SPARK-20284 at 4/11/17 11:57 AM: -- It makes the stream well-behaved for any JVM language, e.g. pure Java or [Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. was (Author: lebedev): It makes the stream well-behaved for any JVM user, e.g. pure-Java or [Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. > Make SerializationStream and DeserializationStream extend Closeable > --- > > Key: SPARK-20284 > URL: https://issues.apache.org/jira/browse/SPARK-20284 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.3, 2.1.0 >Reporter: Sergei Lebedev >Priority: Trivial > > Both {{SerializationStream}} and {{DeserializationStream}} implement > {{close}} but do not extend {{Closeable}}. As a result, these streams cannot > be used in try-with-resources. > Was this intentional, or has nobody ever needed it? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable
[ https://issues.apache.org/jira/browse/SPARK-20284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964257#comment-15964257 ] Sergei Lebedev commented on SPARK-20284: It makes the stream well-behaved for any JVM user, e.g. pure Java or [Kotlin|https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html]. > Make SerializationStream and DeserializationStream extend Closeable > --- > > Key: SPARK-20284 > URL: https://issues.apache.org/jira/browse/SPARK-20284 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.3, 2.1.0 >Reporter: Sergei Lebedev >Priority: Trivial > > Both {{SerializationStream}} and {{DeserializationStream}} implement > {{close}} but do not extend {{Closeable}}. As a result, these streams cannot > be used in try-with-resources. > Was this intentional, or has nobody ever needed it? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20284) Make SerializationStream and DeserializationStream extend Closeable
Sergei Lebedev created SPARK-20284: -- Summary: Make SerializationStream and DeserializationStream extend Closeable Key: SPARK-20284 URL: https://issues.apache.org/jira/browse/SPARK-20284 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.0, 1.6.3 Reporter: Sergei Lebedev Priority: Minor Both {{SerializationStream}} and {{DeserializationStream}} implement {{close}} but do not extend {{Closeable}}. As a result, these streams cannot be used in try-with-resources. Was this intentional, or has nobody ever needed it? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19353) Support binary I/O in PipedRDD
[ https://issues.apache.org/jira/browse/SPARK-19353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15852498#comment-15852498 ] Sergei Lebedev commented on SPARK-19353: For reference: we have a fully backward-compatible [implementation|https://github.com/criteo-forks/spark/pull/26] of binary PipedRDD in our GitHub fork. > Support binary I/O in PipedRDD > -- > > Key: SPARK-19353 > URL: https://issues.apache.org/jira/browse/SPARK-19353 > Project: Spark > Issue Type: Improvement >Reporter: Sergei Lebedev >Priority: Minor > > The current design of RDD.pipe is very restrictive. > It is line-based: each element of the input RDD [gets > serialized|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L143] > into one or more lines. Similarly, for the output of the child process, one > line corresponds to a single element of the output RDD. > It allows customizing the output format via {{printRDDElement}}, but not the > input format. > It is not designed for extensibility. The only way to get a "BinaryPipedRDD" > is to copy/paste most of it and change the relevant parts. > These limitations have been discussed on > [SO|http://stackoverflow.com/questions/27986830/how-to-pipe-binary-data-in-apache-spark] > and the mailing list, but alas no issue has been created. > A possible solution to at least the first two limitations is to factor the > format out into a separate object (or objects). For instance, {{InputWriter}} > and {{OutputReader}}, following the Hadoop streaming API. > {code} > trait InputWriter[T] { > def write(os: OutputStream, elem: T) > } > trait OutputReader[T] { > def read(is: InputStream): T > } > {code} > The default configuration would write and read in a line-based format, > but users would also be able to selectively swap in the appropriate > implementations. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19353) Support binary I/O in PipedRDD
Sergei Lebedev created SPARK-19353: -- Summary: Support binary I/O in PipedRDD Key: SPARK-19353 URL: https://issues.apache.org/jira/browse/SPARK-19353 Project: Spark Issue Type: Improvement Reporter: Sergei Lebedev Priority: Minor The current design of RDD.pipe is very restrictive. It is line-based: each element of the input RDD [gets serialized|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L143] into one or more lines. Similarly, for the output of the child process, one line corresponds to a single element of the output RDD. It allows customizing the output format via {{printRDDElement}}, but not the input format. It is not designed for extensibility. The only way to get a "BinaryPipedRDD" is to copy/paste most of it and change the relevant parts. These limitations have been discussed on [SO|http://stackoverflow.com/questions/27986830/how-to-pipe-binary-data-in-apache-spark] and the mailing list, but alas no issue has been created. A possible solution to at least the first two limitations is to factor the format out into a separate object (or objects). For instance, {{InputWriter}} and {{OutputReader}}, following the Hadoop streaming API. {code} trait InputWriter[T] { def write(os: OutputStream, elem: T) } trait OutputReader[T] { def read(is: InputStream): T } {code} The default configuration would write and read in a line-based format, but users would also be able to selectively swap in the appropriate implementations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
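To make the proposal concrete, here is a sketch of what a binary pair of implementations could look like, using a simple length-prefixed framing; the trait definitions come from the issue, while the concrete classes and the framing are illustrative assumptions rather than any actual Spark (or fork) API:
{code}
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}

trait InputWriter[T] {
  def write(os: OutputStream, elem: T)
}

trait OutputReader[T] {
  def read(is: InputStream): T
}

// Hypothetical binary format: each element is an Array[Byte] framed as a
// 4-byte big-endian length followed by the payload.
class BinaryInputWriter extends InputWriter[Array[Byte]] {
  def write(os: OutputStream, elem: Array[Byte]): Unit = {
    val out = new DataOutputStream(os)
    out.writeInt(elem.length)
    out.write(elem)
    out.flush()
  }
}

class BinaryOutputReader extends OutputReader[Array[Byte]] {
  def read(is: InputStream): Array[Byte] = {
    val in = new DataInputStream(is)
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    buf
  }
}
{code}
With this factoring, a "BinaryPipedRDD" reduces to plugging these two objects into the existing PipedRDD machinery instead of copy/pasting it.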
[jira] [Created] (SPARK-19010) Include Kryo exception in case of overflow
Sergei Lebedev created SPARK-19010: -- Summary: Include Kryo exception in case of overflow Key: SPARK-19010 URL: https://issues.apache.org/jira/browse/SPARK-19010 Project: Spark Issue Type: Improvement Affects Versions: 1.6.3 Reporter: Sergei Lebedev SPARK-6087 replaced the Kryo overflow exception with a SparkException giving Spark-specific instructions for tackling the issue. The implementation also [suppressed|https://github.com/apache/spark/pull/4947/files#diff-1f81c62dad0e2dfc387a974bb08c497cR165] the original Kryo exception, making it impossible to trace the operation that caused the overflow. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
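The fix implied here is small: pass the original exception as the cause when wrapping it. A Scala sketch of the intended shape (the real catch site and message live in Spark's {{KryoSerializer}}; this snippet only illustrates the chaining, and the helper name is made up):
{code}
import com.esotericsoftware.kryo.KryoException
import org.apache.spark.SparkException

object KryoOverflowHint {
  def withHint[T](body: => T): T =
    try body
    catch {
      case e: KryoException if e.getMessage != null && e.getMessage.contains("Buffer overflow") =>
        // Chaining `e` as the cause keeps the original Kryo stack trace, so
        // the operation that triggered the overflow remains traceable.
        throw new SparkException("Kryo serialization failed: " + e.getMessage +
          ". To avoid this, increase spark.kryoserializer.buffer.max value.", e)
    }
}
{code}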