Hi Teodor, this sounds very cool! :-) Great job!
Jan

On 11/3/20 7:53 AM, Teodor Spæren wrote:
Realized the output in the previous mail might not be easy to read on all setups, so here is a condensed version. The last column is the ratio of events per second between faster-copy and normal:

Conf   normal     faster-copy   faster-copy / normal
0      49900.2    122100.1      2.447
1      50916.5    181488.2      3.564
2      80840.7    363636.4      4.498
3      67521.9    235849.1      3.493
4      12135.9    13459         1.109
5      39093      54083.3       1.383
6      11248.6    12594.5       1.120
7      1510.9     1540.7        1.020
8      68352.7    195312.5      2.857
9      (not run)
10     (not run)
11     36603.2    41981.5       1.147
12     60096.2    94250.7       1.568
13     579.5      577.2         0.996
14     13945.1    15405.9       1.105

It is worth noting that even though the non-aggregation queries show the most improvement, the rest of the queries are not too shabby either :D

- Teodor Spæren

On Tue, Nov 03, 2020 at 07:31:47AM +0100, Teodor Spæren wrote:

Hey Jan!

I have since created a PR and Jira issue for this and I've now run the Nexmark suite with the change applied and not applied. This is just a quick result, but it is very promising!

Here is without "fasterCopy":

==========================================================================================
Run started 2020-11-03T05:55:32.161Z and ran for PT492.410S

Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","generateEventFilePathPrefix":null,"sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","pubsubMessageSerializationMethod":"CODER","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}

Configurations:
  Conf  Description
  0000  query:PASSTHROUGH; streamTimeout:60
  0001  query:CURRENCY_CONVERSION; streamTimeout:60
  0002  query:SELECTION; streamTimeout:60
  0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
  0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
  0005  query:HOT_ITEMS; streamTimeout:60
  0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
  0007  query:HIGHEST_BID; streamTimeout:60
  0008  query:MONITOR_NEW_USERS; streamTimeout:60
  0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
  0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
  0011  query:USER_SESSIONS; streamTimeout:60
  0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
  0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
  0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60

Performance:
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
  0000           2.0                   49900.2               100000
  0001           2.0                   50916.5                92000
  0002           1.2                   80840.7                  351
  0003           1.5                   67521.9                  580
  0004           0.8                   12135.9                   40
  0005           2.6                   39093.0                   12
  0006           0.9                   11248.6                  401
  0007          66.2                    1510.9                    1
  0008           1.5                   68352.7                 6000
  0009  *** not run ***
  0010  *** not run ***
  0011           2.7                   36603.2                 1919
  0012           1.7                   60096.2                 1919
  0013         172.6                     579.5                92000
  0014           7.2                   13945.1                92000
==========================================================================================

And here we are with "fasterCopy":

==========================================================================================
Run started 2020-11-03T06:13:40.224Z and ran for PT483.138S

Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","generateEventFilePathPrefix":null,"sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","pubsubMessageSerializationMethod":"CODER","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}

Configurations:
  Conf  Description
  0000  query:PASSTHROUGH; streamTimeout:60
  0001  query:CURRENCY_CONVERSION; streamTimeout:60
  0002  query:SELECTION; streamTimeout:60
  0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
  0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
  0005  query:HOT_ITEMS; streamTimeout:60
  0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
  0007  query:HIGHEST_BID; streamTimeout:60
  0008  query:MONITOR_NEW_USERS; streamTimeout:60
  0009  query:WINNING_BIDS; numEvents:10000; streamTimeout:60
  0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
  0011  query:USER_SESSIONS; streamTimeout:60
  0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
  0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
  0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60

Performance:
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
  0000           0.8                  122100.1               100000
  0001           0.6                  181488.2                92000
  0002           0.3                  363636.4                  351
  0003           0.4                  235849.1                  580
  0004           0.7                   13459.0                   40
  0005           1.8                   54083.3                   12
  0006           0.8                   12594.5                  401
  0007          64.9                    1540.7                    1
  0008           0.5                  195312.5                 6000
  0009  *** not run ***
  0010  *** not run ***
  0011           2.4                   41981.5                 1919
  0012           1.1                   94250.7                 1919
  0013         173.3                     577.2                92000
  0014           6.5                   15405.9                92000
==========================================================================================

For most of these there are marked improvements! I will continue to look at this, but these are some strong results I think!

- Teodor

On Tue, Oct 27, 2020 at 01:53:11PM +0100, Jan Lukavský wrote:

Hi,

I tend to be +1 for the flag, but before that, we might want to have a deeper analysis of the performance impact. I believe the penalty will be (in percentage) much lower in cases of more practical jobs (e.g. having at least one shuffle).

@Teodor, would you be willing to provide us with some measurements of jobs doing something more practical than simple stateless mappings? E.g. a few jobs doing 1, 2 and 3 shuffle phases, to see what impact these more complex scenarios have on the performance penalty?

Cheers,
Jan

On 10/27/20 1:24 PM, David Morávek wrote:

you made a really good argument ;) I'm inclined towards an experimental opt-in flag that would enable this.
It would be great if we could automatically check for violations - kind of a safety net for mistakes in user code.

Just to note, direct runner enforcement may not cover all cases, as it only checks the binary representation after serialization. Also, there are programmers who don't write tests, especially during prototyping (not an argument for the perf. penalty, but something to keep in mind).

Max, WDYT?

On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren <[email protected]> wrote:

Some more thoughts:

As it says on the DirectRunner [1] page, the DirectRunner is meant to check that users don't rely on semantics that are not guaranteed by the Beam model.

Programs that rely on the Flink runner deep cloning the inputs between each operator in the pipeline are relying on a semantic that is not guaranteed by the Beam model, and those pipelines would fail if run on the DirectRunner.

As I stated in the previous email, I have some example programs that return different outputs on the Flink runner and on the DirectRunner. I have not tested these programs on other runners, so I don't know what they would return. If they return different answers than the DirectRunner, I'm inclined to say that either the DirectRunner should be changed, or the other runners should be.

From my very limited point of view, the Flink runner seems to be spending a lot of extra time implementing a semantic guarantee that the Beam model explicitly doesn't support.

Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/runners/direct/

On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
>Hey David,
>
>I think I might have worded this poorly, because what I meant is that
>from what I can see in [1], the Beam model explicitly states that
>PCollections should be treated as immutable. The direct runner also
>tests for this. Do the other runners also protect the user from
>misusing the system so?
>If not, we have a situation where running the
>same pipeline on two different runners will yield different answers. I
>can show some examples that return different results for the Flink
>and the Direct Runner.
>
>I agree that breaking existing pipelines is a no-no, but I do think
>that we could simply gate this behind an option on the Flink runner.
>
>I also tried to search for this before, but did not find any mention
>of it. Can you link me to some discussions about this in the past?
>
>Thanks for the reply :D
>
>Best regards,
>Teodor Spæren
>
>[1]: https://beam.apache.org/documentation/programming-guide/#immutability
>
>
>On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
>>Hi Teodor,
>>
>>Thanks for bringing this up. This is a known, long standing "issue".
>>Unfortunately there are a few things we need to consider:
>>
>>- As you correctly noted, the *Beam model doesn't enforce immutability* of
>>input / output elements, so this is the price.
>>- We *can not break* existing pipelines.
>>- Flink Runner needs to provide the *same guarantees as the Beam model*.
>>
>>There are definitely some things we can do here, to make things faster:
>>
>>- We can try a similar approach to HadoopIO
>>(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
>>types (KV, primitives, protobuf, other known internal immutable structures).
>>- *If the type is immutable, we can safely reuse it.* This should cover
>>most of the performance costs without breaking the guarantees the Beam model
>>provides.
>>- We can enable registration of custom "immutable" types via pipeline
>>options? (this may be an unnecessary knob, so this needs a further
>>discussion)
>>
>>WDYT?
>>
>>D.
>>
>>
>>On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <[email protected]>
>>wrote:
>>
>>>Hey!
>>>
>>>I'm a student at the University of Oslo, and I'm writing a master thesis
>>>about the possibility of using Beam to benchmark stream processing
>>>systems.
>>>An important factor in this is the overhead associated with
>>>using Beam over writing code for the runner directly. [1] found that
>>>there was a large overhead associated with using Beam, but did not
>>>investigate where this overhead came from. I've done benchmarks and
>>>confirmed the findings there, where for simple chains of identity
>>>operators, Beam is 43x slower than the Flink equivalent.
>>>
>>>These are very simple pipelines, with custom sources that just output a
>>>series of integers. By profiling I've found that most of the overhead
>>>comes from serializing and deserializing. Specifically the way
>>>TypeSerializer [2] is implemented in [3], where each object is
>>>serialized and then deserialized between every operator. Looking into
>>>the semantics of Beam, no operator should change the input, so we don't
>>>need to do a copy here. The function in [3] could potentially be changed
>>>to a single `return` statement.
>>>
>>>Doing this removes 80% of the overhead in my tests. This is a very
>>>synthetic example, but it's a low hanging fruit and might give a speed
>>>boost to many pipelines when run on the Flink runner. I would like to
>>>make this my first contribution to Beam, but as the guide [4] says, I
>>>thought I'd ask here first to see if there is a reason not to do this.
>>>
>>>The only objection I can see is that it might break existing pipelines
>>>which rely on the Flink runner saving them from not following the
>>>immutability guarantee. I see this as a small loss as they are relying
>>>on an implementation detail of the Flink runner.
>>>
>>>I hope I have explained this adequately and eagerly await any feedback :)
>>>
>>>Best regards,
>>>Teodor Spæren
>>>
>>>[1]: https://arxiv.org/abs/1907.08302
>>>[2]:
>>>https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>>>[3]:
>>>https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>>>[4]: https://beam.apache.org/contribute/
>>>
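For readers following the thread, the trade-off under discussion can be illustrated with a small, self-contained sketch. This is not the Flink runner's code: Python's pickle stands in for a Beam coder, and every name below (copy_via_round_trip, copy_by_reference, guarded_copy, is_known_immutable, misbehaving_operator, run_chain) is hypothetical. It only shows why a serialize/deserialize copy hides a user function that mutates its input, why returning the same reference exposes it, and how a "known immutable" check along the lines David suggests could skip the copy only where that is safe.

```python
import pickle

# Assumption: a small, fixed set of types treated as immutable for this sketch.
KNOWN_IMMUTABLE = (int, float, str, bytes, bool, type(None))


def copy_via_round_trip(value):
    """Deep-copy by serializing and deserializing (stand-in for the per-operator copy)."""
    return pickle.loads(pickle.dumps(value))


def copy_by_reference(value):
    """The 'single return statement' variant: hand the same object downstream."""
    return value


def is_known_immutable(value):
    """True for scalars above and for tuples/frozensets made only of such values."""
    if isinstance(value, KNOWN_IMMUTABLE):
        return True
    if isinstance(value, (tuple, frozenset)):
        return all(is_known_immutable(item) for item in value)
    return False


def guarded_copy(value):
    """Reuse known-immutable values, fall back to the round-trip copy otherwise."""
    return value if is_known_immutable(value) else copy_via_round_trip(value)


def misbehaving_operator(element):
    """Breaks the immutability rule by appending to its input in place."""
    element.append(0)
    return len(element)


def run_chain(copy_fn):
    """Hand one upstream element to two consumers, copying before each hand-off."""
    upstream = [1, 2, 3]
    first = misbehaving_operator(copy_fn(upstream))
    second = misbehaving_operator(copy_fn(upstream))
    return first, second, upstream


if __name__ == "__main__":
    print(run_chain(copy_via_round_trip))  # consumers are isolated from each other
    print(run_chain(copy_by_reference))    # the mutation leaks between consumers
    print(run_chain(guarded_copy))         # lists are still copied, so still isolated
```

With the round-trip copy both consumers see a fresh three-element list, so the misbehaving function goes unnoticed; with the by-reference variant the second consumer sees the first one's mutation, which is the kind of runner-dependent result described in the thread. The guarded variant pays for the copy only on types it cannot prove immutable.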
