[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236835&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236835 ]

ASF GitHub Bot logged work on BEAM-5775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/19 12:43
            Start Date: 03/May/19 12:43
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489081256
 
 
   Were you able to notice any performance improvement with this PR? I like it for consistency, but I have found both improvements and regressions depending on the pipeline.
   
   I also hit a weird issue with this one, @mikekap. I was running Nexmark to see whether I could find considerable improvements due to this PR, but when I invoke it multiple times it fails; curiously, this does not happen with current master. Would you mind taking a look to see whether it is perhaps a configuration issue? It is strange.
   
   ```bash
   ./gradlew :beam-sdks-java-nexmark:run \
       -Pnexmark.runner=":beam-runners-spark" \
       -Pnexmark.args="
           --runner=SparkRunner
           --suite=SMOKE
           --streamTimeout=60
           --streaming=false
           --manageResources=false
           --monitorJobs=true"
   ```
   The exception log:
   
   ```
   
   ==========================================================================================
   Run started 2019-05-03T12:36:02.235Z and ran for PT42.696S
   
   Default configuration:
   {"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}
   
   Configurations:
     Conf  Description
     0000  query:PASSTHROUGH; streamTimeout:60
     0001  query:CURRENCY_CONVERSION; streamTimeout:60
     0002  query:SELECTION; streamTimeout:60
     0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
     0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
     0005  query:HOT_ITEMS; streamTimeout:60
     0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
     0007  query:HIGHEST_BID; streamTimeout:60
     0008  query:MONITOR_NEW_USERS; streamTimeout:60
     0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
     0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
     0011  query:USER_SESSIONS; streamTimeout:60
     0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
     0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
     0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60
   
   Performance:
     Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
     0000           1.5                     66093.9                      100000
     0001           0.8                    130378.1                       92000
     0002           0.3                    325732.9                         351
     0003           2.3                     43327.6                         580
     0004           0.9                     11402.5                          40
     0005           2.0                     48947.6                          12
     0006           0.5                     19267.8                         103
     0007           2.4                     40833.0                           1
     0008           1.6                     63775.5                        6000
     0009           0.5                     20120.7                         298
     0010           0.9                    114155.3                           1
     0011           1.0                     97371.0                        1919
     0012           0.9                    109769.5                        1919
     0013           0.4                    269541.8                       92000
     0014  *** not run ***
   ==========================================================================================
   
   2019-05-03T12:36:44.932Z Generating 100000 events in batch mode
   19/05/03 14:36:47 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
   java.lang.IllegalStateException: failed to create a child event loop
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
           at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
           at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
           at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
           at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
           at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
           at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
           at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
           at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
           at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
           at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
           at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
           at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
           at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
           at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
           at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
           at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
           at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
           at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
           at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
           at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
           at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
           at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
           at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
           at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
           at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: io.netty.channel.ChannelException: failed to open a new selector
           at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
           at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
           at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
           at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
           ... 36 more
   Caused by: java.io.IOException: Too many open files
           at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
           at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
           at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
           at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
           at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
           ... 40 more
   19/05/03 14:36:47 WARN org.apache.spark.metrics.MetricsSystem: Stopping a MetricsSystem that is not running
   
   ==========================================================================================
   Run started 2019-05-03T12:36:02.235Z and ran for PT45.018S
   
   Default configuration:
   {"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}
   
   Configurations:
     Conf  Description
     0000  query:PASSTHROUGH; streamTimeout:60
     0001  query:CURRENCY_CONVERSION; streamTimeout:60
     0002  query:SELECTION; streamTimeout:60
     0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
     0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
     0005  query:HOT_ITEMS; streamTimeout:60
     0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
     0007  query:HIGHEST_BID; streamTimeout:60
     0008  query:MONITOR_NEW_USERS; streamTimeout:60
     0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
     0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
     0011  query:USER_SESSIONS; streamTimeout:60
     0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
     0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
     0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60
   
   Performance:
     Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
     0000           1.5                     66093.9                      100000
     0001           0.8                    130378.1                       92000
     0002           0.3                    325732.9                         351
     0003           2.3                     43327.6                         580
     0004           0.9                     11402.5                          40
     0005           2.0                     48947.6                          12
     0006           0.5                     19267.8                         103
     0007           2.4                     40833.0                           1
     0008           1.6                     63775.5                        6000
     0009           0.5                     20120.7                         298
     0010           0.9                    114155.3                           1
     0011           1.0                     97371.0                        1919
     0012           0.9                    109769.5                        1919
     0013           0.4                    269541.8                       92000
     0014  *** not run ***
   ==========================================================================================
   
   Exception in thread "main" java.lang.RuntimeException: java.lang.IllegalStateException: failed to create a child event loop
           at org.apache.beam.sdk.nexmark.Main.runAll(Main.java:128)
           at org.apache.beam.sdk.nexmark.Main.main(Main.java:415)
   Caused by: java.lang.IllegalStateException: failed to create a child event loop
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
           at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
           at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
           at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
           at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
           at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
           at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
           at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
           at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
           at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
           at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
           at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
           at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
           at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
           at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
           at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
           at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
           at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
           at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
           at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
           at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
           at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
           at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
           at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
           at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
           at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
           at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: io.netty.channel.ChannelException: failed to open a new selector
           at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
           at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
           at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
           at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
           at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
           ... 36 more
   Caused by: java.io.IOException: Too many open files
           at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
           at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
           at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
           at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
           at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
           ... 40 more
   
   > Task :beam-sdks-java-nexmark:run FAILED
   
   FAILURE: Build failed with an exception.
   ```
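   
   A side note that is not part of the original report: the root cause above is `java.io.IOException: Too many open files`, i.e. the JVM exhausted its file-descriptor limit, plausibly because each Nexmark query obtains a SparkContext (see `SparkContextFactory.createSparkContext` in the trace) and descriptors accumulate when the suite is invoked repeatedly. A minimal diagnostic sketch for watching descriptor usage from inside the JVM, assuming a HotSpot JVM on Linux/macOS where `com.sun.management.UnixOperatingSystemMXBean` is available:
   
   ```java
   import java.lang.management.ManagementFactory;
   import com.sun.management.UnixOperatingSystemMXBean;
   
   public final class FdUsage {
     public static void main(String[] args) {
       // On Unix-like platforms the platform MXBean is the com.sun subtype,
       // which exposes file-descriptor counters for the current process.
       UnixOperatingSystemMXBean os =
           (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
       System.out.printf("open fds: %d / max: %d%n",
           os.getOpenFileDescriptorCount(), os.getMaxFileDescriptorCount());
     }
   }
   ```
   
   If the open count climbs toward the maximum across successive runs, raising the shell's file-descriptor limit (`ulimit -n`) before invoking Gradle, or reusing a single SparkContext across queries, would be the obvious mitigations to try.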
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 236835)
    Time Spent: 10h 50m  (was: 10h 40m)

> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
>                 Key: BEAM-5775
>                 URL: https://issues.apache.org/jira/browse/BEAM-5775
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Mike Kaplinskiy
>            Assignee: Mike Kaplinskiy
>            Priority: Minor
>             Fix For: 2.13.0
>
>          Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> Currently, for storage level MEMORY_ONLY, Beam does not coder-ify the data. This lets Spark keep the data in memory, avoiding the serialization round trip. Unfortunately the logic is fairly coarse: as soon as you switch to MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have chosen to keep the data in memory, incurring the serialization overhead.
>  
> Ideally Beam would serialize the data lazily, i.e. only as Spark chooses to spill to disk. This would be a change in behavior when using Beam, but luckily Spark has a solution for folks who want data serialized in memory: MEMORY_AND_DISK_SER will keep the data serialized.
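
For illustration, a minimal Java sketch of the two strategies the description contrasts. This is not the actual SparkRunner code; the class and method names are hypothetical. The point is only that handing Spark the raw objects defers serialization to the moment a partition actually spills, whereas the pre-change behavior paid the Coder round trip up front for any storage level other than MEMORY_ONLY:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

// Hypothetical helper sketching the caching strategies discussed in this issue.
public final class CacheStrategySketch {

  // Lazy serialization: persist the deserialized objects and let Spark's own
  // serializer (Java or Kryo) encode them only if/when it spills to disk.
  public static <T> JavaRDD<T> persistLazily(JavaRDD<T> rdd, StorageLevel level) {
    return rdd.persist(level);
  }

  // Opt-in eager serialization: users who want data kept serialized in memory
  // can request MEMORY_AND_DISK_SER instead of an explicit coder pass.
  public static <T> JavaRDD<T> persistSerialized(JavaRDD<T> rdd) {
    return rdd.persist(StorageLevel.MEMORY_AND_DISK_SER());
  }

  private CacheStrategySketch() {}
}
```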



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
