[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196229&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196229 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 08/Feb/19 14:05
Start Date: 08/Feb/19 14:05
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #7091: [BEAM-5987] Cache and share materialized side inputs between Spark tasks
URL: https://github.com/apache/beam/pull/7091

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 196229)
Time Spent: 9h 40m (was: 9.5h)

> Spark SideInputReader performance
> ---------------------------------
>
> Key: BEAM-5987
> URL: https://issues.apache.org/jira/browse/BEAM-5987
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.8.0
> Reporter: David Moravek
> Assignee: David Moravek
> Priority: Major
> Fix For: 2.9.0
>
> Attachments: Screen Shot 2018-11-06 at 13.05.36.png
>
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> We did some profiling of a Spark job and 90% of the application time was
> spent on side input deserialization.
> For Spark, an easy fix is to cache materialized side inputs per bundle. This
> improved running time of the profiled job from 3 hours to 30 minutes.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
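A minimal sketch of the caching idea described in the issue and in the PR title: materialized side inputs are kept in a static, JVM-wide map keyed by (view, window), so only the first task on an executor pays the deserialization cost and later tasks reuse the result. Everything here is hypothetical: `SharedSideInputCache`, its `get` method, and the `String` id standing in for a `PCollectionView` are illustrative names, not Beam's API, and unlike the PR this sketch has no eviction.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch: one cache per executor JVM, shared by every task, keyed by (view, window).
 * A String id stands in for a PCollectionView; there is no eviction here.
 */
final class SharedSideInputCache {

  /** Composite key of a view id and a window. */
  static final class Key {
    final String viewId;
    final Object window;

    Key(String viewId, Object window) {
      this.viewId = viewId;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return viewId.equals(other.viewId) && Objects.equals(window, other.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(viewId, window);
    }
  }

  // static: one map per loaded class (normally per executor JVM), shared across tasks
  private static final ConcurrentMap<Key, Object> MATERIALIZED = new ConcurrentHashMap<>();

  private SharedSideInputCache() {}

  /** Runs the expensive deserializer only on the first lookup of a (view, window) pair. */
  @SuppressWarnings("unchecked")
  static <T> T get(String viewId, Object window, BiFunction<String, Object, T> deserialize) {
    // Note: if deserialize returns null, computeIfAbsent records no mapping,
    // so null side inputs are not cached by this sketch.
    return (T)
        MATERIALIZED.computeIfAbsent(
            new Key(viewId, window), k -> deserialize.apply(k.viewId, k.window));
  }
}
```

In the PR, a Guava `Cache` with a 5-minute expire-after-access policy plays the role of the static map, which also bounds how long unused side inputs stay in executor memory.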
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196228 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 08/Feb/19 14:05
Start Date: 08/Feb/19 14:05
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7091: [BEAM-5987] Cache and share materialized side inputs between Spark tasks
URL: https://github.com/apache/beam/pull/7091#issuecomment-461813165

   Merged!

Issue Time Tracking
---
Worklog Id: (was: 196228)
Time Spent: 9.5h (was: 9h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=196215&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196215 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 08/Feb/19 13:19
Start Date: 08/Feb/19 13:19
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-454794706

   Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 196215)
Time Spent: 9h 20m (was: 9h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193444 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456470145

   Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 193444)
Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193446 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840543

   Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 193446)
Time Spent: 9h 10m (was: 9h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=193445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193445 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 01/Feb/19 16:21
Start Date: 01/Feb/19 16:21
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840404

   Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 193445)
Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=189011&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189011 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 23/Jan/19 15:17
Start Date: 23/Jan/19 15:17
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840404

   Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 189011)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=189012&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189012 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 23/Jan/19 15:17
Start Date: 23/Jan/19 15:17
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456840543

   Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 189012)
Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188327 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:41
Start Date: 22/Jan/19 16:41
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456470145

   Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 188327)
Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188332 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:48
Start Date: 22/Jan/19 16:48
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-456472767

   Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 188332)
Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188326 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:41
Start Date: 22/Jan/19 16:41
Worklog Time Spent: 10m
Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249863813

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache, Optional> materializedSideInputs =

Review comment:
   I also think you have bigger problems if there is a collision in `view` or `window` (`sparkRunner` relies on that in more places), so I will leave it as it is. Is that OK?

Issue Time Tracking
---
Worklog Id: (was: 188326)
Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188317 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:24
Start Date: 22/Jan/19 16:24
Worklog Time Spent: 10m
Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249856567

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ##
@@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) {
   @Override
   public T get(PCollectionView view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
-    final Map, T> materializedCasted = (Map) materialized;
-    return materializedCasted.computeIfAbsent(
-        new Key<>(view, window), key -> delegate.get(view, window));
+    final Cache, Optional> materializedCasted =
+        (Cache) SideInputStorage.getMaterializedSideInputs();
+
+    Key sideInputKey = new Key<>(view, window);
+
+    try {
+      Optional optionalResult =
+          materializedCasted.get(
+              sideInputKey,
+              () -> {
+                final T result = delegate.get(view, window);
+                LOG.info(
+                    "Caching de-serialized side input for {} of size [{}B] in memory.",
+                    sideInputKey,
+                    SizeEstimator.estimate(result));
+                return Optional.ofNullable(result);
+              });
+      return optionalResult.orElse(null);

Review comment:
   Thanks for the suggestions; an `Optional` combo would not be very readable. I made my own wrapper that simply wraps the value, so I can put `null` into the cache.
   https://github.com/apache/beam/pull/7091/files#diff-b123f0f1ca9646966a641a458b74cfbcR92

Issue Time Tracking
---
Worklog Id: (was: 188317)
Time Spent: 7h 50m (was: 7h 40m)
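The wrapper idea from this reply can be sketched as follows. Guava's `Cache.get(key, loader)` rejects a loader that returns `null` (it throws `CacheLoader.InvalidCacheLoadException`), and `ConcurrentHashMap` does not accept null values either, so the loaded value goes into a small holder that is itself never null. `NullableValue` and `NullTolerantCache` are hypothetical names, a plain `ConcurrentHashMap` stands in for the Guava cache, and the wrapper actually committed in the PR may look different.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical holder that is never null itself, so it can carry a null side input. */
final class NullableValue<T> {
  private final T value; // may be null

  NullableValue(T value) {
    this.value = value;
  }

  T get() {
    return value;
  }
}

/** Hypothetical cache that memoizes null results by storing them inside NullableValue. */
final class NullTolerantCache {
  private final ConcurrentMap<String, NullableValue<?>> cache = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <T> T get(String key, Supplier<T> loader) {
    // The wrapper is always non-null, so even a null result gets a map entry
    // and the loader runs at most once per key.
    NullableValue<?> wrapped =
        cache.computeIfAbsent(key, k -> new NullableValue<T>(loader.get()));
    return (T) wrapped.get();
  }
}
```

Compared with `Optional`, a dedicated holder keeps "cached, and the value is null" distinct from any `Optional` that the side input itself might contain.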
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188313 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:22
Start Date: 22/Jan/19 16:22
Worklog Time Spent: 10m
Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249855350

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java ##
@@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper(
       PCollectionView view, JavaSparkContext context) {
     Tuple2>>> tuple2 = pviews.get(view);
     SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
+    String pCollectionName =
+        view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN";
+    LOG.info(

Review comment:
   changed to debug

Issue Time Tracking
---
Worklog Id: (was: 188313)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=188314&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188314 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 22/Jan/19 16:22
Start Date: 22/Jan/19 16:22
Worklog Time Spent: 10m
Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r249855378

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ##
@@ -17,17 +17,23 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;

Review comment:
   fixed to vendored

Issue Time Tracking
---
Worklog Id: (was: 188314)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186632 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 17/Jan/19 23:59
Start Date: 17/Jan/19 23:59
Worklog Time Spent: 10m
Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248886560

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,88 @@
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache, Optional> materializedSideInputs =

Review comment:
   The window should use `windowCoder.structuralValue(window)`, which is required to behave identically to a full serialization for shuffle purposes. The view itself is just a tag, so its `equals` should be good enough as-is.
   There is already caching in the now-donated Dataflow Java worker, if you look through uses and subclasses of `SideInputReader`.

Issue Time Tracking
---
Worklog Id: (was: 186632)
Time Spent: 7h 20m (was: 7h 10m)
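The keying suggestion can be illustrated with a hypothetical composite key: the view is compared with its own `equals` (it is just a tag), while the window is compared through a structural value. In Beam that value would come from `windowCoder.structuralValue(window)`; here, as an assumption, the window is encoded to bytes by a caller-supplied function and wrapped in a `ByteBuffer`, whose `equals` and `hashCode` compare contents. `StructuralKey` and its `of` factory are illustrative names only.

```java
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical cache key: the view tag is compared with its own equals, the window through a
 * structural value (its encoded bytes), standing in for windowCoder.structuralValue(window).
 */
final class StructuralKey {
  private final Object viewTag;
  // ByteBuffer.equals/hashCode compare the remaining bytes, i.e. content equality.
  private final ByteBuffer windowStructure;

  private StructuralKey(Object viewTag, byte[] encodedWindow) {
    this.viewTag = viewTag;
    // Defensive copy so later changes to the caller's array cannot alter the key.
    this.windowStructure = ByteBuffer.wrap(encodedWindow.clone());
  }

  /** Builds a key by encoding the window with a caller-supplied (hypothetical) encoder. */
  static <W> StructuralKey of(Object viewTag, W window, Function<W, byte[]> encoder) {
    return new StructuralKey(viewTag, encoder.apply(window));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof StructuralKey)) {
      return false;
    }
    StructuralKey other = (StructuralKey) o;
    return viewTag.equals(other.viewTag) && windowStructure.equals(other.windowStructure);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewTag, windowStructure);
  }
}
```

Two window objects that do not override `equals` (for example, two separate `long[]` arrays holding the same bounds) still produce equal keys as long as they encode to the same bytes, which is the property the comment asks for.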
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186302 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 17/Jan/19 13:53
Start Date: 17/Jan/19 13:53
Worklog Time Spent: 10m
Work Description: amitsela commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r248677871

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,88 @@
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess.
+ */
+class SideInputStorage {
+
+  /** JVM deserialized side input cache. */
+  private static final Cache, Optional> materializedSideInputs =

Review comment:
   How do other runners cache side inputs? By which key? This sounds like something the SDK could provide guidance on (@kennknowles).

Issue Time Tracking
---
Worklog Id: (was: 186302)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=186126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186126 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 17/Jan/19 02:34 Start Date: 17/Jan/19 02:34 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248521765 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { @SuppressWarnings("unchecked") -final Map, T> materializedCasted = (Map) materialized; -return materializedCasted.computeIfAbsent( -new Key<>(view, window), key -> delegate.get(view, window)); +final Cache, Optional> materializedCasted = +(Cache) SideInputStorage.getMaterializedSideInputs(); + +Key sideInputKey = new Key<>(view, window); + +try { + Optional optionalResult = + materializedCasted.get( + sideInputKey, + () -> { +final T result = delegate.get(view, window); +LOG.info( +"Caching de-serialized side input for {} of size [{}B] in memory.", +sideInputKey, +SizeEstimator.estimate(result)); +return Optional.ofNullable(result); + }); + return optionalResult.orElse(null); Review comment: Ah, thank you for clarifying. That is a good attempt. The problem is that these will incorrectly be turned into the same thing: `Optional.ofNullable(null).orElse(null) == null` `Optional.ofNullable(Optional.absent()).orElse(null) == null` The fact that `Optional.of(null)` throws NPE is a mistake in the design (both Java and Guava). Maybe the point of the design is to convince people to not use `null`, which [is a billion dollar good idea](https://en.wikipedia.org/wiki/Null_pointer#History). 
But it makes `Optional` not correctly parametric in `T`. I think that if you actually convert `null` into `Optional.of(Optional.absent())` and other values `v` into `Optional.of(Optional.of(v))` you can simulate the behavior it should have had in the first place. Or you could make your own little replacement of `Optional`. Issue Time Tracking --- Worklog Id: (was: 186126) Time Spent: 7h (was: 6h 50m)
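Kenn's second option is a small replacement for `Optional` whose factory accepts `null`. A sketch of it follows. Guava's `Cache`, like `java.util.concurrent.ConcurrentHashMap`, rejects null values, so every entry is wrapped in a holder that is itself never null. `Holder` and `NullTolerantCache` are hypothetical names for illustration, not classes from the PR or from Beam.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical null-permitting holder: unlike Optional.of, of(null) is allowed. */
final class Holder<T> {
  private final T value; // may be null: null can be a legitimate side input value

  private Holder(T value) {
    this.value = value;
  }

  static <T> Holder<T> of(T value) {
    return new Holder<>(value);
  }

  T get() {
    return value;
  }
}

/** Cache over a map that rejects null values; wrapping lets a null result be cached too. */
final class NullTolerantCache<K, V> {
  private final Map<K, Holder<V>> entries = new ConcurrentHashMap<>();

  V get(K key, Supplier<V> loader) {
    // The Holder is never null, so a null result is stored once and not reloaded.
    return entries.computeIfAbsent(key, k -> Holder.of(loader.get())).get();
  }
}
```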
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185878 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:55 Start Date: 16/Jan/19 16:55 Worklog Time Spent: 10m Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248365302 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { @SuppressWarnings("unchecked") -final Map, T> materializedCasted = (Map) materialized; -return materializedCasted.computeIfAbsent( -new Key<>(view, window), key -> delegate.get(view, window)); +final Cache, Optional> materializedCasted = +(Cache) SideInputStorage.getMaterializedSideInputs(); + +Key sideInputKey = new Key<>(view, window); + +try { + Optional optionalResult = + materializedCasted.get( + sideInputKey, + () -> { +final T result = delegate.get(view, window); +LOG.info( +"Caching de-serialized side input for {} of size [{}B] in memory.", +sideInputKey, +SizeEstimator.estimate(result)); +return Optional.ofNullable(result); + }); + return optionalResult.orElse(null); Review comment: I chose this solution because Guava's `Cache` doesn't allow null values, and I didn't realize I would break the semantics. I will try to find a different solution. Issue Time Tracking --- Worklog Id: (was: 185878) Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185868&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185868 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248359016 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java ## @@ -85,6 +88,13 @@ private SideInputBroadcast createBroadcastHelper( PCollectionView view, JavaSparkContext context) { Tuple2>>> tuple2 = pviews.get(view); SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2); +String pCollectionName = +view.getPCollection() != null ? view.getPCollection().getName() : "UNKNOWN"; +LOG.info( Review comment: Maybe `LOG.debug`? Issue Time Tracking --- Worklog Id: (was: 185868) Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185871&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185871 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248360258 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { Review comment: Ah, the meaning of that comment is that `null` is a value. You can have a `PCollection<@Nullable Foo>` that contains just one copy of `null` and use `View.asSingleton()`, and the side input returns the `null`. In other words, `get` _must_ return a value of type `T`. But the type `T` may itself be `@Nullable Something`. The annotation on `SideInputReader` should be removed. It is incorrect if we use a static analysis that understands this. Findbugs does not understand this, but we should aspire for our annotations to be correct so the documentation is clear. Issue Time Tracking --- Worklog Id: (was: 185871) Time Spent: 6h 40m (was: 6.5h)
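Because `null` can be a legitimate singleton value, a caching layer has to cache and return it unchanged. Here is a minimal sketch of a per-bundle (single-threaded) caching decorator under that contract. `CachingReader` is a hypothetical class, and a plain `Function` stands in for `SideInputReader`. The sketch uses `containsKey` instead of `computeIfAbsent` deliberately: `Map.computeIfAbsent` records no mapping when the function returns `null`. With the removed per-bundle code, a `null` singleton would therefore simply be re-read on every call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical per-bundle caching decorator. The delegate may legitimately return null (e.g. a
 * singleton side input whose only element is null); that null is cached and returned as is.
 * Not thread-safe: intended for single-threaded, per-bundle use.
 */
final class CachingReader<K, V> {
  private final Function<K, V> delegate;
  private final Map<K, V> cache = new HashMap<>(); // HashMap accepts null values

  CachingReader(Function<K, V> delegate) {
    this.delegate = delegate;
  }

  V get(K key) {
    // containsKey distinguishes "cached null" from "not cached yet"; get() alone cannot.
    if (cache.containsKey(key)) {
      return cache.get(key);
    }
    V value = delegate.apply(key);
    cache.put(key, value);
    return value;
  }
}
```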
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185869 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248359142 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -17,17 +17,23 @@ */ package org.apache.beam.runners.spark.util; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; +import com.google.common.cache.Cache; Review comment: Vendored Guava? Issue Time Tracking --- Worklog Id: (was: 185869) Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185867&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185867 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248355041 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { Review comment: I am worried about the possible semantic consequences of `CachedSideInputReader.get()` returning a `null` value when it is not in the Cache. Wouldn't it imply that a window could get an empty side input assigned? [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32) on this is not really clear (pinging @kennknowles to see if I am misreading it). I wonder if there is a test to validate that this cannot happen, or if we can create one somehow. Issue Time Tracking --- Worklog Id: (was: 185867) Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185870 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248360966 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { @SuppressWarnings("unchecked") -final Map, T> materializedCasted = (Map) materialized; -return materializedCasted.computeIfAbsent( -new Key<>(view, window), key -> delegate.get(view, window)); +final Cache, Optional> materializedCasted = +(Cache) SideInputStorage.getMaterializedSideInputs(); + +Key sideInputKey = new Key<>(view, window); + +try { + Optional optionalResult = + materializedCasted.get( + sideInputKey, + () -> { +final T result = delegate.get(view, window); +LOG.info( +"Caching de-serialized side input for {} of size [{}B] in memory.", +sideInputKey, +SizeEstimator.estimate(result)); +return Optional.ofNullable(result); + }); + return optionalResult.orElse(null); Review comment: Ismaël is right. The `delegate.get` is not `@Nullable` at this place in the abstraction. You don't need to check for it (unless there's some other bug somewhere) and you shouldn't convert `Optional.absent()` to `null`. Issue Time Tracking --- Worklog Id: (was: 185870) Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185866&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185866 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:45 Start Date: 16/Jan/19 16:45 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248355041 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { Review comment: I am worried about the possible semantic consequences of `CachedSideInputReader.get()` returning a `null` value when it is not in the `Cache` for this reader. Wouldn't it imply that a window could get an empty side input assigned? [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32) on this is not really clear (pinging @kennknowles to see if I am misreading it). I wonder if there is a test to validate that this cannot happen, or if we can create one somehow. Issue Time Tracking --- Worklog Id: (was: 185866) Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185859&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185859 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:36 Start Date: 16/Jan/19 16:36 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248354273 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.util; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Cache deserialized side inputs for executor so every task doesn't need to deserialize them again. + * Side inputs are stored in {@link Cache} with 5 minutes expireAfterAccess. 
+ */ +class SideInputStorage { + + /** JVM deserialized side input cache. */ + private static final Cache, Optional> materializedSideInputs = Review comment: I am a bit worried about the possible consequences of a collision of the `Key` tuple, in particular if a bad implementation of `equals` is around. This is not related to this PR, but since the state is now static, this is more likely to happen. Issue Time Tracking --- Worklog Id: (was: 185859) Time Spent: 5h 40m (was: 5.5h)
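To make the collision concern concrete, consider a JVM-wide static cache whose window type has a sloppy `equals`/`hashCode`. Two different windows then map to one entry, so one window can silently receive another window's side input. The sketch below uses hypothetical stand-in types (`SideInputKey`, `SloppyWindow`, `SharedSideInputCache`) rather than Beam's `Key`, `PCollectionView` or `BoundedWindow`.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical (view, window) cache key; its equality is only as good as its components'. */
final class SideInputKey {
  private final String viewId; // stand-in for PCollectionView
  private final Object window; // stand-in for BoundedWindow

  SideInputKey(String viewId, Object window) {
    this.viewId = viewId;
    this.window = window;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return viewId.equals(other.viewId) && Objects.equals(window, other.window);
  }

  @Override
  public int hashCode() {
    return Objects.hash(viewId, window);
  }
}

/** Deliberately broken window type: every instance claims to equal every other instance. */
final class SloppyWindow {
  final long start;

  SloppyWindow(long start) {
    this.start = start;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof SloppyWindow; // the bug: ignores `start`
  }

  @Override
  public int hashCode() {
    return 0;
  }
}

/** Static cache shared by every task in the JVM, so a collision leaks across tasks. */
final class SharedSideInputCache {
  static final ConcurrentMap<SideInputKey, String> CACHE = new ConcurrentHashMap<>();

  static String get(SideInputKey key, String materialized) {
    return CACHE.computeIfAbsent(key, k -> materialized);
  }
}
```

With these types, a lookup for the window starting at 60 returns the contents cached for the window starting at 0, and the cache holds a single entry.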
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185861&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185861 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:38 Start Date: 16/Jan/19 16:38 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248355041 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { Review comment: I am worried about the possible semantic consequences of `SideInputReader.get()` returning a `null` value when it is not in the `Cache` for this reader. Wouldn't it imply that a window could get an empty side input assigned? [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32) on this is not really clear (pinging @kennknowles to see if I am misreading it). I wonder if there is a test to validate that this cannot happen, or if we can create one somehow. Issue Time Tracking --- Worklog Id: (was: 185861) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185860&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185860 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 16:36 Start Date: 16/Jan/19 16:36 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r248355041 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ## @@ -86,9 +55,27 @@ private CachedSideInputReader(SideInputReader delegate) { @Override public T get(PCollectionView view, BoundedWindow window) { Review comment: I am worried about the possible semantic consequences of `get` returning a `null` value when it is not in the `Cache` for this `SideInputReader`. Wouldn't it imply that a window could get an empty side input assigned? [The documentation](https://github.com/apache/beam/blob/6c2fe17cfdea1be1fdcfb02267894f0d37a671b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java#L32) on this is not really clear (pinging @kennknowles to see if I am misreading it). I wonder if there is a test to validate that this cannot happen, or if we can create one somehow. Issue Time Tracking --- Worklog Id: (was: 185860) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185799 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:18 Start Date: 16/Jan/19 14:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454795081 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185799) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185789 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:08 Start Date: 16/Jan/19 14:08 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454791089 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185789) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185796 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:17 Start Date: 16/Jan/19 14:17 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454794706 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185796) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185795 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:16 Start Date: 16/Jan/19 14:16 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454794367 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185795) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185788 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:08 Start Date: 16/Jan/19 14:08 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454790500 Spark Validates Runner Issue Time Tracking --- Worklog Id: (was: 185788) Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185787 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:08 Start Date: 16/Jan/19 14:08 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454791089 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185787) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=185786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185786 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 16/Jan/19 14:06 Start Date: 16/Jan/19 14:06 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-454790500 Spark Validates Runner Issue Time Tracking --- Worklog Id: (was: 185786) Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184265 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 11/Jan/19 17:00 Start Date: 11/Jan/19 17:00 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-453585179 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 184265) Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184241 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 11/Jan/19 15:15 Start Date: 11/Jan/19 15:15 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-453549228 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 184241) Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=184234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184234 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 11/Jan/19 14:42 Start Date: 11/Jan/19 14:42 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-453538646 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 184234) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181820&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181820 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 07/Jan/19 15:02 Start Date: 07/Jan/19 15:02 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840 After trying several approaches, we decided to keep it simple and use expireAfterAccess to drop values from the cache. The weak-values solution didn't give the desired behavior: when a `MultiDoFnFunction` finished on an executor, the deserialized side input was garbage collected immediately, because its last references disappeared when the `CachedSideInputReader` reached the end of its life. The side input stayed in the cache only while several `MultiDoFnFunction` runs overlapped. In our case, a single JVM deserialized the same side input up to 10x. With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction) the side input is deserialized only once. I chose a [5 min eviction duration](https://github.com/apache/beam/pull/7091/files#diff-b123f0f1ca9646966a641a458b74cfbcR35) as the best compromise, but I am open to discussing whether it should be configurable. A possible disadvantage of the expireAfterAccess solution is higher memory consumption: if `SideInputStorage` isn't accessed for a long time, nothing gets evicted. I don't know how to detect when a `MultiDoFnFunction` has finished so that I could call `cache.cleanUp()` to trigger eviction of expired items, and I am not sure this is even a problem. Issue Time Tracking --- Worklog Id: (was: 181820) Time Spent: 3h 50m (was: 3h 40m)
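For readers unfamiliar with Guava's timed eviction, the expireAfterAccess policy described in this comment can be sketched with the JDK alone. The class `ExpireAfterAccessCache` below is hypothetical and is not the code merged in PR #7091 (which builds on Guava's `CacheBuilder`): a read within the TTL refreshes the entry's timestamp, an entry idle for the TTL or longer is reloaded on its next read, and `cleanUp()` plays the role of Guava's `Cache.cleanUp()`. The clock is injected (an assumption made for testability) so the behavior can be checked without sleeping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical JDK-only expire-after-access cache; not the Beam or Guava implementation. */
final class ExpireAfterAccessCache<K, V> {
  private static final class Entry<V> {
    final V value;
    volatile long lastAccess;

    Entry(V value, long now) {
      this.value = value;
      this.lastAccess = now;
    }
  }

  private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
  private final long ttl; // same unit as the clock
  private final LongSupplier clock; // injected so tests can control time

  ExpireAfterAccessCache(long ttl, LongSupplier clock) {
    this.ttl = ttl;
    this.clock = clock;
  }

  /** Returns the cached value, loading it if absent or idle for ttl or longer; a hit resets the timer. */
  V get(K key, Function<? super K, ? extends V> loader) {
    long now = clock.getAsLong();
    Entry<V> entry =
        entries.compute(
            key,
            (k, old) -> {
              if (old != null && now - old.lastAccess < ttl) {
                old.lastAccess = now; // access refreshes the expiry window
                return old;
              }
              return new Entry<>(loader.apply(k), now); // absent or expired: (re)materialize
            });
    return entry.value;
  }

  /** Removes every entry that has not been accessed within the TTL. */
  void cleanUp() {
    long now = clock.getAsLong();
    entries.values().removeIf(e -> now - e.lastAccess >= ttl);
  }

  int size() {
    return entries.size();
  }
}
```

In this sketch an expired entry is reclaimed only when its key is read again or `cleanUp()` runs. Guava likewise performs this maintenance lazily during reads and writes rather than on a background thread, so a storage that is never accessed again keeps its expired side inputs in memory, which is the concern raised in the comment.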
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181819&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181819 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 07/Jan/19 15:01 Start Date: 07/Jan/19 15:01 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840 After trying several approaches, we decided to keep it simple and use expireAfterAccess to drop values from the cache. The weak-values solution didn't give the desired behavior: when a `MultiDoFnFunction` finished on an executor, the deserialized side input was garbage collected immediately, because its last references disappeared when the `CachedSideInputReader` reached the end of its life. The side input stayed in the cache only while several `MultiDoFnFunction` runs overlapped. In our case, a single JVM deserialized the same side input up to 10x. With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction) the side input is deserialized only once. I chose a 5 min eviction duration as the best compromise, but I am open to discussing whether it should be configurable. A possible disadvantage of the expireAfterAccess solution is higher memory consumption: if `SideInputStorage` isn't accessed for a long time, nothing gets evicted. I don't know how to detect when a `MultiDoFnFunction` has finished so that I could call `cache.cleanUp()` to trigger eviction of expired items, and I am not sure this is even a problem. Issue Time Tracking --- Worklog Id: (was: 181819) Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181818 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 07/Jan/19 14:59 Start Date: 07/Jan/19 14:59 Worklog Time Spent: 10m Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r245680983
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ##
@@ -17,17 +17,31 @@
  */
 package org.apache.beam.runners.spark.util;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.util.SizeEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /** {@link SideInputReader} that caches materialized views. */
 public class CachedSideInputReader implements SideInputReader {
+  private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
+
+  /**
+   * Keep references for the whole lifecycle of CachedSideInputReader otherwise sideInput needs to
+   * be de-serialized again.
+   */
+  private Set sideInputReferences = new HashSet<>();
Review comment: References removed because a different solution was used. [more](https://github.com/apache/beam/pull/7091#issuecomment-451961840) Issue Time Tracking --- Worklog Id: (was: 181818) Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181817&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181817 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 07/Jan/19 14:58 Start Date: 07/Jan/19 14:58 Worklog Time Spent: 10m Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-451961840 After trying several approaches, we decided to keep it simple and use expireAfterAccess to drop values from the cache. The weak-values solution didn't give the desired behavior: when a `MultiDoFnFunction` finished on an executor, the deserialized side input was garbage collected immediately, because its last references disappeared when the `CachedSideInputReader` reached the end of its life. The side input stayed in the cache only while several `MultiDoFnFunction` runs overlapped. In our case, a single JVM deserialized the same side input up to 10x. With [expireAfterAccess](https://github.com/google/guava/wiki/CachesExplained#timed-eviction) the side input is deserialized only once. I chose a 5 min eviction duration as the best compromise, but I am open to discussing whether it should be configurable. A possible disadvantage of the expireAfterAccess solution is higher memory consumption: if `SideInputStorage` isn't accessed for a long time, nothing gets evicted. I don't know how to detect when a `MultiDoFnFunction` has finished so that I could call `cache.cleanUp()` to trigger eviction of expired items, and I am not sure this is even a problem. Issue Time Tracking --- Worklog Id: (was: 181817) Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=181792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-181792 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 07/Jan/19 14:31 Start Date: 07/Jan/19 14:31 Worklog Time Spent: 10m Work Description: mareksimunek commented on pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r245671371
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {
Review comment: done Issue Time Tracking --- Worklog Id: (was: 181792) Time Spent: 3h 10m (was: 3h)
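The `SideInputStorage` javadoc in this diff describes an executor-wide cache of deserialized side inputs, so that tasks do not each deserialize them again. A minimal JDK-only sketch of that sharing pattern follows. It assumes string ids in place of `PCollectionView` and `BoundedWindow` so it runs without Beam, and the names `SharedSideInputStore`, `Key`, and `getOrMaterialize` are hypothetical. The composite `Key` implements `equals`/`hashCode` so that equal (view, window) pairs built by different tasks hit the same entry. The class is package-private with a private constructor, in line with keeping access as tight as needed.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical executor-wide store; string ids stand in for PCollectionView and BoundedWindow. */
final class SharedSideInputStore {
  // static: one map per JVM (one Spark executor), shared by every task that JVM runs
  private static final ConcurrentMap<Key, Object> STORE = new ConcurrentHashMap<>();

  private SharedSideInputStore() {} // no instances; access goes through the static method

  /** Composite key: the same view in a different window is a different entry. */
  static final class Key {
    private final String viewId;
    private final String windowId;

    Key(String viewId, String windowId) {
      this.viewId = Objects.requireNonNull(viewId);
      this.windowId = Objects.requireNonNull(windowId);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return viewId.equals(other.viewId) && windowId.equals(other.windowId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(viewId, windowId);
    }
  }

  /** Materializes the value for a key at most once per JVM; later callers get the same instance. */
  @SuppressWarnings("unchecked")
  static <T> T getOrMaterialize(Key key, Supplier<? extends T> materialize) {
    return (T) STORE.computeIfAbsent(key, k -> materialize.get());
  }
}
```

This sketch deliberately has no eviction, so on its own it would grow without bound; bounding its lifetime is what the weakValues and, later, expireAfterAccess approaches discussed in this thread are about.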
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173515&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173515 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 10/Dec/18 11:01 Start Date: 10/Dec/18 11:01 Worklog Time Spent: 10m Work Description: iemejia commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#issuecomment-445776385 It would be nice to test that this behaves as expected: that it does not leak (values that are no longer needed still get GCed) and that it does not rematerialize. Issue Time Tracking --- Worklog Id: (was: 173515) Time Spent: 3h (was: 2h 50m)
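One way to check the "does not rematerialize" part of this request is to count materializations while many concurrent tasks on one executor look up the same side input. The hypothetical harness below (`RematerializationCheck` is not the test added in the PR) uses `ConcurrentHashMap.computeIfAbsent`, which applies its mapping function at most once per key, as a stand-in for the shared cache. The leak half is harder to make deterministic because it depends on garbage-collector timing, so this sketch covers only rematerialization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical harness: do concurrent tasks sharing one cache materialize a side input once? */
final class RematerializationCheck {
  /** Returns {number of materializations, number of distinct instances seen by the tasks}. */
  static int[] run(int tasks) {
    ConcurrentHashMap<String, List<Integer>> cache = new ConcurrentHashMap<>();
    AtomicInteger materializations = new AtomicInteger();
    Callable<List<Integer>> task =
        () ->
            cache.computeIfAbsent(
                "view@global",
                k -> {
                  materializations.incrementAndGet(); // stands in for costly deserialization
                  List<Integer> value = new ArrayList<>();
                  for (int i = 0; i < 10_000; i++) {
                    value.add(i);
                  }
                  return value;
                });

    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<List<Integer>>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(pool.submit(task));
      }
      // Identity-based set: counts distinct objects, not lists that merely compare equal.
      Set<List<Integer>> distinct =
          Collections.newSetFromMap(new IdentityHashMap<List<Integer>, Boolean>());
      for (Future<List<Integer>> f : futures) {
        distinct.add(f.get());
      }
      return new int[] {materializations.get(), distinct.size()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

A real test against the Spark runner would exercise the actual reader and storage classes instead of a bare map, but the assertion shape stays the same: one materialization and one shared instance, however many tasks ran.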
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173514 ] ASF GitHub Bot logged work on BEAM-5987: Author: ASF GitHub Bot Created on: 10/Dec/18 10:59 Start Date: 10/Dec/18 10:59 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks. URL: https://github.com/apache/beam/pull/7091#discussion_r240166335
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java ##
@@ -17,17 +17,31 @@
  */
 package org.apache.beam.runners.spark.util;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import com.google.common.cache.Cache;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.spark.util.SideInputStorage.Key;
+import org.apache.beam.runners.spark.util.SideInputStorage.Value;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.util.SizeEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /** {@link SideInputReader} that caches materialized views. */
 public class CachedSideInputReader implements SideInputReader {
+  private static final Logger LOG = LoggerFactory.getLogger(CachedSideInputReader.class);
+
+  /**
+   * Keep references for the whole lifecycle of CachedSideInputReader otherwise sideInput needs to
+   * be de-serialized again.
+   */
+  private Set sideInputReferences = new HashSet<>();
Review comment: Aren't values ever removed from here? Or am I misreading this one? It seems like it can overflow and even prevent `SideInputStorage` from being GCed. Issue Time Tracking --- Worklog Id: (was: 173514) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173508 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 10/Dec/18 10:52
Start Date: 10/Dec/18 10:52
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r240163858

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {

Review comment:
package private and same for constructor, make access as tight as needed.

Issue Time Tracking
---
Worklog Id: (was: 173508)
Time Spent: 2h 40m (was: 2.5h)
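The Javadoc quoted above describes an executor-wide store whose side-input values are weakly referenced, and the review asks for package-private access. As an illustration only, the sketch below expresses that idea with the JDK alone. It assumes a `HashMap` of `WeakReference`s in place of Guava's `CacheBuilder.weakValues()`, and the names `SideInputStore`, `instance()`, `get(key, materializer)` and the `String` key (standing in for a view/window pair) are hypothetical, not taken from the PR.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical executor-wide store for deserialized side inputs. Values are held through
 * WeakReference, so once no running task references a value the GC may reclaim it and the next
 * lookup materializes it again. Package-private with a private constructor, per the review.
 */
final class SideInputStore {

  /** One instance per JVM, i.e. per Spark executor, shared by all of its tasks. */
  private static final SideInputStore INSTANCE = new SideInputStore();

  private final Map<String, WeakReference<Object>> values = new HashMap<>();

  private SideInputStore() {}

  static SideInputStore instance() {
    return INSTANCE;
  }

  /**
   * Returns the value cached under {@code key}, calling {@code materializer} only if there is no
   * entry or its value has already been collected. Synchronized for simplicity, which also
   * serializes materialization of unrelated keys.
   */
  @SuppressWarnings("unchecked")
  synchronized <T> T get(String key, Supplier<? extends T> materializer) {
    WeakReference<Object> ref = values.get(key);
    Object cached = (ref == null) ? null : ref.get();
    if (cached == null) {
      cached = materializer.get(); // the expensive deserialization happens only here
      values.put(key, new WeakReference<>(cached));
    }
    return (T) cached;
  }
}
```

Unlike a Guava cache built with `weakValues()`, which cleans up collected entries during routine maintenance, this sketch leaves a cleared reference in the map until that key is looked up again; a real version would also need to purge such entries.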
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=173507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173507 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 10/Dec/18 10:51
Start Date: 10/Dec/18 10:51
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r240163858

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java ##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Cache deserialized side inputs for executor so every task doesnt need to deserialize them again.
+ * Side inputs are stored in {@link Cache} with weakValues so if there is no reference to a value,
+ * sideInput is garbage collected.
+ */
+public class SideInputStorage {

Review comment:
package private

Issue Time Tracking
---
Worklog Id: (was: 173507)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=171493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171493 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 03/Dec/18 09:08
Start Date: 03/Dec/18 09:08
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-443638145

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 171493)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=170162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-170162 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 28/Nov/18 09:23
Start Date: 28/Nov/18 09:23
Worklog Time Spent: 10m
Work Description: mareksimunek commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-442377623

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 170162)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168439 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 21/Nov/18 18:15
Start Date: 21/Nov/18 18:15
Worklog Time Spent: 10m
Work Description: VaclavPlajt commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235493663

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ##
@@ -61,6 +66,17 @@
 public class MultiDoFnFunction
     implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+      Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.

Review comment:
ok, I see.

Issue Time Tracking
---
Worklog Id: (was: 168439)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168327 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 21/Nov/18 15:58
Start Date: 21/Nov/18 15:58
Worklog Time Spent: 10m
Work Description: dmvk commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235445177

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ##
@@ -61,6 +66,17 @@
 public class MultiDoFnFunction
     implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+      Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.

Review comment:
After deserialization on the executor side

Issue Time Tracking
---
Worklog Id: (was: 168327)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168328 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 21/Nov/18 15:59
Start Date: 21/Nov/18 15:59
Worklog Time Spent: 10m
Work Description: dmvk commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-440718069

This still needs some effort, as it does not handle the case when a side input is used in different DoFns.

Issue Time Tracking
---
Worklog Id: (was: 168328)
Time Spent: 1h 50m (was: 1h 40m)
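The comment above notes that the change did not yet handle one side input being read by several DoFns. A minimal illustration of why the choice of cache key matters, assuming a JVM-wide cache like the one in the MultiDoFnFunction diff: if entries are keyed by the consuming function, each DoFn materializes its own copy of the side input; if they are keyed by the side input itself, the DoFns share one. `SideInputKeyChoice`, `materialize` and the string ids are hypothetical, and the thread does not say which key the final change uses.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of one executor's JVM-wide side-input cache, used to compare two cache keys:
 * the id of the consuming DoFn versus the id of the side input (view) itself.
 */
final class SideInputKeyChoice {

  private final Map<String, Object> cache = new HashMap<>();
  private int materializations = 0;

  /** Stands in for the expensive deserialization of a broadcast side input. */
  private Object materialize(String viewId) {
    materializations++;
    return "contents of " + viewId;
  }

  /** Cache lookup under a caller-chosen key; the view is materialized only on a miss. */
  private Object read(String cacheKey, String viewId) {
    return cache.computeIfAbsent(cacheKey, k -> materialize(viewId));
  }

  /** Two different DoFns read the same view; returns how many times the view is materialized. */
  static int materializationsWhenTwoDoFnsShareAView(boolean keyByView) {
    SideInputKeyChoice executor = new SideInputKeyChoice();
    String viewId = "view-1";
    for (String doFnId : new String[] {"doFn-A", "doFn-B"}) {
      String cacheKey = keyByView ? viewId : doFnId;
      executor.read(cacheKey, viewId);
    }
    return executor.materializations;
  }

  public static void main(String[] args) {
    System.out.println("keyed by DoFn: " + materializationsWhenTwoDoFnsShareAView(false)); // 2
    System.out.println("keyed by view: " + materializationsWhenTwoDoFnsShareAView(true)); // 1
  }
}
```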
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168286 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 21/Nov/18 14:06
Start Date: 21/Nov/18 14:06
Worklog Time Spent: 10m
Work Description: VaclavPlajt commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235399531

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ##
@@ -61,6 +66,17 @@
 public class MultiDoFnFunction
     implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+      Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.

Review comment:
I do not get the meaning of 'consistent' here. Do you mean random even within one JVM?

Issue Time Tracking
---
Worklog Id: (was: 168286)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=168289&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168289 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 21/Nov/18 14:09
Start Date: 21/Nov/18 14:09
Worklog Time Spent: 10m
Work Description: VaclavPlajt commented on a change in pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#discussion_r235399531

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ##
@@ -61,6 +66,17 @@
 public class MultiDoFnFunction
     implements PairFlatMapFunction>, TupleTag, WindowedValue> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MultiDoFnFunction.class);
+
+  /** JVM wide side input cache. */
+  private static final Map sideInputReaders =
+      Collections.synchronizedMap(new WeakHashMap<>());
+
+  /**
+   * Id that is consistent among executors. We can not use stepName because of possible collisions.

Review comment:
I do not get the meaning of 'consistent' here. Do you mean random (most likely distinct) even within one JVM?

Issue Time Tracking
---
Worklog Id: (was: 168289)
Time Spent: 1.5h (was: 1h 20m)
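In the review thread above (listed newest first), the question of what "consistent among executors" means for the id is answered with "After deserialization on the executor side". One reading, sketched below under that assumption, is that the id is drawn randomly once, when the function object is built on the driver, and is then copied by serialization, so every executor-side copy carries the same value while two separately built functions never share one (unlike a step name, which may collide). `SideInputFunction`, its `id` field, `sideInput(...)` and `shipToExecutor(...)` are hypothetical; only the `Collections.synchronizedMap(new WeakHashMap<>())` field shape comes from the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Spark function that is built on the driver and shipped to executors. */
class SideInputFunction implements Serializable {
  private static final long serialVersionUID = 1L;

  /** JVM-wide cache, shaped like the field in the diff; static, so it is never serialized. */
  static final Map<String, Object> CACHE = Collections.synchronizedMap(new WeakHashMap<>());

  /**
   * Random id drawn once, when the function is constructed on the driver. Java serialization copies
   * the field as-is (field initializers do not re-run on deserialization), so every executor-side
   * copy carries the same id, while two separately constructed functions get distinct ids.
   */
  final String id = UUID.randomUUID().toString();

  /** Returns the cached side input for this function's id, materializing it on first use. */
  Object sideInput(Supplier<Object> materializer) {
    return CACHE.computeIfAbsent(id, k -> materializer.get());
  }

  /** Simulates shipping the function to an executor: serialize, then deserialize a fresh copy. */
  static SideInputFunction shipToExecutor(SideInputFunction fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SideInputFunction) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}
```

One design caveat: `WeakHashMap` holds its keys weakly, so an entry survives only while the particular `String` instance that was stored as its key stays reachable; an equal id held by another deserialized copy does not keep the entry alive.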
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=167885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167885 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 20/Nov/18 17:45
Start Date: 20/Nov/18 17:45
Worklog Time Spent: 10m
Work Description: dmvk commented on issue #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091#issuecomment-440366991

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 167885)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=167851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167851 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 20/Nov/18 17:09
Start Date: 20/Nov/18 17:09
Worklog Time Spent: 10m
Work Description: dmvk opened a new pull request #7091: [BEAM-5987] Spark: Share cached side inputs between tasks.
URL: https://github.com/apache/beam/pull/7091

We should try to reuse deserialized side inputs among Spark tasks.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.

Issue Time Tracking
---
Worklog Id: (was: 167851)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163217 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 06/Nov/18 22:02
Start Date: 06/Nov/18 22:02
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #6960: [BEAM-5987] Fix Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index c70634c0733..4efe1e1438e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -35,6 +35,7 @@
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.util.CachedSideInputReader;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
@@ -153,7 +154,7 @@ public TimerInternals timerInternals() {
         DoFnRunners.simpleRunner(
             options.get(),
             doFn,
-            new SparkSideInputReader(sideInputs),
+            CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
             outputManager,
             mainOutputTag,
             additionalOutputTags,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
new file mode 100644
index 000..49200c6c4b4
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/CachedSideInputReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** {@link SideInputReader} that caches materialized views. */
+public class CachedSideInputReader implements SideInputReader {
+
+  /**
+   * Create a new cached {@link SideInputReader}.
+   *
+   * @param delegate wrapped reader
+   * @return cached reader
+   */
+  public static CachedSideInputReader of(SideInputReader delegate) {
+    return new CachedSideInputReader(delegate);
+  }
+
+  /**
+   * Composite key of {@link PCollectionView} and {@link BoundedWindow} used to identify
+   * materialized results.
+   *
+   * @param type of result
+   */
+  private static class Key {
+
+    private final PCollectionView view;
+    private final BoundedWindow window;
+
+    Key(PCollectionView view, BoundedWindow window) {
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      final Key key = (Key) o;
+      return Objects.equals(view, key.view) && Objects.equals(window, key.window);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(view, window);
+    }
+  }
+
+  /** Wrapped {@link SideInputReader} which results will be cached. */
+  private final SideInputReader delegate;
+
+  /** Materialized results. */
+  pr
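The (truncated) diff above wraps `SparkSideInputReader` in `CachedSideInputReader.of(...)`, which memoizes results under a composite (view, window) key. The same decorator can be sketched without Beam as follows. `SimpleSideInputReader` is a hypothetical, simplified stand-in for Beam's `SideInputReader` (plain strings replace `PCollectionView` and `BoundedWindow`), and `MemoizingSideInputReader` is an illustration, not the PR's class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified stand-in for Beam's SideInputReader: (view, window) to value. */
interface SimpleSideInputReader {
  Object get(String view, String window);
}

/**
 * Decorator that memoizes materialized side inputs: the first get() for a (view, window) pair
 * goes to the delegate, later calls for the same pair return the stored result. Not thread-safe;
 * meant to be scoped to one reader instance (e.g. one bundle).
 */
final class MemoizingSideInputReader implements SimpleSideInputReader {

  /** Composite key of view and window, mirroring the Key class in the diff. */
  private static final class Key {
    private final String view;
    private final String window;

    Key(String view, String window) {
      this.view = view;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return Objects.equals(view, other.view) && Objects.equals(window, other.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(view, window);
    }
  }

  private final SimpleSideInputReader delegate;
  private final Map<Key, Object> materialized = new HashMap<>();

  private MemoizingSideInputReader(SimpleSideInputReader delegate) {
    this.delegate = delegate;
  }

  static MemoizingSideInputReader of(SimpleSideInputReader delegate) {
    return new MemoizingSideInputReader(delegate);
  }

  @Override
  public Object get(String view, String window) {
    Key key = new Key(view, window);
    // containsKey (not computeIfAbsent) so that a null result is cached as well
    if (!materialized.containsKey(key)) {
      materialized.put(key, delegate.get(view, window));
    }
    return materialized.get(key);
  }
}
```

The explicit `containsKey` check is used instead of `Map.computeIfAbsent` because `computeIfAbsent` records no mapping when the function returns null. As in the issue's per-bundle fix, the map lives only as long as one reader instance, so nothing is shared between tasks.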
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163198 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 06/Nov/18 21:06
Start Date: 06/Nov/18 21:06
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #6960: [BEAM-5987] Fix Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436408303

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 163198)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163196 ]

ASF GitHub Bot logged work on BEAM-5987:
Author: ASF GitHub Bot
Created on: 06/Nov/18 21:01
Start Date: 06/Nov/18 21:01
Worklog Time Spent: 10m
Work Description: dmvk commented on issue #6960: [BEAM-5987] Fix Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436406851

retest this

Issue Time Tracking
---
Worklog Id: (was: 163196)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163024&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163024 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 06/Nov/18 12:20
Start Date: 06/Nov/18 12:20
Worklog Time Spent: 10m
Work Description: dmvk commented on issue #6960: [BEAM-5987] Fix Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960#issuecomment-436233825

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 163024)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-5987) Spark SideInputReader performance
[ https://issues.apache.org/jira/browse/BEAM-5987?focusedWorklogId=163018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163018 ]

ASF GitHub Bot logged work on BEAM-5987:

Author: ASF GitHub Bot
Created on: 06/Nov/18 12:11
Start Date: 06/Nov/18 12:11
Worklog Time Spent: 10m
Work Description: dmvk opened a new pull request #6960: [BEAM-5987] Fix Spark SideInputReader performance.
URL: https://github.com/apache/beam/pull/6960

We did some profiling of a Spark job, and 90% of the application time was spent on side input deserialization.

For Spark, an easy fix is to cache materialized side inputs per bundle. This improved the running time of the profiled job from 3 hours to 30 minutes.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
Issue Time Tracking
---
Worklog Id: