Getting null pointer exception in a basic setup, don't know why

2021-05-06 Thread Teodor Spæren
nsformMatchers.java:274)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:289)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:587)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:468)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:267)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:217)
at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:118)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:92)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:45)
at no.spaeren.thesis.benchmarks.beam.BeamSimple.call(BeamSimple.java:18)
at picocli.CommandLine.executeUserObject(CommandLine.java:1853)
at picocli.CommandLine.access$1100(CommandLine.java:145)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2243)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2237)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2201)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2068)
at picocli.CommandLine.execute(CommandLine.java:1978)
at no.spaeren.thesis.App.main(App.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)

at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:184)
at org.apache.flink.client.cli.CliFrontend.info(CliFrontend.java:366)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1067)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)


Any help with this would be greatly appreciated.

Best regards,
Teodor Spæren


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Teodor Spæren
Hey! My option is not the default as of now, since it can break pipelines 
that rely on the faulty Flink implementation. I'm creating my own 
benchmarks locally and will run against those, but the idea of adding it 
to the official benchmark runs sounds interesting; thanks for bringing 
it up!
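To illustrate how a pipeline can come to depend on the runner's copying, here is a plain-Java sketch (no Beam or Flink APIs) built on one assumption: the affected pipelines contain user code that mutates an element after emitting it, which the Beam model does not allow, and some downstream step holds on to the elements it receives. `MutationHazardSketch`, `MutableCount`, and `copyOnEmit` are hypothetical names; `copyOnEmit` only stands in for the runner's copy between operators and is not the actual option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class MutationHazardSketch {

    // Hypothetical mutable element type.
    static final class MutableCount {
        int value;

        MutableCount(int value) {
            this.value = value;
        }
    }

    // Simulates user code that reuses one object for every output and keeps
    // mutating it, while the downstream side buffers what it receives.
    // copyOnEmit models whether each element is copied when handed downstream.
    static List<Integer> run(boolean copyOnEmit) {
        UnaryOperator<MutableCount> handOff =
                copyOnEmit ? c -> new MutableCount(c.value) : c -> c;
        List<MutableCount> buffered = new ArrayList<>();
        MutableCount reused = new MutableCount(0);
        for (int i = 1; i <= 3; i++) {
            // From the second iteration on, this mutates an already emitted element.
            reused.value = i;
            buffered.add(handOff.apply(reused));
        }
        List<Integer> seen = new ArrayList<>();
        for (MutableCount c : buffered) {
            seen.add(c.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("with copy:    " + run(true));
        System.out.println("without copy: " + run(false));
    }
}
```

With copying the buffered values are `[1, 2, 3]`; without it they are `[3, 3, 3]`, because every buffered entry is the same object and shows its last value.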


Teodor

On Tue, Dec 15, 2020 at 06:51:38PM -0800, Ahmet Altay wrote:

Hi Teodor,

Thank you for working on this. If I remember correctly, there were some
opportunities for improvement in the previous paper (e.g. not focusing on
deprecated runners, long-running benchmarks, varying data sizes). I am
excited that you are keeping the community involved in your research process,
and we will be happy to help you where we can.

Regarding your question: was the new option used by default? If that
is the case, you will probably see its impact on the metrics dashboard [1].
If it is not on by default, you can add your variant as a new benchmark
and compare the difference across many runs in a controlled benchmarking
environment. Would that help?
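For the "compare across many runs" part, a small plain-Java sketch of how per-run timings for a baseline and a variant might be summarised: mean, sample standard deviation, and the relative change of the means. The class and method names are hypothetical, and a real analysis would likely also want a significance test, which this sketch leaves out.

```java
import java.util.Arrays;

public class RunComparisonSketch {

    // Arithmetic mean of the measurements.
    static double mean(double[] xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    // Sample standard deviation (divides by n - 1); needs at least two runs.
    static double sampleStdDev(double[] xs) {
        if (xs.length < 2) {
            throw new IllegalArgumentException("need at least two runs");
        }
        double m = mean(xs);
        double squares = 0;
        for (double x : xs) {
            squares += (x - m) * (x - m);
        }
        return Math.sqrt(squares / (xs.length - 1));
    }

    // Relative change of the variant's mean versus the baseline's mean.
    // For run times, a negative value means the variant was faster.
    static double relativeChange(double[] baseline, double[] variant) {
        double base = mean(baseline);
        return (mean(variant) - base) / base;
    }

    // Parses a comma-separated list of timings, e.g. "12.1,11.8,12.4".
    static double[] parse(String csv) {
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .mapToDouble(Double::parseDouble)
                .toArray();
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: RunComparisonSketch <baseline times> <variant times>");
            return;
        }
        double[] baseline = parse(args[0]);
        double[] variant = parse(args[1]);
        System.out.printf("baseline: mean=%.3f sd=%.3f%n", mean(baseline), sampleStdDev(baseline));
        System.out.printf("variant:  mean=%.3f sd=%.3f%n", mean(variant), sampleStdDev(variant));
        System.out.printf("relative change of mean: %+.1f%%%n", 100 * relativeChange(baseline, variant));
    }
}
```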

Ahmet

[1] http://metrics.beam.apache.org/d/1/getting-started?orgId=1





Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-15 Thread Teodor Spæren

Hey!

Yeah, that paper was what prompted my master thesis! I will definitely 
post here once I get more data :)


Teodor

On Mon, Dec 14, 2020 at 06:56:30AM -0600, Rion Williams wrote:

Hi Teodor,

Although I’m sure you’ve come across it, this might have some valuable 
resources or methodologies to consider as you explore this a bit more:

https://arxiv.org/pdf/1907.08302.pdf

I’m looking forward to reading about your findings, especially using a more 
recent iteration of Beam!

Rion




Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-14 Thread Teodor Spæren

Just bumping this so people see it now that 2.26.0 is out :)



Help measuring upcoming performance increase in flink runner on production systems

2020-11-25 Thread Teodor Spæren

Hey!

My name is Teodor Spæren and I'm writing a master thesis investigating 
the performance overhead of using Beam instead of using the underlying 
systems directly. My focus has been on Flink, and I've discovered some 
unnecessary copying between operators in the Flink runner[1][2]. I wrote 
a fix for this, which has been accepted and merged and will be in the 
upcoming 2.26.0 release[3].

I'm writing this email to ask whether anyone on these mailing lists would 
be willing to send me some results from enabling this option once the new 
version of Beam is released. Anything will be very much appreciated: 
stories, screenshots of performance monitoring before and after, hard 
numbers, anything! If you include the cluster size and the workload, that 
would be awesome too! My master thesis is set to be completed this coming 
summer, so there is no real hurry :)

The thesis will be freely accessible[4], and I hope that these findings 
will be of help to the Beam community. If anyone wishes to submit stories 
but remain anonymous, that is also ok :)

The best way to contact me is to reply here or to email me at 
teod...@mail.uio.no.

Any help is appreciated; thanks for your attention!

Best regards,
Teodor Spæren


[1]: https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E
[2]: https://issues.apache.org/jira/browse/BEAM-11146
[3]: https://github.com/apache/beam/pull/13240
[4]: https://www.duo.uio.no/


Design rationale behind copying via serialization in the Flink runner

2020-08-28 Thread Teodor Spæren

Hey!

This is my first time posting to a mailing list; I hope I did it correctly :)

I'm writing a master thesis at the University of Oslo, and right now I'm 
looking at the performance overhead of using Beam with the Flink runner 
versus plain Flink.

I've written a simple program: a custom source outputting 0, 1, 2, 3, up 
to N, feeding into a single identity operator and then into a filter that 
only matches N and prints it out. This is just to compare performance.
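The dataflow shape described above (source 0..N, identity, filter on N, print) can be sketched in plain Java with `java.util.stream`, without Beam or Flink. This is not the author's benchmark program; it only shows the shape and could serve as a rough no-framework baseline. `SimplePipelineSketch` and `run` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class SimplePipelineSketch {

    // Source: emits 0, 1, 2, ..., n (inclusive, so n itself is produced).
    // Identity: passes every element through unchanged.
    // Filter: keeps only the element equal to n.
    static List<Long> run(long n) {
        return LongStream.rangeClosed(0, n)
                .map(x -> x)          // identity operator
                .filter(x -> x == n)  // filter that only matches n
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long n = args.length > 0 ? Long.parseLong(args[0]) : 10_000_000L;
        long start = System.nanoTime();
        List<Long> result = run(n);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        result.forEach(System.out::println); // sink: print the match
        System.out.println("elapsed ms: " + elapsedMs);
    }
}
```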


I've been profiling some simple programs, and one observation is the 
performance difference caused by serialization. The hotspot is [1], which 
is used in multiple places; one of them is [2], which is called from [3]. 
As far as I can tell, [1] implements copying by first serializing and then 
deserializing, and there is no way for the actual types to change this. In 
Flink, you have control over the copy() method, as in [4], so for certain 
types you can simply return the input, as is done there.
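A minimal plain-Java sketch of the two copy strategies described above, under stated assumptions: `SimpleCoder` is a hypothetical, simplified stand-in for a coder (it is not Beam's `Coder` class), `copyViaSerialization` mirrors the serialize-then-deserialize behaviour attributed to [1], and `copyImmutableLong` mirrors the simple return pointed to in [4].

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class CopyViaSerializationSketch {

    // Hypothetical, heavily simplified coder: encode and decode only.
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;

        T decode(InputStream in) throws IOException;
    }

    // Encodes a Long as 8 big-endian bytes.
    static final SimpleCoder<Long> LONG_CODER = new SimpleCoder<Long>() {
        @Override
        public void encode(Long value, OutputStream out) throws IOException {
            new DataOutputStream(out).writeLong(value);
        }

        @Override
        public Long decode(InputStream in) throws IOException {
            return new DataInputStream(in).readLong();
        }
    };

    // Generic copy: serialize to a byte array, then deserialize a fresh value.
    // Works for any type the coder handles, but always pays for a buffer
    // allocation plus a full encode and decode, even for immutable values.
    static <T> T copyViaSerialization(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            coder.encode(value, buffer);
            return coder.decode(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Type-aware copy for an immutable type: handing back the same reference
    // is safe because the value can never change afterwards.
    static Long copyImmutableLong(Long value) {
        return value;
    }

    public static void main(String[] args) {
        Long original = 123_456_789L;
        Long roundTripped = copyViaSerialization(LONG_CODER, original);
        Long direct = copyImmutableLong(original);
        System.out.println("round trip: equal=" + original.equals(roundTripped)
                + " sameObject=" + (original == roundTripped));
        System.out.println("direct:     equal=" + original.equals(direct)
                + " sameObject=" + (original == direct));
    }
}
```

Both copies are equal to the original, but only the round trip allocates a new object; for an immutable `Long` the direct return is just as safe and skips the buffer, encode, and decode entirely.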


My question is whether I've understood the flow correctly so far and, if 
so, what the reason is for doing it this way. Is it to avoid requiring 
that the type implement some form of cloning? And would it be possible to 
push this further down the stack and let the coders define the copy 
semantics? I'm willing to do the work here; I just want to know whether it 
would work on an architectural level.
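One hypothetical shape for letting the coders define the copy semantics, sketched in plain Java: a coder interface whose `copy` defaults to a serialization round trip, so coders that know nothing about copying keep working, while coders for immutable types can override it with a plain return. `CopyingCoder`, `StringCoder`, and `ByteArrayCoder` are invented for illustration; Beam's real `Coder` is an abstract class with a different API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class CoderDefinedCopySketch {

    // Hypothetical coder interface (not Beam's API). copy() has a safe default
    // based on a serialization round trip; coders may override it.
    interface CopyingCoder<T> {
        void encode(T value, OutputStream out) throws IOException;

        T decode(InputStream in) throws IOException;

        default T copy(T value) {
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                encode(value, buffer);
                return decode(new ByteArrayInputStream(buffer.toByteArray()));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Strings are immutable, so this coder opts out of copying entirely.
    // (writeUTF limits strings to 65535 encoded bytes; fine for a sketch.)
    static final class StringCoder implements CopyingCoder<String> {
        @Override
        public void encode(String value, OutputStream out) throws IOException {
            new DataOutputStream(out).writeUTF(value);
        }

        @Override
        public String decode(InputStream in) throws IOException {
            return new DataInputStream(in).readUTF();
        }

        @Override
        public String copy(String value) {
            return value;
        }
    }

    // byte[] is mutable; this coder keeps the default deep copy.
    static final class ByteArrayCoder implements CopyingCoder<byte[]> {
        @Override
        public void encode(byte[] value, OutputStream out) throws IOException {
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(value.length);
            data.write(value);
        }

        @Override
        public byte[] decode(InputStream in) throws IOException {
            DataInputStream data = new DataInputStream(in);
            byte[] value = new byte[data.readInt()];
            data.readFully(value);
            return value;
        }
    }

    public static void main(String[] args) {
        String s = "hello";
        System.out.println("string copy is same object: " + (new StringCoder().copy(s) == s));

        byte[] bytes = {1, 2, 3};
        byte[] copied = new ByteArrayCoder().copy(bytes);
        copied[0] = 42; // mutating the copy leaves the original untouched
        System.out.println("original first byte: " + bytes[0]);
    }
}
```

The default method keeps such a change backward compatible: a coder that does not override `copy` still gets a correct, if slower, deep copy.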


If there are any known overheads of using Beam that you would like to 
point out, I would love to hear about them.


Best regards,
Teodor Spæren

[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
[2]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
[3]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
[4]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53