[jira] [Commented] (SPARK-19525) Enable Compression of RDD Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872728#comment-15872728 ] Aaditya Ramesh commented on SPARK-19525: Great, I will get the patch ready. > Enable Compression of RDD Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872698#comment-15872698 ] Aaditya Ramesh commented on SPARK-19525: We are suggesting to compress only before we write the checkpoint, not in memory. This is not happening right now - we just serialize the elements in the partition one by one and add to the serialization stream, according to {{ReliableCheckpointRDD.writePartitionToCheckpointFile}}: {code} val fileOutputStream = if (blockSize < 0) { fs.create(tempOutputPath, false, bufferSize) } else { // This is mainly for testing purpose fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication(fs.getWorkingDirectory), blockSize) } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) Utils.tryWithSafeFinally { serializeStream.writeAll(iterator) } { serializeStream.close() } {code} As you can see, we don't do any compression after the serialization step. In our patch, we just use the CompressionCodec and wrap the serialization stream in compression codec output stream, and correspondingly in the read. > Enable Compression of Spark Streaming Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872441#comment-15872441 ] Aaditya Ramesh commented on SPARK-19525: [~zsxwing] Actually, we are compressing the data in the RDDs, not the streaming metadata. We compress all records in a partition together and write them to our DFS. In our case, the snappy-compressed size of each RDD partition is around 18 MB, with 84 partitions, for a total of 1.5 GB per RDD. > Enable Compression of Spark Streaming Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858810#comment-15858810 ] Aaditya Ramesh commented on SPARK-19525: We have a patch that works for an older version. I am currently trying to port it to Spark 2.1.0. Is this okay? > Enable Compression of Spark Streaming Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
Aaditya Ramesh created SPARK-19525: -- Summary: Enable Compression of Spark Streaming Checkpoints Key: SPARK-19525 URL: https://issues.apache.org/jira/browse/SPARK-19525 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0 Reporter: Aaditya Ramesh In our testing, compressing partitions while writing them to checkpoints on HDFS using snappy helped performance significantly while also reducing the variability of the checkpointing operation. In our tests, checkpointing time was reduced by 3X, and variability was reduced by 2X for data sets of compressed size approximately 1 GB. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149126#comment-15149126 ] Aaditya Ramesh commented on SPARK-13027: Hi [~zsxwing] sorry to bump this again, I've submitted a new patch. Could you take a look when you get a chance? > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aaditya Ramesh updated SPARK-13027: --- Description: The StateDStream currently does not provide the batch time as input to the state update function. This is required in cases where the behavior depends on the batch start time. We (Conviva) have been patching it manually for the past several Spark versions but we thought it might be useful for others as well. was: wwThe StateDStream currently does not provide the batch time as input to the state update function. This is required in cases where the behavior depends on the batch start time. We (Conviva) have been patching it manually for the past several Spark versions but we thought it might be useful for others as well. > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aaditya Ramesh updated SPARK-13027: --- Description: wwThe StateDStream currently does not provide the batch time as input to the state update function. This is required in cases where the behavior depends on the batch start time. We (Conviva) have been patching it manually for the past several Spark versions but we thought it might be useful for others as well. was: The StateDStream currently does not provide the batch time as input to the state update function. This is required in cases where the behavior depends on the batch start time. We (Conviva) have been patching it manually for the past several Spark versions but we thought it might be useful for others as well. > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > wwThe StateDStream currently > does not provide the batch time as input to the state update function. This > is required in cases where the behavior depends on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139898#comment-15139898 ] Aaditya Ramesh commented on SPARK-13027: [~zsxwing] I've submitted a pull request. Could you take a look when you get a chance? > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
[ https://issues.apache.org/jira/browse/SPARK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133087#comment-15133087 ] Aaditya Ramesh commented on SPARK-13027: [~zsxwing] Sorry for the late response, definitely, I have the patch ready, I'm just reviewing it with coworkers before submitting the PR. [~mariobriggs] thank you for the offer. :) I actually already have the patch ready and it would be very helpful if you could review it. > Add API for updateStateByKey to provide batch time as input > --- > > Key: SPARK-13027 > URL: https://issues.apache.org/jira/browse/SPARK-13027 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.6.0 >Reporter: Aaditya Ramesh > > The StateDStream currently does not provide the batch time as input to the > state update function. This is required in cases where the behavior depends > on the batch start time. > We (Conviva) have been patching it manually for the past several Spark > versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13027) Add API for updateStateByKey to provide batch time as input
Aaditya Ramesh created SPARK-13027: -- Summary: Add API for updateStateByKey to provide batch time as input Key: SPARK-13027 URL: https://issues.apache.org/jira/browse/SPARK-13027 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.6.0 Reporter: Aaditya Ramesh The StateDStream currently does not provide the batch time as input to the state update function. This is required in cases where the behavior depends on the batch start time. We (Conviva) have been patching it manually for the past several Spark versions but we thought it might be useful for others as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org