[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196229&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196229
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 08/Feb/19 14:05
Start Date: 08/Feb/19 14:05
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Cache and share materialized side inputs between Spark tasks
URL: https://github.com/apache/beam/pull/7091
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 196229)
Time Spent: 9h 40m  (was: 9.5h)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.8.0
> Reporter: David Moravek
> Assignee: David Moravek
> Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> We did some profiling of a Spark job, and 90% of the application time was
> spent on side input deserialization.
> For Spark, an easy fix is to cache materialized side inputs per bundle. This
> improved the running time of the profiled job from 3 hours to 30 minutes.
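The caching fix described above amounts to memoizing the deserialized value, so that repeated reads of the same side input skip the expensive step. The following is a minimal JDK-only sketch of that idea under stated assumptions, not the runner's code: `SideInputMemo`, the `String` key, and the `Supplier` loader are hypothetical stand-ins for `SideInputReader`, the `PCollectionView`/window key, and the deserialization call. The cache is a static field so that every task in the same executor JVM shares it, matching the PR's goal of sharing side inputs between Spark tasks. It omits the expire-after-access eviction that the PR's `SideInputStorage` configures through Guava's `CacheBuilder`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical JVM-wide memo: deserializes each side input at most once per key. */
final class SideInputMemo {
  // Static, so all tasks running in the same executor JVM share one cache.
  private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  static <T> T get(String key, Supplier<T> deserialize) {
    // computeIfAbsent runs the expensive loader only when the key is missing.
    // Caveat: ConcurrentHashMap does not store null results, so a null would be recomputed.
    return (T) CACHE.computeIfAbsent(key, k -> deserialize.get());
  }

  public static void main(String[] args) {
    AtomicInteger deserializations = new AtomicInteger();
    Supplier<String> expensive = () -> {
      deserializations.incrementAndGet();
      return "materialized-side-input";
    };
    // Three "tasks" read the same side input; only the first pays for deserialization.
    for (int task = 0; task < 3; task++) {
      get("view-1/global-window", expensive);
    }
    System.out.println(deserializations.get()); // prints 1
  }
}
```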



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196228
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 08/Feb/19 14:05
Start Date: 08/Feb/19 14:05
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Cache and 
share materialized side inputs between Spark tasks
URL: https://github.com/apache/beam/pull/7091#issuecomment-461813165
 
 
   Merged!
 



Issue Time Tracking
---

Worklog Id: (was: 196228)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196215&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196215
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 08/Feb/19 13:19
Start Date: 08/Feb/19 13:19
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454794706
 
 
   Run Spark ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 196215)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193444
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456470145
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 193444)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193446
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840543
 
 
   Run Spark ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 193446)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-02-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193445
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840404
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 193445)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=189011&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189011
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 23/Jan/19 15:17
Start Date: 23/Jan/19 15:17
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840404
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 189011)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=189012&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189012
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 23/Jan/19 15:17
Start Date: 23/Jan/19 15:17
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840543
 
 
   Run Spark ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 189012)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188327
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:41
Start Date: 22/Jan/19 16:41
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456470145
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 188327)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188332
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:48
Start Date: 22/Jan/19 16:48
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456472767
 
 
   Run Spark ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 188332)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188326
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:41
Start Date: 22/Jan/19 16:41
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249863813
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache<Key<?>, Optional<?>> materializedSideInputs =
 
 Review comment:
   I also think you would have bigger problems if `view` or `window` collided
(the `sparkRunner` relies on that in more places), so I will leave it as is. Is
that OK?
 



Issue Time Tracking
---

Worklog Id: (was: 188326)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188317
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:24
Start Date: 22/Jan/19 16:24
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249856567
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map<Key<T>, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+    final Cache<Key<T>, Optional<T>> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key<T> sideInputKey = new Key<>(view, window);
+
+    try {
+      Optional<T> optionalResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return Optional.ofNullable(result);
+              });
+      return optionalResult.orElse(null);
 
 Review comment:
   Thanks for the suggestions; the Optional combination would not be very readable.
   I made my own wrapper that simply wraps the value, so I can put `null` into
the cache.
   
https://github.com/apache/beam/pull/7091/files#diff-b123f0f1ca9646966a641a458b74cfbcR92
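The reply above describes wrapping each cached value so that `null` can be stored; Guava's `Cache` rejects null values, and a loader passed to `Cache.get(key, Callable)` must not return null. The sketch below illustrates the same wrapper idea with only the JDK, as an assumption-labeled illustration rather than the PR's code: `NullableValue` and `NullTolerantCache` are hypothetical names, and `ConcurrentHashMap` stands in for Guava's `Cache` (it also cannot hold null values, and `computeIfAbsent` records nothing when the mapping function returns null).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder that lets a null side-input value be cached like any other. */
final class NullableValue<T> {
  private final T value; // may be null

  NullableValue(T value) {
    this.value = value;
  }

  T get() {
    return value;
  }
}

/** Hypothetical cache that stores wrappers, so a null result is remembered, not recomputed. */
final class NullTolerantCache {
  // ConcurrentHashMap (like Guava's Cache) cannot hold null values, so store wrappers instead.
  private final Map<String, NullableValue<?>> cache = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <T> T get(String key, Supplier<T> loader) {
    // The wrapper itself is never null, so the entry is recorded even when the value is null.
    NullableValue<?> wrapped = cache.computeIfAbsent(key, k -> new NullableValue<T>(loader.get()));
    return (T) wrapped.get();
  }

  public static void main(String[] args) {
    NullTolerantCache cache = new NullTolerantCache();
    AtomicInteger loads = new AtomicInteger();
    Supplier<String> nullSideInput = () -> {
      loads.incrementAndGet();
      return null; // a side input whose materialized value is null
    };
    cache.get("view-1", nullSideInput);
    cache.get("view-1", nullSideInput);
    System.out.println(loads.get()); // prints 1: the null result was cached
  }
}
```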
 



Issue Time Tracking
---

Worklog Id: (was: 188317)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188313
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:22
Start Date: 22/Jan/19 16:22
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249855350
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
 ##
 @@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper(
       PCollectionView<?> view, JavaSparkContext context) {
     Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
     SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
+    String pCollectionName =
+        view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
+    LOG.info(
 
 Review comment:
   changed to debug
 



Issue Time Tracking
---

Worklog Id: (was: 188313)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188314&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188314
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 22/Jan/19 16:22
Start Date: 22/Jan/19 16:22
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249855378
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -17,17 +17,23 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
 
 Review comment:
   Fixed: switched to the vendored Guava.
 



Issue Time Tracking
---

Worklog Id: (was: 188314)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186632
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 17/Jan/19 23:59
Start Date: 17/Jan/19 23:59
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248886560
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,88 @@
+  private static final Cache<Key<?>, Optional<?>> materializedSideInputs =
 
 Review comment:
   The window should use `windowCoder.structuralValue(window)`, which is
required to behave identically to a full serialization for shuffle purposes.
The view itself is just a tag, so its `equals` should be good enough as-is. There
is already caching in the now-donated Dataflow Java worker; look through
uses and subclasses of `SideInputReader`.
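The suggestion above is to key the cache on the view tag plus a structural value of the window, rather than on the window object's own `equals`. Below is a hypothetical JDK-only sketch of such a key, offered as an illustration under assumptions rather than Beam's implementation: `SideInputKey` is an illustrative name, the view is modeled as a `String` tag, a window is modeled as a `long[]` of bounds (arrays have identity-based `equals`), and the `Function<W, Object>` parameter stands in for `windowCoder.structuralValue(window)` so the example runs without Beam on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical cache key: the view's tag plus a structural value of the window.
 * The structuralValue function stands in for Beam's windowCoder.structuralValue(window).
 */
final class SideInputKey {
  private final String viewTag;
  private final Object windowStructuralValue;

  <W> SideInputKey(String viewTag, W window, Function<W, Object> structuralValue) {
    this.viewTag = viewTag;
    this.windowStructuralValue = structuralValue.apply(window);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return viewTag.equals(other.viewTag)
        && Objects.equals(windowStructuralValue, other.windowStructuralValue);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewTag, windowStructuralValue);
  }

  public static void main(String[] args) {
    // Two equal windows modeled as [start, end] bounds; arrays compare by identity.
    long[] w1 = {0L, 60_000L};
    long[] w2 = {0L, 60_000L};
    Function<long[], Object> identity = w -> w;
    Function<long[], Object> structural = w -> Arrays.asList(w[0], w[1]);

    Map<SideInputKey, String> cache = new HashMap<>();
    cache.put(new SideInputKey("view-1", w1, structural), "cached");
    // Structural keys match even though w1 and w2 are distinct objects.
    System.out.println(cache.containsKey(new SideInputKey("view-1", w2, structural))); // true
    // Identity-based keys miss the cache for the same logical window.
    System.out.println(
        new SideInputKey("view-1", w1, identity).equals(new SideInputKey("view-1", w2, identity))); // false
  }
}
```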
 



Issue Time Tracking
---

Worklog Id: (was: 186632)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186302
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 17/Jan/19 13:53
Start Date: 17/Jan/19 13:53
Worklog Time Spent: 10m 
  Work Description: amitsela commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248677871
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache<Key<?>, Optional<?>> materializedSideInputs =
 
 Review comment:
   How do other runners cache side inputs? By which key? This sounds like 
something the SDK could provide guidance on (@kennknowles).
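One common approach is a single per-JVM map keyed by (view, window) with a time-based eviction policy. The sketch below illustrates that approach and is not the PR's `SideInputStorage`. It makes several assumptions: it uses only the JDK instead of Guava, it approximates `expireAfterAccess` with a last-access timestamp per entry, and it takes an injectable clock so that expiry can be tested. `SharedSideInputCache` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical JVM-wide side input cache with approximate expire-after-access semantics. */
final class SharedSideInputCache {
  /** One instance per executor JVM, shared by all tasks (5-minute TTL, as in the javadoc). */
  static final SharedSideInputCache INSTANCE =
      new SharedSideInputCache(5, TimeUnit.MINUTES, System::nanoTime);

  private static final class Entry {
    final Object value; // may be null: null is a legitimate side input value
    volatile long lastAccessNanos;

    Entry(Object value, long now) {
      this.value = value;
      this.lastAccessNanos = now;
    }
  }

  private final Map<Object, Entry> entries = new ConcurrentHashMap<>();
  private final long ttlNanos;
  private final LongSupplier clock;

  SharedSideInputCache(long ttl, TimeUnit unit, LongSupplier clock) {
    this.ttlNanos = unit.toNanos(ttl);
    this.clock = clock;
  }

  /** Returns the cached value for key, (re)loading it if it is missing or expired. */
  Object get(Object key, Supplier<?> loader) {
    long now = clock.getAsLong();
    Entry entry =
        entries.compute(
            key,
            (k, old) -> {
              if (old != null && now - old.lastAccessNanos < ttlNanos) {
                old.lastAccessNanos = now; // refresh the access time on every hit
                return old;
              }
              return new Entry(loader.get(), now); // missing or expired: load again
            });
    return entry.value;
  }
}
```

Unlike Guava's `Cache`, this sketch only notices expiry when an entry is read again, so stale entries are never proactively removed. `ConcurrentHashMap.compute` also blocks other updates to the same bin while the loader runs.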

 



Issue Time Tracking
---

Worklog Id: (was: 186302)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186126
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 17/Jan/19 02:34
Start Date: 17/Jan/19 02:34
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248521765
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map<Key<T>, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+    final Cache<Key<T>, Optional<T>> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key<T> sideInputKey = new Key<>(view, window);
+
+    try {
+      Optional<T> optionalResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return Optional.ofNullable(result);
+              });
+      return optionalResult.orElse(null);
 
 Review comment:
   Ah, thank you for clarifying. That is a good attempt. The problem is that 
these will incorrectly be turned into the same thing:
   
   `Optional.ofNullable(null).orElse(null) == null`
   
   `Optional.ofNullable(Optional.absent()).orElse(null) == null`
   
   The fact that `Optional.of(null)` throws NPE is a mistake in the design 
(both Java and Guava). Maybe the point of the design is to convince people to 
not use `null`, which [is a billion dollar good 
idea](https://en.wikipedia.org/wiki/Null_pointer#History). But it makes 
`Optional` not correctly parametric in `T`.
   
   I think that if you actually convert `null` into 
`Optional.of(Optional.absent())` and other values `v` into 
`Optional.of(Optional.of(v))` you can simulate the behavior it should have had 
in the first place. Or you could make your own little replacement of `Optional`.
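Here is a minimal sketch of the nested-`Optional` encoding suggested above. It is written with `java.util.Optional`, where `Optional.empty()` plays the role of Guava's `Optional.absent()`. The class and method names are hypothetical. The outer `Optional` is never empty, so it can be stored in a cache that rejects `null`, and the inner one records whether the side input value itself was `null`.

```java
import java.util.Optional;

/** Hypothetical helpers encoding a possibly-null value as a never-null nested Optional. */
final class NullableEncoding {
  private NullableEncoding() {}

  /** null -> Optional.of(Optional.empty()); v -> Optional.of(Optional.of(v)). */
  static <T> Optional<Optional<T>> encode(T value) {
    if (value == null) {
      return Optional.of(Optional.<T>empty());
    }
    return Optional.of(Optional.of(value));
  }

  /** Inverse of encode: the outer layer is always present; the inner one may be empty. */
  static <T> T decode(Optional<Optional<T>> encoded) {
    return encoded.get().orElse(null);
  }
}
```

Because the inner layer is built with `Optional.of`, a value that is itself an empty `Optional` round-trips as that empty `Optional` rather than collapsing to `null`.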
 



Issue Time Tracking
---

Worklog Id: (was: 186126)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185878
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:55
Start Date: 16/Jan/19 16:55
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248365302
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map<Key<T>, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+    final Cache<Key<T>, Optional<T>> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key<T> sideInputKey = new Key<>(view, window);
+
+    try {
+      Optional<T> optionalResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return Optional.ofNullable(result);
+              });
+      return optionalResult.orElse(null);
 
 Review comment:
   I chose this solution because Guava's `Cache` doesn't allow null values, and I 
didn't realize I would break the semantics. I will try to find a different 
solution.
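One possible shape for such a solution follows the "own little replacement of `Optional`" option mentioned in the review: store a small non-null holder whose payload may be `null`. This is a hypothetical sketch that uses only the JDK. Like Guava's `Cache`, `ConcurrentHashMap` rejects `null` values, so the holder is needed there too. `CachedValue` and `NullTolerantCache` are illustrative names, not Beam classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical non-null holder for a side input value that may itself be null. */
final class CachedValue<T> {
  private final T value; // null is a legitimate payload

  CachedValue(T value) {
    this.value = value;
  }

  T get() {
    return value;
  }
}

/** Hypothetical cache that stores holders, so a null side input is cached like any other. */
final class NullTolerantCache<K> {
  // ConcurrentHashMap (like Guava's Cache) rejects null values, hence the holder.
  private final Map<K, CachedValue<?>> map = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <T> T get(K key, Supplier<T> loader) {
    // The holder is never null, so a null result is stored and not reloaded.
    CachedValue<?> holder = map.computeIfAbsent(key, k -> new CachedValue<>(loader.get()));
    return (T) holder.get();
  }
}
```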
 



Issue Time Tracking
---

Worklog Id: (was: 185878)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185868&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185868
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248359016
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
 ##
 @@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper(
       PCollectionView view, JavaSparkContext context) {
     Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
     SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
+    String pCollectionName =
+        view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
+    LOG.info(
 
 Review comment:
   Maybe `LOG.debug`?
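A related pattern, offered here as an assumption rather than as what the PR does: at a lower level such as debug, a lazily built message avoids computing costly arguments when that level is disabled. One such argument is the `SizeEstimator.estimate` call the PR logs in `CachedSideInputReader`. The sketch uses `java.util.logging`, whose `Level.FINE` roughly corresponds to SLF4J's debug, together with its `Supplier<String>` overload. `expensiveSizeEstimate` is a hypothetical stand-in.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

final class LazyDebugLogging {
  static final Logger LOG = Logger.getLogger(LazyDebugLogging.class.getName());

  /** Counts how often the (hypothetical) expensive estimate actually runs. */
  static final AtomicInteger estimates = new AtomicInteger();

  /** Hypothetical stand-in for an expensive call such as a size estimation. */
  static long expensiveSizeEstimate(Object value) {
    estimates.incrementAndGet();
    return String.valueOf(value).length();
  }

  static void logBroadcast(String pCollectionName, Object value) {
    // The supplier, and so the estimate, only runs if FINE is loggable for LOG.
    LOG.fine(
        () ->
            "Broadcasting side input "
                + pCollectionName
                + " of estimated size "
                + expensiveSizeEstimate(value)
                + "B");
  }
}
```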
 



Issue Time Tracking
---

Worklog Id: (was: 185868)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185871&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185871
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248360258
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
 
 Review comment:
   Ah, the meaning of that comment is that `null` is a value. You can have a 
`PCollection<@Nullable Foo>` that contains just one copy of `null` and use 
`View.asSingleton()` and the side input returns the `null`.
   
   In other words, `get` _must_ return a value of type `T`. But the type `T` 
may itself be `@Nullable Something`. The annotation on `SideInputReader` should 
be removed. It is incorrect if we use a static analysis that understands 
this. FindBugs does not understand this, but we should aspire for our 
annotations to be correct so that the documentation is clear.
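This point also matters for the pre-PR `CachedSideInputReader`, which, per the lines the PR removes, cached through `Map.computeIfAbsent`. When the mapping function returns `null`, no mapping is recorded, so a legitimately `null` singleton side input would be re-read on every call. The following is a minimal demonstration with `HashMap`. A hypothetical counting loader stands in for `delegate.get(view, window)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ComputeIfAbsentNullDemo {
  /** Counts how many times the hypothetical loader runs. */
  static int loads = 0;

  /** Hypothetical loader: the side input for every key is a legitimate null. */
  static final Function<String, Object> LOADER =
      key -> {
        loads++;
        return null;
      };

  /** Reads the same key n times through computeIfAbsent and returns the load count. */
  static int readRepeatedly(int n) {
    Map<String, Object> cache = new HashMap<>();
    loads = 0;
    for (int i = 0; i < n; i++) {
      // A null result is not stored, so every call invokes LOADER again.
      cache.computeIfAbsent("singleton-view", LOADER);
    }
    return loads;
  }
}
```

`readRepeatedly(3)` returns 3 rather than 1. Wrapping the value in a non-null holder before storing it avoids the repeated loads.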
 



Issue Time Tracking
---

Worklog Id: (was: 185871)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185869
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248359142
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -17,17 +17,23 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
 
 Review comment:
   Vendored Guava?
 



Issue Time Tracking
---

Worklog Id: (was: 185869)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185867&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185867
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248355041
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
 
 Review comment:
   I am worried about the possible semantic consequences of 
`CachedSideInputReader.get()` returning a `null` value when it is not in the 
Cache. Wouldn't it imply that a window could get an empty side input assigned?
   [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32)
 on this is not really clear (pinging @kennknowles to see if I am misreading 
it).
   I wonder whether there is a test validating that this cannot happen, or whether 
we can create one.
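Here is a rough sketch of the kind of check asked about here, under simplifying assumptions. A reader is modelled as a plain `Function` from key to value. `CachingReader` is a hypothetical, single-threaded caching wrapper that stores a private sentinel for `null`. `agreesWithDelegate` checks that every cached read, first and repeated, equals the direct read, including for keys whose value is `null`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical caching wrapper that stores a sentinel so null results are cached too. */
final class CachingReader<K, V> implements Function<K, V> {
  private static final Object NULL_SENTINEL = new Object();
  private final Function<K, V> delegate;
  private final Map<K, Object> cache = new HashMap<>(); // not thread-safe: sketch only
  int delegateCalls = 0;

  CachingReader(Function<K, V> delegate) {
    this.delegate = delegate;
  }

  @Override
  @SuppressWarnings("unchecked")
  public V apply(K key) {
    Object cached = cache.get(key);
    if (cached == null) { // only a genuinely missing entry reaches the delegate
      delegateCalls++;
      V loaded = delegate.apply(key);
      cached = loaded == null ? NULL_SENTINEL : loaded;
      cache.put(key, cached);
    }
    return cached == NULL_SENTINEL ? null : (V) cached;
  }

  /** True if cached reads (first and repeated) equal direct reads for every key. */
  static <K, V> boolean agreesWithDelegate(Function<K, V> delegate, Iterable<K> keys) {
    CachingReader<K, V> cached = new CachingReader<>(delegate);
    for (K key : keys) {
      V expected = delegate.apply(key);
      if (!Objects.equals(expected, cached.apply(key))
          || !Objects.equals(expected, cached.apply(key))) {
        return false;
      }
    }
    return true;
  }
}
```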
 



Issue Time Tracking
---

Worklog Id: (was: 185867)
Time Spent: 6h 10m  (was: 6h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185870
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248360966
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map<Key<T>, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+    final Cache<Key<T>, Optional<T>> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key<T> sideInputKey = new Key<>(view, window);
+
+    try {
+      Optional<T> optionalResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return Optional.ofNullable(result);
+              });
+      return optionalResult.orElse(null);
 
 Review comment:
   Ismaël is right. The `delegate.get` is not `@Nullable` at this place in the 
abstraction. You don't need to check for it (unless there's some other bug 
somewhere) and you shouldn't convert `Optional.absent()` to `null`.
 



Issue Time Tracking
---

Worklog Id: (was: 185870)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185866&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185866
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:45
Start Date: 16/Jan/19 16:45
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248355041
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
 
 Review comment:
   I am worried about the possible semantic consequences of 
`CachedSideInputReader.get()` returning a `null` value when it is not in the 
`Cache`. Wouldn't it imply that a window could get an empty side input 
assigned?
   [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32)
 on this is not really clear (pinging @kennknowles to see if I am misreading 
it).
   I wonder whether there is a test validating that this cannot happen, or whether 
we can create one.
 



Issue Time Tracking
---

Worklog Id: (was: 185866)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185859&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185859
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:36
Start Date: 16/Jan/19 16:36
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248354273
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache<Key<?>, Optional<?>> materializedSideInputs =
 
 Review comment:
   I am a bit worried about the possible consequences of a collision of the 
`Key` tuple, in particular if a bad implementation of `equals` is around. This 
is not specific to this PR, but since the state is now static, the likelihood 
of this happening is bigger.
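The following hypothetical sketch makes the risk concrete. `BrokenView` has an `equals` that ignores its identity, and a static map stands in for the JVM-wide cache. The window part of the key is omitted for brevity. Two distinct views then share a single entry, so the second reader silently receives the first view's contents.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical view type with a broken equals/hashCode: every instance looks equal. */
final class BrokenView {
  private final String name;

  BrokenView(String name) {
    this.name = name;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof BrokenView; // bug: ignores name
  }

  @Override
  public int hashCode() {
    return 42; // consistent with the broken equals
  }

  @Override
  public String toString() {
    return name;
  }
}

final class StaticCacheCollision {
  /** Stand-in for a JVM-wide cache shared by every task in the executor. */
  static final Map<Object, Object> SHARED = new ConcurrentHashMap<>();

  static Object read(BrokenView view, String materialized) {
    // A second, distinct view hits the first view's entry and gets its contents.
    return SHARED.computeIfAbsent(view, k -> materialized);
  }
}
```

With per-bundle state, such a collision would presumably stay within one reader's lifetime. With a static cache, it can surface in every task that runs in the same executor JVM.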
 



Issue Time Tracking
---

Worklog Id: (was: 185859)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185861&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185861
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:38
Start Date: 16/Jan/19 16:38
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248355041
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
 
 Review comment:
   I am worried about the possible semantic consequences of 
`SideInputReader.get()` returning a `null` value when it is not in the `Cache`. 
Wouldn't it imply that a window could get an empty side input 
assigned?
   [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32)
 on this is not really clear (pinging @kennknowles to see if I am misreading 
it).
   I wonder whether there is a test validating that this cannot happen, or whether 
we can create one.
 



Issue Time Tracking
---

Worklog Id: (was: 185861)
Time Spent: 5h 50m  (was: 5h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185860&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185860
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 16:36
Start Date: 16/Jan/19 16:36
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248355041
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
 
 Review comment:
   I am worried about the possible semantic consequences of `get` returning a 
`null` value when it is not in the `Cache` for this `SideInputReader`. Wouldn't 
it imply that a window could get an empty side input assigned?
   [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32)
 on this is not really clear (pinging @kennknowles to see if I am misreading 
it).
   I wonder whether there is a test validating that this cannot happen, or whether 
we can create one.
 



Issue Time Tracking
---

Worklog Id: (was: 185860)
Time Spent: 5h 40m  (was: 5.5h)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.8.0
>Reporter: David Moravek
>Assignee: David Moravek
>Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> We did some profiling of a spark job and 90% of the application time was 
> spent on side input deserialization.
> For spark, an easy fix is to cache materialized side inputs per bundle. This 
> improved running time of the profiled job from 3 hours to 30 minutes.
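The per-bundle fix described in the issue amounts to memoizing materialized side inputs by (view, window) for the lifetime of one reader. A minimal, stdlib-only sketch, assuming string IDs in place of Beam's `PCollectionView` and `BoundedWindow` and a hypothetical `BiFunction` in place of the real deserializing reader:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Sketch of a per-bundle side input cache: one instance lives for one bundle, so each
 * (view, window) pair is materialized at most once per bundle instead of once per element.
 */
final class BundleSideInputCache {
  // Hypothetical stand-in for the expensive read and deserialization of a side input.
  private final BiFunction<String, String, Object> materializer;
  // Arrays.asList keys compare by content, so equal (view, window) pairs share one entry.
  private final Map<List<String>, Object> materialized = new HashMap<>();

  BundleSideInputCache(BiFunction<String, String, Object> materializer) {
    this.materializer = materializer;
  }

  Object get(String viewId, String windowId) {
    // Materialize on the first request only; later requests in the bundle hit the map.
    return materialized.computeIfAbsent(
        Arrays.asList(viewId, windowId), key -> materializer.apply(key.get(0), key.get(1)));
  }
}
```

Because the map is discarded with the bundle, each bundle still pays one deserialization per side input; sharing between tasks, as the PR title proposes, needs storage that outlives the bundle.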



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185799
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:18
Start Date: 16/Jan/19 14:18
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454795081
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 185799)
Time Spent: 5.5h  (was: 5h 20m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185789
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:08
Start Date: 16/Jan/19 14:08
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454791089
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 185789)
Time Spent: 5h  (was: 4h 50m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185796
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:17
Start Date: 16/Jan/19 14:17
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454794706
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 185796)
Time Spent: 5h 20m  (was: 5h 10m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185795
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:16
Start Date: 16/Jan/19 14:16
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454794367
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 185795)
Time Spent: 5h 10m  (was: 5h)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185788
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:08
Start Date: 16/Jan/19 14:08
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454790500
 
 
   Spark Validates Runner
 

Issue Time Tracking
---

Worklog Id: (was: 185788)
Time Spent: 4h 50m  (was: 4h 40m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185787
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:08
Start Date: 16/Jan/19 14:08
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454791089
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 185787)
Time Spent: 4h 40m  (was: 4.5h)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185786
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 16/Jan/19 14:06
Start Date: 16/Jan/19 14:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454790500
 
 
   Spark Validates Runner
 

Issue Time Tracking
---

Worklog Id: (was: 185786)
Time Spent: 4.5h  (was: 4h 20m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184265
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 11/Jan/19 17:00
Start Date: 11/Jan/19 17:00
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-453585179
 
 
   Run Java PreCommit
 

Issue Time Tracking
---

Worklog Id: (was: 184265)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184241
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 11/Jan/19 15:15
Start Date: 11/Jan/19 15:15
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-453549228
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 184241)
Time Spent: 4h 10m  (was: 4h)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184234
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 11/Jan/19 14:42
Start Date: 11/Jan/19 14:42
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-453538646
 
 
   Run Spark ValidatesRunner
 

Issue Time Tracking
---

Worklog Id: (was: 184234)
Time Spent: 4h  (was: 3h 50m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181820&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181820
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 07/Jan/19 15:02
Start Date: 07/Jan/19 15:02
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840
 
 
   After trying several approaches, we decided to keep it simple and use 
`expireAfterAccess` to drop values from the cache.
   
   The solution with weak values didn't bring the desired behavior: when a 
`MultiDoFnFunction` finished on an executor, the de-serialized side input was 
garbage collected immediately (because its references were lost when the 
`CachedSideInputReader` reached the end of its life). The side input stayed in 
the cache only while multiple `MultiDoFnFunction` runs overlapped. In our case, 
a single JVM de-serialized the same side input up to 10x.
   
   With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction), 
the side input is de-serialized only once. I chose a [5 min eviction 
duration](https://github.com/apache/beam/pull/7091/files#diff-b123f0f1ca9646966a641a458b74cfbcR35)
 as the best compromise, but I am open to discussion on whether it should be configurable.
   
   A disadvantage of the expireAfterAccess solution could be potentially higher 
memory consumption if `SideInputStorage` isn't accessed for a long time, so 
nothing gets evicted. I don't know how to detect when `MultiDoFnFunction` has 
finished, so that I could call `cache.cleanUp()` to trigger eviction of expired 
items. I am also not sure whether this is even a problem.
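The expire-after-access policy can be illustrated without Guava. The sketch below is a simplified, single-lock stand-in (not Guava's implementation); the class name and the injectable millisecond clock are assumptions made so the timing can be exercised deterministically. Every successful read pushes the entry's deadline out by the TTL, so a side input read at least once per TTL window is deserialized only once:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical expire-after-access cache; each successful read refreshes the deadline. */
final class ExpireAfterAccessCache<K, V> {
  private static final class TimedEntry<T> {
    final T value;
    long lastAccessMillis;

    TimedEntry(T value, long now) {
      this.value = value;
      this.lastAccessMillis = now;
    }
  }

  private final long ttlMillis;
  private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
  private final Map<K, TimedEntry<V>> entries = new HashMap<>();

  ExpireAfterAccessCache(long ttlMillis, LongSupplier clockMillis) {
    this.ttlMillis = ttlMillis;
    this.clockMillis = clockMillis;
  }

  synchronized V get(K key, Function<? super K, ? extends V> loader) {
    long now = clockMillis.getAsLong();
    TimedEntry<V> entry = entries.get(key);
    if (entry == null || now - entry.lastAccessMillis >= ttlMillis) {
      // Missing, or idle for at least the TTL: load (deserialize) again.
      V loaded = loader.apply(key);
      entry = new TimedEntry<>(loaded, now);
      entries.put(key, entry);
    } else {
      // Hit: refresh the access time, pushing expiry out by another TTL.
      entry.lastAccessMillis = now;
    }
    return entry.value;
  }
}
```

With a real clock this would be constructed as `new ExpireAfterAccessCache<>(TimeUnit.MINUTES.toMillis(5), System::currentTimeMillis)`, mirroring the 5-minute duration chosen in the PR.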
   
   
 

Issue Time Tracking
---

Worklog Id: (was: 181820)
Time Spent: 3h 50m  (was: 3h 40m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181819&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181819
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 07/Jan/19 15:01
Start Date: 07/Jan/19 15:01
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840
 
 
   After trying several approaches, we decided to keep it simple and use 
`expireAfterAccess` to drop values from the cache.
   
   The solution with weak values didn't bring the desired behavior: when a 
`MultiDoFnFunction` finished on an executor, the de-serialized side input was 
garbage collected immediately (because its references were lost when the 
`CachedSideInputReader` reached the end of its life). The side input stayed in 
the cache only while multiple `MultiDoFnFunction` runs overlapped. In our case, 
a single JVM de-serialized the same side input up to 10x.
   
   With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction), 
the side input is de-serialized only once. I chose a 5 min eviction duration as 
the best compromise, but I am open to discussion on whether it should be configurable.
   
   A disadvantage of the expireAfterAccess solution could be potentially higher 
memory consumption if `SideInputStorage` isn't accessed for a long time, so 
nothing gets evicted. I don't know how to detect when `MultiDoFnFunction` has 
finished, so that I could call `cache.cleanUp()` to trigger eviction of expired 
items. I am also not sure whether this is even a problem.
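On the memory concern: Guava's caching guide notes that `CacheBuilder` caches do not evict expired values automatically or instantly; they do small amounts of maintenance during writes (and occasionally during reads), and `Cache.cleanUp()` can be called explicitly. The simplified stand-in below is not Guava code; its class name, method names and injectable clock are hypothetical. It shows why a cache that nobody touches keeps expired values reachable until something triggers cleanup:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical cache whose expired entries are only freed by an explicit cleanUp(). */
final class LazilyExpiringCache<K, V> {
  private static final class Slot<T> {
    final T value;
    long lastAccessMillis;

    Slot(T value, long now) {
      this.value = value;
      this.lastAccessMillis = now;
    }
  }

  private final long ttlMillis;
  private final LongSupplier clockMillis;
  private final Map<K, Slot<V>> slots = new HashMap<>();

  LazilyExpiringCache(long ttlMillis, LongSupplier clockMillis) {
    this.ttlMillis = ttlMillis;
    this.clockMillis = clockMillis;
  }

  synchronized void put(K key, V value) {
    slots.put(key, new Slot<>(value, clockMillis.getAsLong()));
  }

  /** Returns null for missing or expired keys, but does not free expired slots. */
  synchronized V getIfPresent(K key) {
    Slot<V> slot = slots.get(key);
    long now = clockMillis.getAsLong();
    if (slot == null || isExpired(slot, now)) {
      return null;
    }
    slot.lastAccessMillis = now;
    return slot.value;
  }

  /** Number of retained slots, including expired ones that were not cleaned up yet. */
  synchronized int retainedSize() {
    return slots.size();
  }

  /** Frees every expired slot, releasing the memory held by stale side inputs. */
  synchronized void cleanUp() {
    long now = clockMillis.getAsLong();
    Iterator<Slot<V>> it = slots.values().iterator();
    while (it.hasNext()) {
      if (isExpired(it.next(), now)) {
        it.remove();
      }
    }
  }

  private boolean isExpired(Slot<V> slot, long now) {
    return now - slot.lastAccessMillis >= ttlMillis;
  }
}
```

Without a natural end-of-task hook, one option Guava's guide suggests is a periodic task (for example on a `ScheduledExecutorService`) that calls `cleanUp()`, which bounds how long expired side inputs stay in memory.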
   
   
 

Issue Time Tracking
---

Worklog Id: (was: 181819)
Time Spent: 3h 40m  (was: 3.5h)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181818
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 07/Jan/19 14:59
Start Date: 07/Jan/19 14:59
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r245680983
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -17,17 +17,31 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.util.SizeEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** {@link SideInputReader} that caches materialized views. */
 public class CachedSideInputReader implements SideInputReader {
 
+  private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
+
+  /**
+   * Keep references for the whole lifecycle of CachedSideInputReader otherwise sideInput needs to
+   * be de-serialized again.
+   */
+  private Set sideInputReferences = new HashSet<>();
 
 Review comment:
   References were removed because a different solution was used. 
[more](https://github.com/apache/beam/pull/7091#issuecomment-451961840)
 

Issue Time Tracking
---

Worklog Id: (was: 181818)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181817&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181817
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 07/Jan/19 14:58
Start Date: 07/Jan/19 14:58
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840
 
 
   After trying several approaches, we decided to keep it simple and use 
`expireAfterAccess` to drop values from the cache.
   
   The solution with weak values didn't bring the desired behavior: when a 
`MultiDoFnFunction` finished on an executor, the de-serialized side input was 
garbage collected immediately (because its references were lost when the 
`CachedSideInputReader` reached the end of its life). The side input stayed in 
the cache only while multiple `MultiDoFnFunction` runs overlapped. In our case, 
a single JVM de-serialized the same side input up to 10x.
   
   With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction), 
the side input is de-serialized only once. I chose a 5 min eviction duration as 
the best compromise, but I am open to discussion on whether it should be configurable.
   
   A disadvantage of the expireAfterAccess solution could be potentially higher 
memory consumption if `SideInputStorage` isn't accessed for a long time, so 
nothing gets evicted. I don't know how to detect when `MultiDoFnFunction` has 
finished, so that I could call `cache.cleanUp()` to trigger eviction of expired 
items. I am also not sure whether this is even a problem.
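The weak-values behavior described in this comment can be simulated deterministically. The sketch below replaces garbage collection with reference counting, an assumption made purely for illustration (the class and method names are hypothetical): a value stays cached only while some open reader still holds it, which roughly matches a `weakValues()` cache whose only strong references live in the readers. Sequential tasks therefore re-materialize every time, while overlapping tasks share one copy:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Simulation of a weak-values cache: an entry survives only while at least one open
 * reader (task) holds it. Reference counting stands in for the garbage collector.
 */
final class RefCountedSideInputStore<K, V> {
  private final Function<? super K, ? extends V> loader;
  private final Map<K, V> values = new HashMap<>();
  private final Map<K, Integer> holders = new HashMap<>();
  private int loads;

  RefCountedSideInputStore(Function<? super K, ? extends V> loader) {
    this.loader = loader;
  }

  /** A task's reader starts using the side input; it is loaded if no live reader holds it. */
  synchronized V acquire(K key) {
    V value = values.get(key);
    if (value == null) {
      value = loader.apply(key);
      loads++;
      values.put(key, value);
    }
    holders.merge(key, 1, Integer::sum);
    return value;
  }

  /** A task's reader ends; when the last holder goes away the value is dropped, as GC would. */
  synchronized void release(K key) {
    Integer remaining = holders.merge(key, -1, Integer::sum);
    if (remaining <= 0) {
      holders.remove(key);
      values.remove(key);
    }
  }

  synchronized int loads() {
    return loads;
  }
}
```

Ten back-to-back tasks load the value ten times, which matches the "up to 10x" observation, whereas ten overlapping tasks load it once.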
   
   
 

Issue Time Tracking
---

Worklog Id: (was: 181817)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2019-01-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181792
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 07/Jan/19 14:31
Start Date: 07/Jan/19 14:31
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r245671371
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {
 
 Review comment:
   done
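Storage shared between tasks only helps if lookups from different tasks resolve to the same entry, so the cache key needs value-based `equals` and `hashCode`. The diff imports `java.util.Objects`, and `CachedSideInputReader` imports a `SideInputStorage.Key`, but the key's body is not shown here. A minimal sketch of such a key, assuming string IDs as hypothetical stand-ins for the `PCollectionView` and `BoundedWindow` it would wrap:

```java
import java.util.Objects;

/** Hypothetical cache key identifying a side input by view and window. */
final class SideInputKey {
  private final String viewId;
  private final String windowId;

  SideInputKey(String viewId, String windowId) {
    this.viewId = Objects.requireNonNull(viewId);
    this.windowId = Objects.requireNonNull(windowId);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return viewId.equals(other.viewId) && windowId.equals(other.windowId);
  }

  @Override
  public int hashCode() {
    // Must agree with equals: equal keys built by different tasks hash to the same bucket.
    return Objects.hash(viewId, windowId);
  }
}
```

Without these overrides, `Object` identity equality would make every task's freshly built key miss the shared cache.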
 

Issue Time Tracking
---

Worklog Id: (was: 181792)
Time Spent: 3h 10m  (was: 3h)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.8.0
>Reporter: David Moravek
>Assignee: David Moravek
>Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> We did some profiling of a spark job and 90% of the application time was 
> spent on side input deserialization.
> For spark, an easy fix is to cache materialized side inputs per bundle. This 
> improved running time of the profiled job from 3 hours to 30 minutes.
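The per-bundle fix described in the issue amounts to memoizing the reader. The following is a minimal sketch in plain Java. It assumes a simplified reader in which a view and a window are identified by strings; the class `CachingReader` and its `BiFunction` delegate are hypothetical stand-ins, not Beam's `SideInputReader` API. Each (view, window) pair is materialized once per reader instance, and the reader is discarded at the end of the bundle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-in for a side input reader (not Beam's SideInputReader):
// views and windows are plain strings so the sketch runs without Beam on the classpath.
// Not thread-safe; it assumes one bundle is processed by a single thread.
final class CachingReader {
  // The expensive read that deserializes a side input for (view, window).
  private final BiFunction<String, String, Object> delegate;
  // view -> (window -> materialized value); lives only as long as this reader.
  private final Map<String, Map<String, Object>> cache = new HashMap<>();

  CachingReader(BiFunction<String, String, Object> delegate) {
    this.delegate = Objects.requireNonNull(delegate);
  }

  // Returns the cached value, calling the delegate only on the first lookup of (view, window).
  // Note: computeIfAbsent does not store null results, so a null is re-read on every call.
  Object get(String view, String window) {
    return cache
        .computeIfAbsent(view, v -> new HashMap<>())
        .computeIfAbsent(window, w -> delegate.apply(view, w));
  }
}
```

Because this cache lives only as long as the reader, every new bundle starts cold. Sharing materialized values between tasks, as pull request #7091 sets out to do, targets that remaining cost.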



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173515&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173515
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 10/Dec/18 11:01
Start Date: 10/Dec/18 11:01
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: 
Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-445776385
 
 
   It would be nice to test that this behaves as expected: that it does not leak (i.e. unreferenced values do get GCed) and does not rematerialize.




Issue Time Tracking
---

Worklog Id: (was: 173515)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173514
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 10/Dec/18 10:59
Start Date: 10/Dec/18 10:59
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r240166335
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
 ##
 @@ -17,17 +17,31 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.util.SizeEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** {@link SideInputReader} that caches materialized views. */
 public class CachedSideInputReader implements SideInputReader {
 
+  private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
+
+  /**
+   * Keep references for the whole lifecycle of CachedSideInputReader otherwise sideInput needs to
+   * be de-serialized again.
+   */
+  private Set sideInputReferences = new HashSet<>();
 
 Review comment:
   Aren't values ever removed from here? Or am I misreading this one? It seems like it can overflow and even prevent `SideInputStorage` from being GCed.
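One possible response to this concern is to cap the number of strongly held values instead of using an unbounded `HashSet`. This is offered only as an illustration and is not what the PR does. The hypothetical `BoundedStrongRefs` class keeps a `LinkedHashMap` in access order and overrides `removeEldestEntry`, so at most `maxEntries` values stay strongly reachable. Values that fall out of it are then reachable only through a weak-value cache until the GC reclaims them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded replacement for an unbounded Set of strong references:
// keeps at most maxEntries values strongly reachable, evicting the least recently used.
final class BoundedStrongRefs<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;

  private final int maxEntries;

  BoundedStrongRefs(int maxEntries) {
    super(16, 0.75f, true); // accessOrder = true: iteration order is least to most recently used
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Invoked after each put; returning true drops the least recently used entry.
    return size() > maxEntries;
  }
}
```

The trade-off is that an evicted side input may have to be deserialized again if a task needs it after the GC has collected it. In exchange, the reader's memory footprint stays bounded.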




Issue Time Tracking
---

Worklog Id: (was: 173514)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173508
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 10/Dec/18 10:52
Start Date: 10/Dec/18 10:52
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r240163858
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {
 
 Review comment:
   Package-private, and the same for the constructor: make access as tight as needed.




Issue Time Tracking
---

Worklog Id: (was: 173508)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173507
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 10/Dec/18 10:51
Start Date: 10/Dec/18 10:51
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r240163858
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {
 
 Review comment:
   package private




Issue Time Tracking
---

Worklog Id: (was: 173507)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-12-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=171493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171493
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 03/Dec/18 09:08
Start Date: 03/Dec/18 09:08
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-443638145
 
 
   Run Spark ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 171493)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=170162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170162
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 28/Nov/18 09:23
Start Date: 28/Nov/18 09:23
Worklog Time Spent: 10m 
  Work Description: mareksimunek commented on issue #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-442377623
 
 
   Run Spark ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 170162)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168439
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 21/Nov/18 18:15
Start Date: 21/Nov/18 18:15
Worklog Time Spent: 10m 
  Work Description: VaclavPlajt commented on a change in pull request 
#7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235493663
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 ##
 @@ -61,6 +66,17 @@
 public class MultiDoFnFunction
 implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+  Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.
 
 Review comment:
   ok, I see.




Issue Time Tracking
---

Worklog Id: (was: 168439)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168327
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 21/Nov/18 15:58
Start Date: 21/Nov/18 15:58
Worklog Time Spent: 10m 
  Work Description: dmvk commented on a change in pull request #7091: 
[BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235445177
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 ##
 @@ -61,6 +66,17 @@
 public class MultiDoFnFunction
 implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+  Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.
 
 Review comment:
   After deserialization on the executor side
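The reply suggests that the id is generated once, when the function is constructed on the driver, and then travels inside the serialized function. Every deserialized copy on every executor therefore sees the same value, while two separately constructed functions get different ids. The small sketch below uses plain Java serialization and assumes a hypothetical `StepFunction` class in place of `MultiDoFnFunction`. It also assumes the id is a random `UUID`; the quoted hunk does not show how the id is actually generated.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;

// Hypothetical stand-in for a serializable Spark function such as MultiDoFnFunction.
final class StepFunction implements Serializable {
  private static final long serialVersionUID = 1L;

  // Assigned once when the object is constructed (on the driver). Java deserialization
  // restores this field from the stream instead of re-running the initializer.
  final String id = UUID.randomUUID().toString();
}

final class SerdeUtil {
  private SerdeUtil() {}

  // Serializes and deserializes a value, roughly what shipping a function to an executor does.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

This is why a random id can be "consistent among executors" without being a stable function of the step name. Two DoFns that happen to share a step name still get distinct ids.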




Issue Time Tracking
---

Worklog Id: (was: 168327)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168328
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 21/Nov/18 15:59
Start Date: 21/Nov/18 15:59
Worklog Time Spent: 10m 
  Work Description: dmvk commented on issue #7091: [BEAM-5987] Spark: Share 
cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-440718069
 
 
   This still needs some effort, as it does not handle the case where a side input is used in different DoFns.
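One way to handle a side input that is shared by several DoFns is to key the cache by the side input itself (view plus window), rather than by anything specific to a DoFn. The sketch below uses a hypothetical `SideInputKey` class, with strings standing in for `PCollectionView` and `BoundedWindow`. Its `equals`/`hashCode` are value-based, so lookups from different DoFns resolve to the same map entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical cache key: identifies a materialized side input by view and window only,
// so different DoFns reading the same view in the same window share one cached value.
final class SideInputKey {
  private final String viewId; // stand-in for a PCollectionView's identity
  private final String window; // stand-in for a BoundedWindow

  SideInputKey(String viewId, String window) {
    this.viewId = Objects.requireNonNull(viewId);
    this.window = Objects.requireNonNull(window);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return viewId.equals(other.viewId) && window.equals(other.window);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewId, window);
  }
}
```

The key must not include the DoFn (or its id). Otherwise two DoFns reading the same view would each keep their own deserialized copy.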




Issue Time Tracking
---

Worklog Id: (was: 168328)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168286
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 21/Nov/18 14:06
Start Date: 21/Nov/18 14:06
Worklog Time Spent: 10m 
  Work Description: VaclavPlajt commented on a change in pull request 
#7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235399531
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 ##
 @@ -61,6 +66,17 @@
 public class MultiDoFnFunction
 implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+  Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.
 
 Review comment:
   I do not get the meaning of 'consistent' here. Do you mean random even within one JVM?




Issue Time Tracking
---

Worklog Id: (was: 168286)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168289&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168289
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 21/Nov/18 14:09
Start Date: 21/Nov/18 14:09
Worklog Time Spent: 10m 
  Work Description: VaclavPlajt commented on a change in pull request 
#7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235399531
 
 

 ##
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 ##
 @@ -61,6 +66,17 @@
 public class MultiDoFnFunction
 implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+  Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.
 
 Review comment:
   I do not get the meaning of 'consistent' here. Do you mean random (most likely distinct) even within one JVM?




Issue Time Tracking
---

Worklog Id: (was: 168289)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=167885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167885
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 20/Nov/18 17:45
Start Date: 20/Nov/18 17:45
Worklog Time Spent: 10m 
  Work Description: dmvk commented on issue #7091: [BEAM-5987] Spark: Share 
cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-440366991
 
 
   Run Spark ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 167885)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=167851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167851
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 20/Nov/18 17:09
Start Date: 20/Nov/18 17:09
Worklog Time Spent: 10m 
  Work Description: dmvk opened a new pull request #7091: [BEAM-5987] 
Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091
 
 
   We should try to reuse deserialized side inputs among spark tasks.
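Sharing between tasks is possible because Spark runs the tasks of an executor as threads inside one JVM, so a static map is visible to all of them. The minimal sketch below assumes a hypothetical `ExecutorSideInputs` holder keyed by a string id. It relies on `ConcurrentHashMap.computeIfAbsent`, which applies the mapping function at most once per key even when tasks race on it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor-wide store: static state exists once per JVM, so every task
// running in the same executor can reuse one deserialized copy of a side input.
final class ExecutorSideInputs {
  private static final ConcurrentMap<String, Object> MATERIALIZED = new ConcurrentHashMap<>();
  static final AtomicInteger DESERIALIZATIONS = new AtomicInteger();

  private ExecutorSideInputs() {}

  static Object get(String sideInputId) {
    // Atomic per key: concurrent callers for the same id wait for a single materialization.
    return MATERIALIZED.computeIfAbsent(sideInputId, id -> {
      DESERIALIZATIONS.incrementAndGet();
      return "deserialized:" + id; // stand-in for the expensive decode
    });
  }

  // Simulates several tasks (threads) of one executor reading the same side input.
  static void runTasks(int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> get("lookup"));
      }
      pool.shutdown();
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

This map never evicts anything, which is exactly the leak risk raised in the review. The `SideInputStorage` class with weak values, added in later revisions of this PR, is aimed at that problem.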
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 167851)
Time Spent: 1h  (was: 50m)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.

[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163217
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 06/Nov/18 22:02
Start Date: 06/Nov/18 22:02
Worklog Time Spent: 10m 
  Work Description: iemejia closed pull request #6960: [BEAM-5987] Fix 
Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index c70634c0733..4efe1e1438e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -35,6 +35,7 @@
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.util.CachedSideInputReader;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
@@ -153,7 +154,7 @@ public TimerInternals timerInternals() {
 DoFnRunners.simpleRunner(
 options.get(),
 doFn,
-new SparkSideInputReader(sideInputs),
+CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
 outputManager,
 mainOutputTag,
 additionalOutputTags,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
new file mode 100644
index 000..49200c6c4b4
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** {@link SideInputReader} that caches materialized views. */
+public class CachedSideInputReader implements SideInputReader {
+
+  /**
+   * Create a new cached {@link SideInputReader}.
+   *
+   * @param delegate wrapped reader
+   * @return cached reader
+   */
+  public static CachedSideInputReader of(SideInputReader delegate) {
+    return new CachedSideInputReader(delegate);
+  }
+
+  /**
+   * Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify
+   * materialized results.
+   *
+   * @param <T> type of result
+   */
+  private static class Key<T> {
+
+    private final PCollectionView<T> view;
+    private final BoundedWindow window;
+
+    Key(PCollectionView<T> view, BoundedWindow window) {
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final Key key = (Key) o;
+      return Objects.equals(view, key.view) && Objects.equals(window, key.window);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(view, window);
+    }
+  }
+
+  /** Wrapped {@link SideInputReader} which results will be cached. */
+  private final SideInputReader delegate;
+
+  /** Materialized results. */
+  pr
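The `Key` class in this diff gives the cache value semantics: two keys built from the same view and window are equal and hash alike. The rest of `CachedSideInputReader` is cut off here, so what follows is only a hypothetical, self-contained sketch of why that matters, assuming each lookup constructs a fresh key. It uses plain `Object` fields in place of `PCollectionView` and `BoundedWindow`, and it relies on those objects implementing value-based `equals`/`hashCode` themselves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeyEqualitySketch {

  // No equals/hashCode override: two keys are equal only if they are the same instance.
  static final class IdentityKey {
    final Object view;
    final Object window;

    IdentityKey(Object view, Object window) {
      this.view = view;
      this.window = window;
    }
  }

  // Value semantics, written in the same style as the Key class in the diff.
  static final class ValueKey {
    final Object view;
    final Object window;

    ValueKey(Object view, Object window) {
      this.view = view;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      ValueKey key = (ValueKey) o;
      return Objects.equals(view, key.view) && Objects.equals(window, key.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(view, window);
    }
  }

  public static void main(String[] args) {
    Map<Object, String> cache = new HashMap<>();
    cache.put(new IdentityKey("view", "window"), "materialized");
    cache.put(new ValueKey("view", "window"), "materialized");

    // A fresh IdentityKey never matches, so such a cache would re-materialize on every lookup.
    System.out.println(cache.containsKey(new IdentityKey("view", "window"))); // prints false
    // A fresh ValueKey with equal fields finds the existing entry.
    System.out.println(cache.containsKey(new ValueKey("view", "window"))); // prints true
  }
}
```

With an identity-based key, every lookup would miss and the expensive materialization would run again, which is exactly the cost the issue describes.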

[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163198
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 06/Nov/18 21:06
Start Date: 06/Nov/18 21:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #6960: [BEAM-5987] Fix Spark 
SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436408303
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 163198)
Time Spent: 40m  (was: 0.5h)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.8.0
>Reporter: David Moravek
>Assignee: David Moravek
>Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> We did some profiling of a Spark job and 90% of the application time was 
> spent on side input deserialization.
> For Spark, an easy fix is to cache materialized side inputs per bundle. This 
> improved running time of the profiled job from 3 hours to 30 minutes.
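A minimal sketch of per-bundle caching under simplified assumptions: `BundleCacheSketch`, `CachedLookup`, and the string-valued view and window are hypothetical stand-ins, not Beam's `SideInputReader` API. The delegate plays the role of side input deserialization, and a counter shows how often it actually runs. The `record` requires Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class BundleCacheSketch {

  // Composite cache key; a record supplies value-based equals/hashCode.
  record Key(String view, String window) {}

  // Wraps an expensive loader and memoizes its results per (view, window).
  static final class CachedLookup {
    private final BiFunction<String, String, Object> delegate;
    private final Map<Key, Object> cache = new HashMap<>();

    CachedLookup(BiFunction<String, String, Object> delegate) {
      this.delegate = delegate;
    }

    Object get(String view, String window) {
      // Only the first request for a given key reaches the delegate.
      return cache.computeIfAbsent(
          new Key(view, window), k -> delegate.apply(k.view(), k.window()));
    }
  }

  // Counts how often the "expensive" delegate actually runs.
  static int deserializations = 0;

  public static void main(String[] args) {
    BiFunction<String, String, Object> expensive =
        (view, window) -> {
          deserializations++; // stands in for decoding a broadcast side input
          return view + "@" + window;
        };

    // A fresh cache per bundle: all elements in the bundle reuse one result per key.
    CachedLookup bundleCache = new CachedLookup(expensive);
    for (int i = 0; i < 1000; i++) {
      bundleCache.get("lookupTable", "window-1");
    }
    bundleCache.get("lookupTable", "window-2");
    System.out.println(deserializations); // prints 2
  }
}
```

One caveat of `HashMap.computeIfAbsent`: if the delegate returns `null`, no mapping is recorded and the next lookup calls the delegate again, so a reader whose views can materialize to `null` would need an explicit `containsKey` check instead.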



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163196
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 06/Nov/18 21:01
Start Date: 06/Nov/18 21:01
Worklog Time Spent: 10m 
  Work Description: dmvk commented on issue #6960: [BEAM-5987] Fix Spark 
SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436406851
 
 
   retest this




Issue Time Tracking
---

Worklog Id: (was: 163196)
Time Spent: 0.5h  (was: 20m)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.8.0
>Reporter: David Moravek
>Assignee: David Moravek
>Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> We did some profiling of a Spark job and 90% of the application time was 
> spent on side input deserialization.
> For Spark, an easy fix is to cache materialized side inputs per bundle. This 
> improved running time of the profiled job from 3 hours to 30 minutes.





[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163024&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163024
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 06/Nov/18 12:20
Start Date: 06/Nov/18 12:20
Worklog Time Spent: 10m 
  Work Description: dmvk commented on issue #6960: [BEAM-5987] Fix Spark 
SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436233825
 
 
   Run Spark ValidatesRunner




Issue Time Tracking
---

Worklog Id: (was: 163024)
Time Spent: 20m  (was: 10m)

> Spark SideInputReader performance
> -
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.8.0
>Reporter: David Moravek
>Assignee: David Moravek
>Priority: Major
> Fix For: 2.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We did some profiling of a Spark job and 90% of the application time was 
> spent on side input deserialization.
> !image-2018-11-06-13-06-04-350.png!
> For Spark, an easy fix is to cache materialized side inputs per bundle. This 
> improved running time of the profiled job from 3 hours to 30 minutes.





[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance

2018-11-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163018
 ]

ASF GitHub Bot logged work on BEAM-5987:


Author: ASF GitHub Bot
Created on: 06/Nov/18 12:11
Start Date: 06/Nov/18 12:11
Worklog Time Spent: 10m 
  Work Description: dmvk opened a new pull request #6960: [BEAM-5987] Fix 
Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960
 
 
   We did some profiling of a Spark job and 90% of the application time was 
spent on side input deserialization.
   
   For Spark, an easy fix is to cache materialized side inputs per bundle. This 
improved running time of the profiled job from 3 hours to 30 minutes.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | ---
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: