[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236835&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236835 ]
ASF GitHub Bot logged work on BEAM-5775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/19 12:43
            Start Date: 03/May/19 12:43
    Worklog Time Spent: 10m
      Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489081256

Were you able to notice any performance improvement with this PR? I like it for consistency, but I have found both improvements and regressions depending on the pipeline.

I also hit a weird issue with this one, @mikekap: I was running nexmark to see if I could find considerable improvements due to this PR, but when I invoke it multiple times it fails. Curiously, this does not happen, for example, with current master. Would you mind taking a look to see whether it is perhaps some configuration issue? It is strange.

```bash
./gradlew :beam-sdks-java-nexmark:run \
  -Pnexmark.runner=":beam-runners-spark" \
  -Pnexmark.args=" --runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
```

The exception log:

```
==========================================================================================
Run started 2019-05-03T12:36:02.235Z and ran for PT42.696S

Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}

Configurations:
  Conf  Description
  0000  query:PASSTHROUGH; streamTimeout:60
  0001  query:CURRENCY_CONVERSION; streamTimeout:60
  0002  query:SELECTION; streamTimeout:60
  0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
  0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
  0005  query:HOT_ITEMS; streamTimeout:60
  0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
  0007  query:HIGHEST_BID; streamTimeout:60
  0008  query:MONITOR_NEW_USERS; streamTimeout:60
  0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
  0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
  0011  query:USER_SESSIONS; streamTimeout:60
  0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
  0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
  0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60

Performance:
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
  0000           1.5                   66093.9              100000
  0001           0.8                  130378.1               92000
  0002           0.3                  325732.9                 351
  0003           2.3                   43327.6                 580
  0004           0.9                   11402.5                  40
  0005           2.0                   48947.6                  12
  0006           0.5                   19267.8                 103
  0007           2.4                   40833.0                   1
  0008           1.6                   63775.5                6000
  0009           0.5                   20120.7                 298
  0010           0.9                  114155.3                   1
  0011           1.0                   97371.0                1919
  0012           0.9                  109769.5                1919
  0013           0.4                  269541.8               92000
  0014  *** not run ***
==========================================================================================

2019-05-03T12:36:44.932Z Generating 100000 events in batch mode
19/05/03 14:36:47 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: failed to create a child event loop
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
    at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
    at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
    at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
    at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
    at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
    at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    ... 36 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
    ... 40 more
19/05/03 14:36:47 WARN org.apache.spark.metrics.MetricsSystem: Stopping a MetricsSystem that is not running
==========================================================================================
Run started 2019-05-03T12:36:02.235Z and ran for PT45.018S

Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}

Configurations:
  Conf  Description
  0000  query:PASSTHROUGH; streamTimeout:60
Exception in thread "main"
  0001  query:CURRENCY_CONVERSION; streamTimeout:60
  0002  query:SELECTION; streamTimeout:60
  0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
  0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
  0005  query:HOT_ITEMS; streamTimeout:60
  0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
  0007  query:HIGHEST_BID; streamTimeout:60
  0008  query:MONITOR_NEW_USERS; streamTimeout:60
  0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
  0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
  0011  query:USER_SESSIONS; streamTimeout:60
java.lang.RuntimeException: java.lang.IllegalStateException: failed to create a child event loop
  0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
    at org.apache.beam.sdk.nexmark.Main.runAll(Main.java:128)
    at org.apache.beam.sdk.nexmark.Main.main(Main.java:415)
Caused by: java.lang.IllegalStateException: failed to create a child event loop
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
  0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
  0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60

Performance:
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
    at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
    at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
    at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
    at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
    at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
  0000           1.5                   66093.9              100000
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
  0001           0.8                  130378.1               92000
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
  0002           0.3                  325732.9                 351
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
  0003           2.3                   43327.6                 580
    at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
  0004           0.9                   11402.5                  40
    at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
  0005           2.0                   48947.6                  12
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
  0006           0.5                   19267.8                 103
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
  0007           2.4                   40833.0                   1
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
  0008           1.6                   63775.5                6000
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
  0009           0.5                   20120.7                 298
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
  0010           0.9                  114155.3                   1
    at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  0011           1.0                   97371.0                1919
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  0012           0.9                  109769.5                1919
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  0013           0.4                  269541.8               92000
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  0014  *** not run ***
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
==========================================================================================
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    ... 36 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
    ... 40 more

> Task :beam-sdks-java-nexmark:run FAILED

FAILURE: Build failed with an exception.
```
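The root cause in both stack traces is `java.io.IOException: Too many open files` while opening a Netty epoll selector, so repeated in-process SparkContext creation appears to exhaust the JVM's file-descriptor budget. A minimal diagnostic sketch, assuming a Linux shell; the `pgrep` lookup is illustrative and not from the original run:

```bash
# Soft limit on open file descriptors for processes started from this shell.
ulimit -n

# Pid of the running nexmark JVM (illustrative lookup by main class name).
NEXMARK_PID=$(pgrep -f org.apache.beam.sdk.nexmark.Main)

# Count the descriptors the process currently holds; if this climbs toward
# the limit as the queries run, the "Too many open files" failure above
# is the expected outcome.
ls "/proc/${NEXMARK_PID}/fd" | wc -l

# Raise the soft limit (up to the hard limit) before re-running the suite.
ulimit -n 65536
```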
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 236835)
    Time Spent: 10h 50m  (was: 10h 40m)

> Make the spark runner not serialize data unless spark is spilling to disk
> --------------------------------------------------------------------------
>
>                 Key: BEAM-5775
>                 URL: https://issues.apache.org/jira/browse/BEAM-5775
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Mike Kaplinskiy
>            Assignee: Mike Kaplinskiy
>            Priority: Minor
>             Fix For: 2.13.0
>
>          Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> Currently, for storage level MEMORY_ONLY, Beam does not coder-ify the data. This lets Spark keep the data in memory, avoiding the serialization round trip. Unfortunately, the logic is fairly coarse: as soon as you switch to MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have chosen to keep it in memory, so the serialization overhead is incurred anyway.
>
> Ideally, Beam would serialize the data lazily, only as Spark chooses to spill to disk. This would be a change in behavior when using Beam, but luckily Spark has a solution for folks who want their data serialized in memory: MEMORY_AND_DISK_SER keeps the data serialized.
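For context on the trade-off described above: the Spark runner exposes the storage level through its `storageLevel` pipeline option, so the behavior can be exercised from the nexmark invocation shown earlier. A minimal sketch, assuming `--storageLevel` passes through `-Pnexmark.args` like the other pipeline options and accepts Spark's standard StorageLevel names:

```bash
# Keep cached data serialized even while it remains in memory:
# MEMORY_AND_DISK_SER is Spark's existing answer for users who want
# serialized in-memory data, whereas plain MEMORY_AND_DISK would, with
# lazy serialization, only pay the coder round trip when Spark spills.
./gradlew :beam-sdks-java-nexmark:run \
  -Pnexmark.runner=":beam-runners-spark" \
  -Pnexmark.args=" --runner=SparkRunner --storageLevel=MEMORY_AND_DISK_SER --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
```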