Getting a NullPointerException in a basic setup, don't know why
nsformMatchers.java:274)
	at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
	at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
	at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
	at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
	at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
	at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
	at picocli.CommandLine.access$1100(CommandLine.java:145)
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
	at picocli.CommandLine.execute(CommandLine.java:1978)
	at no.spaeren.thesis.App.main(App.java:24)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
	at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
	at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
	at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
	at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
	at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

Any help with this would be greatly appreciated.

Best regards,
Teodor Spæren
Re: Help measuring upcoming performance increase in flink runner on production systems
Hey!

My option is not the default as of now, since it can break pipelines which rely on the faulty Flink implementation. I'm creating my own benchmarks locally and will run against those, but the idea of adding it to the official benchmark runs sounds interesting, thanks for bringing it up!

Teodor

On Tue, Dec 15, 2020 at 06:51:38PM -0800, Ahmet Altay wrote:
>Hi Teodor,
>
>Thank you for working on this. If I remember correctly, there were some opportunities to improve in the previous paper (e.g. not focusing on deprecated runners, long-running benchmarks, varying data sizes). And I am excited that you are keeping the community as part of your research process, and we will be happy to help you where we can.
>
>Related to your question: was the new option used by default? If that is the case, you will probably see its impact on the metrics dashboard [1]. And if it is not on by default, you can add your variant as a new benchmark and compare the difference across many runs in a controlled benchmarking environment. Would that help?
>
>Ahmet
>
>[1] http://metrics.beam.apache.org/d/1/getting-started?orgId=1
>
>On Tue, Dec 15, 2020 at 5:48 AM Teodor Spæren wrote:
>> [...]
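As a minimal sketch of the comparison Ahmet suggests, assuming run times have already been collected for many runs of a baseline pipeline and of the variant with the new option enabled, the hypothetical helper below summarizes each side by its median (less sensitive than the mean to an occasional slow run) and reports the relative change. `RunComparison` is an illustrative name, not part of Beam's benchmarking tooling.

```java
import java.util.Arrays;

// Hypothetical helper, not Beam tooling: compares many runs of a baseline
// against many runs of a variant (e.g. without and with the new option).
final class RunComparison {

    // Median of the given run times; sorts a copy so the caller's array is untouched.
    static double median(double[] runtimes) {
        if (runtimes.length == 0) {
            throw new IllegalArgumentException("need at least one run");
        }
        double[] sorted = runtimes.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    // Relative change of the variant's median against the baseline's median.
    // Negative values mean the variant's median run time was lower.
    static double relativeChange(double[] baseline, double[] variant) {
        double base = median(baseline);
        return (median(variant) - base) / base;
    }
}
```

Medians say nothing about spread, so with only a handful of runs it is worth looking at the full distributions as well.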
Re: Help measuring upcoming performance increase in flink runner on production systems
Hey!

Yeah, that paper was what prompted my master's thesis! I will definitely post here once I get more data :)

Teodor

On Mon, Dec 14, 2020 at 06:56:30AM -0600, Rion Williams wrote:
>Hi Teodor,
>
>Although I’m sure you’ve come across it, this might have some valuable resources or methodologies to consider as you explore this a bit more:
>
>https://arxiv.org/pdf/1907.08302.pdf
>
>I’m looking forward to reading about your findings, especially using a more recent iteration of Beam!
>
>Rion
>
>> On Dec 14, 2020, at 6:37 AM, Teodor Spæren wrote:
>> [...]
Re: Help measuring upcoming performance increase in flink runner on production systems
Just bumping this so people see it now that 2.26.0 is out :)

On Wed, Nov 25, 2020 at 11:09:52AM +0100, Teodor Spæren wrote:
> [...]
Help measuring upcoming performance increase in flink runner on production systems
Hey!

My name is Teodor Spæren and I'm writing a master's thesis investigating the performance overhead of using Beam instead of using the underlying systems directly. My focus has been on Flink, and I've discovered some unnecessary copying between operators in the Flink runner[1][2]. I wrote a fix for this; it has been accepted and merged, and will be in the upcoming 2.26.0 release[3].

I'm writing this email to ask if anyone on these mailing lists would be willing to send me some results of enabling the new option once the new version of Beam is released. Anything will be very much appreciated: stories, screenshots of performance monitoring before and after, hard numbers, anything! If you include the cluster size and the workload, that would be awesome too! My master's thesis is set to be completed this coming summer, so there is no real hurry :)

The thesis will be freely accessible[4], and I hope that these findings will be of help to the Beam community. If anyone wishes to submit stories but remain anonymous, that is also OK :)

The best way to contact me would be to send an email my way here, or to teod...@mail.uio.no.

Any help is appreciated, thanks for your attention!

Best regards,
Teodor Spæren

[1]: https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E
[2]: https://issues.apache.org/jira/browse/BEAM-11146
[3]: https://github.com/apache/beam/pull/13240
[4]: https://www.duo.uio.no/
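The copy that the fix targets can be pictured with the stdlib-only Java model below. It assumes a simplified chain in which each hand-off between operators either deep-copies the element (here via `java.io` serialization, standing in for a Beam coder round trip) or passes the same reference along. `ChainedHandOff` and `skipCopy` are hypothetical names, not Flink runner APIs; in the 2.26.0 release the real switch appears to be the Flink runner's `fasterCopy` pipeline option, but check the runner documentation before relying on that name. Sharing the reference is only safe because Beam's programming model already forbids user code from mutating its input elements, which is also why pipelines that do mutate their inputs could break with the option enabled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model, not the Flink runner's code: each hand-off between
// chained operators either deep-copies the element or shares the reference.
final class ChainedHandOff {
    private final boolean skipCopy;

    ChainedHandOff(boolean skipCopy) {
        this.skipCopy = skipCopy;
    }

    // Deep copy via Java serialization, standing in for a coder round trip.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs the element through each operator in order; unless skipCopy is set,
    // every operator receives a fresh copy of the element handed to it.
    <T extends Serializable> T run(T input, List<UnaryOperator<T>> operators) {
        T current = input;
        for (UnaryOperator<T> operator : operators) {
            T handedOver = skipCopy ? current : deepCopy(current);
            current = operator.apply(handedOver);
        }
        return current;
    }
}
```

Both modes produce equal results for operators that treat their inputs as immutable; timing the same chain with `skipCopy` set to true and to false gives a toy version of the before/after comparison the email asks for.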
Design rationale behind copying via serialization in the Flink runner
Hey!

This is my first time posting to a mailing list, so I hope I did it correctly :)

I'm writing a master's thesis at the University of Oslo, and right now I'm looking at the performance overhead of using Beam with the Flink runner versus plain Flink. I've written a simple program: a custom source outputting 0, 1, 2, 3, up to N, going into a single identity operator and then into a filter which only matches N and prints it out. This is just to compare performance.

I've been doing some profiling of simple programs, and one observation is a performance difference in serialization. The hotspot is [1], which is used in multiple places, one of them being [2], which is called from [3]. As far as I can tell, [1] implements copying by first serializing and then deserializing, and there is no way for the actual types to change this. In Flink, you have control over the copy() method, as in [4], so for certain types you can simply return the input, as is done there.

My question is whether I've understood the flow correctly so far, and if so, what the reason is for doing it this way. Is it to avoid demanding that the type implement some kind of cloning? And would it be possible to push this further down the stack and allow the coders to define the copy semantics? I'm willing to do the work here; I just want to know if it would work on an architectural level.

If there are any known overheads of using Beam that you would like to point out, I would love to hear about them.

Best regards,
Teodor Spæren

[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
[2]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
[3]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
[4]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53
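The alternative asked about, letting each coder decide how to copy, can be sketched in plain Java as follows. `SketchCoder`, `RoundTripLongCoder` and `SharingLongCoder` are hypothetical types, not Beam's `Coder` API: the interface's default `copy` round-trips through bytes, roughly what Beam's `CoderUtils.clone` does, while a coder for an immutable type overrides it to return its input, in the spirit of Flink's `LongSerializer.copy()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical coder interface (not Beam's real Coder API):
// encode/decode plus an overridable copy() hook.
interface SketchCoder<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;

    // Default copy: encode to bytes and decode them again. Works for any type
    // the coder supports, at the cost of a full round trip per element.
    default T copy(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            encode(value, bytes);
            return decode(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Long coder that relies on the default, byte-round-trip copy().
class RoundTripLongCoder implements SketchCoder<Long> {
    @Override
    public void encode(Long value, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeLong(value);
        data.flush();
    }

    @Override
    public Long decode(InputStream in) throws IOException {
        return new DataInputStream(in).readLong();
    }
}

// Same encoding, but Long is immutable, so copy() can return the input itself.
final class SharingLongCoder extends RoundTripLongCoder {
    @Override
    public Long copy(Long value) {
        return value;
    }
}
```

The design keeps a safe generic default that every type gets for free, with an opt-in fast path for types whose coder knows a copy is unnecessary.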