[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort
[ https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631017#comment-16631017 ]

Kyle Winkelman commented on BEAM-5519:
--------------------------------------

Proposed:

// SparkGroupAlsoByWindowViaWindowSet.buildPairDStream
JavaRDD<...>
JavaRDD<...>
JavaRDD<...>
JavaPairRDD<...>

// UpdateStateByKeyOutputIterator.computeNext
gets the scala.collection.Seq<...>: the seq of values that have the same key, decoded to scala.collection.Seq<...> (converted to an Iterable)

> Spark Streaming Duplicated Encoding/Decoding Effort
> ---------------------------------------------------
>
>                 Key: BEAM-5519
>                 URL: https://issues.apache.org/jira/browse/BEAM-5519
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Kyle Winkelman
>            Assignee: Kyle Winkelman
>            Priority: Major
>              Labels: spark, spark-streaming
>
> When using the SparkRunner in streaming mode, there is a call to groupByKey
> followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this
> used to cause 2 shuffles, but it still causes 2 encode/decode cycles.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
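To make the proposed computeNext behavior concrete, here is a minimal sketch of decoding the per-key values one at a time and exposing them as an Iterable. The helper name decodeValues and the plain List input stand in for the runner's actual Seq handling; they are assumptions, not the real code.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;

// Hypothetical helper illustrating the proposed computeNext: each encoded
// value with the same key is decoded individually and the results are
// exposed as an Iterable, avoiding a second full encode/decode of an
// Iterable that groupByKey already materialized.
public class DecodeValuesSketch {
  static <V> Iterable<V> decodeValues(List<byte[]> encoded, Coder<V> coder)
      throws IOException {
    List<V> values = new ArrayList<>(encoded.size());
    for (byte[] bytes : encoded) {
      // Coder.decode reads one value from the stream.
      values.add(coder.decode(new ByteArrayInputStream(bytes)));
    }
    return values;
  }
}
{code}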
[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort
[ https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631012#comment-16631012 ]

Kyle Winkelman commented on BEAM-5519:
--------------------------------------

Current:

// GroupCombineFunctions.groupByKeyOnly
JavaRDD<...>
JavaRDD<...>
JavaRDD<...>
JavaPairRDD<...>
JavaPairRDD<...>
JavaPairRDD<...>

// groupByKey
JavaPairRDD<...>
JavaRDD<...>
JavaRDD<...>

// SparkGroupAlsoByWindowViaWindowSet.buildPairDStream
JavaRDD<...>
JavaPairRDD<...>
JavaPairRDD<...>
JavaPairRDD<...>

// UpdateStateByKeyOutputIterator.computeNext
gets the scala.collection.Seq<...>: the seq of values that have the same key, decoded to scala.collection.Seq<...> (zero or one items, because we have already grouped by key); get the head of the Seq and pull out the Iterable<...>

> Spark Streaming Duplicated Encoding/Decoding Effort
> ---------------------------------------------------
>
>                 Key: BEAM-5519
>                 URL: https://issues.apache.org/jira/browse/BEAM-5519
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Kyle Winkelman
>            Assignee: Kyle Winkelman
>            Priority: Major
>              Labels: spark, spark-streaming
>
> When using the SparkRunner in streaming mode, there is a call to groupByKey
> followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this
> used to cause 2 shuffles, but it still causes 2 encode/decode cycles.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
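For context, the pattern under discussion looks roughly like the sketch below; the String/byte[] types are illustrative assumptions, not the runner's actual generics. The point is that values cross a serialization boundary once for the groupByKey shuffle and again when updateStateByKey checkpoints its state.

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Sketch of the groupByKey-then-updateStateByKey pattern (hypothetical
// String/byte[] types). Each stage round-trips values through
// serialization: once across the groupByKey shuffle, and once when
// updateStateByKey persists its state between micro-batches.
public class GroupThenUpdateSketch {
  static JavaPairDStream<String, List<byte[]>> apply(
      JavaPairDStream<String, byte[]> keyed) {
    // First encode/decode cycle: values are serialized to cross the shuffle.
    JavaPairDStream<String, Iterable<byte[]>> grouped = keyed.groupByKey();

    Function2<List<Iterable<byte[]>>, Optional<List<byte[]>>, Optional<List<byte[]>>>
        update = (newValues, state) -> {
          // Second encode/decode cycle: the grouped values and the carried
          // state are serialized into Spark's checkpointed state stream.
          List<byte[]> merged =
              state.isPresent() ? state.get() : new ArrayList<>();
          for (Iterable<byte[]> vs : newValues) {
            for (byte[] v : vs) {
              merged.add(v);
            }
          }
          return Optional.of(merged);
        };
    return grouped.updateStateByKey(update);
  }
}
{code}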
[jira] [Created] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort
Kyle Winkelman created BEAM-5519:
------------------------------------

             Summary: Spark Streaming Duplicated Encoding/Decoding Effort
                 Key: BEAM-5519
                 URL: https://issues.apache.org/jira/browse/BEAM-5519
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
            Reporter: Kyle Winkelman
            Assignee: Kyle Winkelman

When using the SparkRunner in streaming mode, there is a call to groupByKey followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this used to cause 2 shuffles, but it still causes 2 encode/decode cycles.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558437#comment-16558437 ]

Kyle Winkelman commented on BEAM-4783:
--------------------------------------

After looking into the issue further, I don't believe it is related to Dynamic Allocation at all. I believe all BoundedSources should be broken up into as many blocks as are required for them all to share the same bundleSize. As the code is currently written, every BoundedSource is simply broken into n evenly sized blocks, where n is the defaultParallelism. This means one 100GB file is split very differently than eight 25GB files are (the latter get 8 times as many blocks). The user shouldn't have to break one large file into smaller pieces to get blocks that are small enough to handle. If sources were always broken up by the bundleSize, the two cases above would be split almost identically. This is also how Spark behaves with an HDFS input: the file is broken up on the default HDFS block size.

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> ------------------------------------------------------------
>
>                 Key: BEAM-4783
>                 URL: https://issues.apache.org/jira/browse/BEAM-4783
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>    Affects Versions: 2.5.0
>            Reporter: Kyle Winkelman
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>              Labels: newbie
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
>     // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
>     // when running on Mesos it's 8.
>     // when running local it's the total number of cores (local = 1, local[N] = N,
>     // local[*] = estimation of the machine's cores).
>     // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file, as it will only get split in half.
> I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
> you to change this DEFAULT_BUNDLE_SIZE.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
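A minimal sketch of the suggested splitting policy, assuming a 64MB target bundle size (the constant and its wiring into the Spark runner are assumptions for illustration):

{code:java}
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch: split every BoundedSource by a fixed bundle size rather than
// into defaultParallelism evenly sized blocks. Under this policy, one
// 100GB file and eight 25GB files both yield roughly the same number of
// similarly sized bundles. The 64MB figure is an assumption.
public class BundleSizeSplitSketch {
  static final long ASSUMED_BUNDLE_SIZE_BYTES = 64L * 1024 * 1024;

  static <T> List<? extends BoundedSource<T>> splitByBundleSize(
      BoundedSource<T> source, PipelineOptions options) throws Exception {
    // BoundedSource.split takes a desired bundle size in bytes and returns
    // as many sub-sources as are needed to honor it.
    return source.split(ASSUMED_BUNDLE_SIZE_BYTES, options);
  }
}
{code}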
[jira] [Updated] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kyle Winkelman updated BEAM-4783:
---------------------------------
    Description: 
When the spark-runner is used along with the configuration spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It then falls back to the value calculated in this description:
    // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
    // when running on Mesos it's 8.
    // when running local it's the total number of cores (local = 1, local[N] = N,
    // local[*] = estimation of the machine's cores).
    // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
So in most cases this default is quite small. This is an issue when using a very large input file, as it will only get split in half.
I believe that when Dynamic Allocation is enabled, the SourceRDD should use the DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows you to change this DEFAULT_BUNDLE_SIZE.

  was:
When the spark-runner is used along with the configuration spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It then falls back to the value calculated in this description: 'when running on YARN/SparkDeploy it's the result of max(totalCores, 2)'. The totalCores is --executor-cores multiplied by --num-executors, so this ends up being 2 in the most common case. This is an issue when using a very large input file, as it will only get split in half.
I believe that when Dynamic Allocation is enabled, the SourceRDD should use the DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows you to change this DEFAULT_BUNDLE_SIZE.

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> ------------------------------------------------------------
>
>                 Key: BEAM-4783
>                 URL: https://issues.apache.org/jira/browse/BEAM-4783
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>    Affects Versions: 2.5.0
>            Reporter: Kyle Winkelman
>            Assignee: Amit Sela
>            Priority: Major
>              Labels: newbie
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
>     // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
>     // when running on Mesos it's 8.
>     // when running local it's the total number of cores (local = 1, local[N] = N,
>     // local[*] = estimation of the machine's cores).
>     // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file, as it will only get split in half.
> I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
> you to change this DEFAULT_BUNDLE_SIZE.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
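A sketch of the detection the ticket asks for, assuming a hypothetical bundle-size constant; the actual fix in the runner may look different:

{code:java}
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: when dynamic allocation is enabled, defaultParallelism()
// reflects only the executors alive at submission time, so derive the
// partition count from the input size and a bundle size instead.
// ASSUMED_BUNDLE_SIZE_BYTES is an illustrative constant, not the
// runner's actual DEFAULT_BUNDLE_SIZE field.
public class DynamicAllocationPartitioning {
  static final long ASSUMED_BUNDLE_SIZE_BYTES = 64L * 1024 * 1024;

  static int targetNumPartitions(JavaSparkContext jsc, long estimatedSizeBytes) {
    boolean dynamicAllocation =
        jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false);
    if (!dynamicAllocation) {
      // max(totalCores, 2) on YARN/SparkDeploy, 8 on Mesos, the core count
      // when running local, unless spark.default.parallelism overrides it.
      return jsc.defaultParallelism();
    }
    return (int) Math.max(1, estimatedSizeBytes / ASSUMED_BUNDLE_SIZE_BYTES);
  }
}
{code}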
[jira] [Created] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
Kyle Winkelman created BEAM-4783:
------------------------------------

             Summary: Spark SourceRDD Not Designed With Dynamic Allocation In Mind
                 Key: BEAM-4783
                 URL: https://issues.apache.org/jira/browse/BEAM-4783
             Project: Beam
          Issue Type: Improvement
          Components: runner-spark
    Affects Versions: 2.5.0
            Reporter: Kyle Winkelman
            Assignee: Amit Sela

When the spark-runner is used along with the configuration spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It then falls back to the value calculated in this description: 'when running on YARN/SparkDeploy it's the result of max(totalCores, 2)'. The totalCores is --executor-cores multiplied by --num-executors, so this ends up being 2 in the most common case. This is an issue when using a very large input file, as it will only get split in half.
I believe that when Dynamic Allocation is enabled, the SourceRDD should use the DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows you to change this DEFAULT_BUNDLE_SIZE.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-3095) .withCompression() hinted at in docs, but not usable
[ https://issues.apache.org/jira/browse/BEAM-3095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538605#comment-16538605 ]

Kyle Winkelman commented on BEAM-3095:
--------------------------------------

Can the easyfix, newbie, and starter labels be added to this?

> .withCompression() hinted at in docs, but not usable
> ----------------------------------------------------
>
>                 Key: BEAM-3095
>                 URL: https://issues.apache.org/jira/browse/BEAM-3095
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Rafael Fernandez
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> There is a FileBasedSink.CompressionType enum, and a comment in TextIO.java
> that suggests .withCompression(...) is available. Alas, there does not seem
> to be a documented way to write compressed output. It's unclear whether the
> documentation is wrong, or the functionality is indeed missing.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-3095) .withCompression() hinted at in docs, but not usable
[ https://issues.apache.org/jira/browse/BEAM-3095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538589#comment-16538589 ]

Kyle Winkelman commented on BEAM-3095:
--------------------------------------

[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L147]
{code:java}
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt")
    .withSuffix(".txt")
    .withCompression(Compression.GZIP));{code}
This example is the one that does not work: there is no method .withCompression() on the TextIO.Write class. I believe the below would work (but it uses deprecated APIs):
{code:java}
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt")
    .withSuffix(".txt")
    .withWritableByteChannelFactory(
        FileBasedSink.CompressionType.fromCanonical(Compression.GZIP)));
{code}
This Jira should probably be to add a convenience method to the TextIO.Write class similar to the one on the TextIO.TypedWrite class.
[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L886]
{code:java}
public TypedWrite<UserT, DestinationT> withCompression(Compression compression) {
  checkArgument(compression != null, "compression can not be null");
  return withWritableByteChannelFactory(
      FileBasedSink.CompressionType.fromCanonical(compression));
}
{code}

> .withCompression() hinted at in docs, but not usable
> ----------------------------------------------------
>
>                 Key: BEAM-3095
>                 URL: https://issues.apache.org/jira/browse/BEAM-3095
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Rafael Fernandez
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> There is a FileBasedSink.CompressionType enum, and a comment in TextIO.java
> that suggests .withCompression(...) is available. Alas, there does not seem
> to be a documented way to write compressed output. It's unclear whether the
> documentation is wrong, or the functionality is indeed missing.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
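A sketch of what that convenience method on TextIO.Write might look like, assuming Write delegates to a wrapped TypedWrite the way its other with* methods do; the field name inner is an assumption, and this is not the actual committed fix:

{code:java}
// Hypothetical sketch of the proposed TextIO.Write convenience method,
// delegating to the existing TypedWrite.withCompression. Assumes Write
// wraps a TypedWrite instance named 'inner'.
public Write withCompression(Compression compression) {
  checkArgument(compression != null, "compression can not be null");
  return new Write(inner.withCompression(compression));
}
{code}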