[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort

2018-09-27 Thread Kyle Winkelman (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631017#comment-16631017
 ] 

Kyle Winkelman commented on BEAM-5519:
--

Proposed:

// SparkGroupAlsoByWindowViaWindowSet.buildPairDStream
JavaRDD>>
JavaRDD>>>
JavaRDD>>
JavaPairRDD

// UpdateStateByKeyOutputIterator.computeNext
gets the scala.collection.Seq of values that have the same key
decodes it to a scala.collection.Seq of values (converted to an Iterable)
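
A rough sketch of what that single decode in UpdateStateByKeyOutputIterator could look like (a sketch only; the helper name and the exact coder are assumptions, not the actual implementation), using the runner's CoderHelpers:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Sketch only: decode the grouped, byte-encoded values for one key exactly once
// and hand them to the group-also-by-window logic as an Iterable.
class DecodeOnceSketch {
  static <V> Iterable<WindowedValue<V>> decodeValues(
      Seq<byte[]> encodedValues, Coder<WindowedValue<V>> wvCoder) {
    List<WindowedValue<V>> decoded = new ArrayList<>();
    for (byte[] bytes : JavaConversions.seqAsJavaList(encodedValues)) {
      decoded.add(CoderHelpers.fromByteArray(bytes, wvCoder));
    }
    return decoded;
  }
}
{code}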


> Spark Streaming Duplicated Encoding/Decoding Effort
> ---
>
> Key: BEAM-5519
> URL: https://issues.apache.org/jira/browse/BEAM-5519
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Labels: spark, spark-streaming
>
> When using the SparkRunner in streaming mode, there is a call to groupByKey
> followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this
> used to cause 2 shuffles, but it still causes 2 encode/decode cycles.





[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort

2018-09-27 Thread Kyle Winkelman (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631012#comment-16631012
 ] 

Kyle Winkelman commented on BEAM-5519:
--

Current:
// GroupCombineFunctions.groupByKeyOnly
JavaRDD>>
JavaRDD>>>
JavaRDD>>
JavaPairRDD>
JavaPairRDD
JavaPairRDD> // groupByKey
JavaPairRDD>>
JavaRDD>>>
JavaRDD

// SparkGroupAlsoByWindowViaWindowSet.buildPairDStream
JavaRDD>>>
JavaPairRDD>>
JavaPairRDD>>>
JavaPairRDD

// UpdateStateByKeyOutputIterator.computeNext
gets the scala.collection.Seq of values that have the same key
decodes it to a scala.collection.Seq (zero or one items, because we have already grouped by key)
gets the head of the Seq and pulls out the Iterable
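
For intuition, the duplicated effort amounts to roughly the following (an illustrative sketch using the Spark runner's CoderHelpers, not the actual translation code): the grouped element is serialized and deserialized once around the groupByKey shuffle and once more around the updateStateByKey state.
{code:java}
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;

// Illustration only: two full encode/decode round trips on the same element.
class DoubleCodingSketch {
  static <T> T roundTripTwice(T element, Coder<T> coder) {
    // Cycle 1: around the groupByKey shuffle.
    T afterShuffle =
        CoderHelpers.fromByteArray(CoderHelpers.toByteArray(element, coder), coder);
    // Cycle 2: around the updateStateByKey state.
    return CoderHelpers.fromByteArray(CoderHelpers.toByteArray(afterShuffle, coder), coder);
  }
}
{code}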



> Spark Streaming Duplicated Encoding/Decoding Effort
> ---
>
> Key: BEAM-5519
> URL: https://issues.apache.org/jira/browse/BEAM-5519
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Labels: spark, spark-streaming
>
> When using the SparkRunner in streaming mode, there is a call to groupByKey
> followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this
> used to cause 2 shuffles, but it still causes 2 encode/decode cycles.





[jira] [Created] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort

2018-09-27 Thread Kyle Winkelman (JIRA)
Kyle Winkelman created BEAM-5519:


 Summary: Spark Streaming Duplicated Encoding/Decoding Effort
 Key: BEAM-5519
 URL: https://issues.apache.org/jira/browse/BEAM-5519
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Kyle Winkelman
Assignee: Kyle Winkelman


When using the SparkRunner in streaming mode, there is a call to groupByKey
followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this
used to cause 2 shuffles, but it still causes 2 encode/decode cycles.
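
The streaming translation in question has roughly this shape (a simplified sketch assuming byte-encoded keys and values; it is not the runner's actual translation code):
{code:java}
import java.util.List;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Sketch only: a groupByKey-style step feeding Spark's updateStateByKey, which is
// where the second encode/decode cycle happens today.
class StreamingShapeSketch {
  static JavaPairDStream<ByteArray, byte[]> shape(JavaPairDStream<ByteArray, byte[]> keyed) {
    JavaPairDStream<ByteArray, Iterable<byte[]>> grouped = keyed.groupByKey();

    // Placeholder state update: carries the previous state (or an empty array) forward.
    Function2<List<Iterable<byte[]>>, Optional<byte[]>, Optional<byte[]>> updateFunc =
        (values, prior) -> Optional.of(prior.isPresent() ? prior.get() : new byte[0]);

    return grouped.updateStateByKey(updateFunc);
  }
}
{code}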





[jira] [Commented] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-07-26 Thread Kyle Winkelman (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558437#comment-16558437
 ] 

Kyle Winkelman commented on BEAM-4783:
--

After looking into the issue further, I don't believe the issue is related to
Dynamic Allocation at all. I believe all BoundedSources should be broken up
into as many blocks as are required to have them all be the same bundleSize.

The way the code is currently written, it appears as though all BoundedSources
are simply broken into n evenly sized blocks, where n is the defaultParallelism.
In this way one 100GB file is broken up very differently than eight 25GB files
are (the latter produce 8 times as many blocks). The user shouldn't have to
break their one large file up into smaller pieces to get blocks that are small
enough to handle.

If sources were always broken up by the bundleSize, the above two cases would be
broken up almost identically. This is also how Spark with an HDFS input works:
the file is broken up on the default HDFS block size.
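
As a rough illustration of the difference (made-up sizes; the 64MB bundle size and a defaultParallelism of 2 are assumptions for the example only):
{code:java}
// Illustration only: how block sizes differ today vs. with a fixed bundle size.
class SplitSizeSketch {
  static final long MB = 1024L * 1024L;
  static final long GB = 1024L * MB;

  public static void main(String[] args) {
    long bundleSize = 64 * MB;   // assumed bundle size for illustration
    int defaultParallelism = 2;  // e.g. max(totalCores, 2) on YARN

    // Current: each BoundedSource is cut into defaultParallelism blocks.
    long blockFor100GbFile = (100 * GB) / defaultParallelism; // 50GB blocks
    long blockFor25GbFile = (25 * GB) / defaultParallelism;   // 12.5GB blocks

    // Proposed: every source is cut by the same bundle size.
    long blocksFor100GbFile = (100 * GB) / bundleSize;        // 1600 blocks of 64MB
    long blocksPer25GbFile = (25 * GB) / bundleSize;          // 400 blocks of 64MB per file

    System.out.printf("current block sizes: %dMB vs %dMB; proposed: %d and 8 x %d blocks of 64MB%n",
        blockFor100GbFile / MB, blockFor25GbFile / MB, blocksFor100GbFile, blocksPer25GbFile);
  }
}
{code}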

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file, as it will only get split in half.
> I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
> you to change this DEFAULT_BUNDLE_SIZE.





[jira] [Updated] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-07-13 Thread Kyle Winkelman (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Winkelman updated BEAM-4783:
-
Description: 
When the spark-runner is used along with the configuration
spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
then falls back to the value calculated in this description:
  // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
  // when running on Mesos it's 8.
  // when running local it's the total number of cores (local = 1, local[N] = N,
  // local[*] = estimation of the machine's cores).
  // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
So in most cases this default is quite small. This is an issue when using a
very large input file, as it will only get split in half.

I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
you to change this DEFAULT_BUNDLE_SIZE.

  was:
When the spark-runner is used along with the configuration
spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
then falls back to the value calculated in this description: 'when running on
YARN/SparkDeploy it's the result of max(totalCores, 2)'. The totalCores is
--executor-cores multiplied by --num-executors, so this ends up being 2 in the
most common case. This is an issue when using a very large input file, as it
will only get split in half.

I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
you to change this DEFAULT_BUNDLE_SIZE.


> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Amit Sela
>Priority: Major
>  Labels: newbie
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file, as it will only get split in half.
> I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
> you to change this DEFAULT_BUNDLE_SIZE.





[jira] [Created] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-07-13 Thread Kyle Winkelman (JIRA)
Kyle Winkelman created BEAM-4783:


 Summary: Spark SourceRDD Not Designed With Dynamic Allocation In Mind
 Key: BEAM-4783
 URL: https://issues.apache.org/jira/browse/BEAM-4783
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Affects Versions: 2.5.0
Reporter: Kyle Winkelman
Assignee: Amit Sela


When the spark-runner is used along with the configuration
spark.dynamicAllocation.enabled=true, the SourceRDD does not detect this. It
then falls back to the value calculated in this description: 'when running on
YARN/SparkDeploy it's the result of max(totalCores, 2)'. The totalCores is
--executor-cores multiplied by --num-executors, so this ends up being 2 in the
most common case. This is an issue when using a very large input file, as it
will only get split in half.

I believe that when Dynamic Allocation is enabled, the SourceRDD should use the
DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that allows
you to change this DEFAULT_BUNDLE_SIZE.
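
Such an option could be sketched along these lines (hypothetical names; getBundleSize is not an existing SparkPipelineOptions method, and the default value here is an assumption):
{code:java}
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical sketch: an option the Spark runner could consult instead of
// defaultParallelism when dynamic allocation is enabled.
public interface BundleSizeOptions extends PipelineOptions {
  @Description("Desired bundle size in bytes when splitting bounded sources.")
  @Default.Long(64 * 1024 * 1024)
  Long getBundleSize();

  void setBundleSize(Long bundleSize);
}
{code}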





[jira] [Commented] (BEAM-3095) .withCompression() hinted at in docs, but not usable

2018-07-10 Thread Kyle Winkelman (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538605#comment-16538605
 ] 

Kyle Winkelman commented on BEAM-3095:
--

Can the easyfix, newbie, and starter labels be added to this?

> .withCompression() hinted at in docs, but not usable
> 
>
> Key: BEAM-3095
> URL: https://issues.apache.org/jira/browse/BEAM-3095
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Rafael Fernandez
>Assignee: Chamikara Jayalath
>Priority: Major
>
> There is a FileBasedSink.CompressionType enum, and a comment in TextIO.java 
> that suggests .withCompression(...) is available. Alas, there does not seem 
> to be a documented way to write compressed output. It's unclear whether the 
> documentation is wrong, or the functionality is indeed missing.





[jira] [Commented] (BEAM-3095) .withCompression() hinted at in docs, but not usable

2018-07-10 Thread Kyle Winkelman (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538589#comment-16538589
 ] 

Kyle Winkelman commented on BEAM-3095:
--

[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L147]
{code:java}
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt"))
     .withSuffix(".txt")
     .withCompression(Compression.GZIP));{code}
This example is the one that does not work. There is no method 
.withCompression() on the TextIO.Write class.

I believe the below would work (but it uses deprecated APIs):
{code:java}
PCollection<String> lines = ...;
lines.apply(TextIO.write().to("/path/to/file.txt")
     .withSuffix(".txt")
     .withWritableByteChannelFactory(
         FileBasedSink.CompressionType.fromCanonical(Compression.GZIP)));
{code}
This Jira should probably be about adding a convenience method to the TextIO.Write
class similar to the one in the TextIO.TypedWrite class.

[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L886]
{code:java}
public TypedWrite<UserT, DestinationT> withCompression(Compression compression) {
  checkArgument(compression != null, "compression can not be null");
  return withWritableByteChannelFactory(
      FileBasedSink.CompressionType.fromCanonical(compression));
}
{code}
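
A convenience method on TextIO.Write might look roughly like this (a hypothetical sketch; the "inner" field and the Write constructor are assumptions about TextIO.Write's internals, not the actual code):
{code:java}
// Hypothetical sketch only: delegate to the wrapped TypedWrite, which already has
// withCompression. The field name "inner" and the constructor are assumptions.
public Write withCompression(Compression compression) {
  checkArgument(compression != null, "compression can not be null");
  return new Write(inner.withCompression(compression));
}
{code}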

> .withCompression() hinted at in docs, but not usable
> 
>
> Key: BEAM-3095
> URL: https://issues.apache.org/jira/browse/BEAM-3095
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Rafael Fernandez
>Assignee: Chamikara Jayalath
>Priority: Major
>
> There is a FileBasedSink.CompressionType enum, and a comment in TextIO.java 
> that suggests .withCompression(...) is available. Alas, there does not seem 
> to be a documented way to write compressed output. It's unclear whether the 
> documentation is wrong, or the functionality is indeed missing.


