[ 
https://issues.apache.org/jira/browse/FLINK-23593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393287#comment-17393287
 ] 

Stephan Ewen commented on FLINK-23593:
--------------------------------------

Here is a summary from discussing this offline with [~twalthr].

**Meaningful Change**

The general change of behavior is meaningful. Not having tasks share their 
slots during batch execution means we don't fragment the memory budget as much 
between different tasks that most likely don't run concurrently anyways.

It should give more reliable performance at scale and more predictable behavior 
by default.

**Regression acceptable**

We are altering behavior here that has a performance impact, so some amount of 
change in the benchmarks is expected.

In particular, slot sharing is beneficial for small scale:
* small data means one slot's memory is enough to accommodate all tasks
* fewer slots allocated means a bit less overhead during slot allocation, less 
bookkeeping.

Not slot sharing is beneficial for larger scale:
* more memory per operator
* means often fewer concurrent tasks so more network buffers per task

**Trying to explain the Regression**

The executed data flow is pretty much the same in all cases. The tasks and the 
network stack (local channels, batch shuffles) don't actually care whether they 
are in one slot or another.

My working assumption is that the difference is caused by a few factors in the 
startup overhead. More slots are required to be allocated, more TM / JM 
coordination at startup.

Another option could be that if the keyed operator (with the sorting) gets its 
own dedicated slot (when not slot sharing), it gets more memory. The sorter 
reserves its full share of memory from the MemoryManager, which in turn 
allocates it at startup (and initializes it to zero). While more memory is 
generally good, it also has a slightly longer initialization phase.
[~zhuzh] could that be an explanation?

I think Timo's benchmarks are quite good, comparing slot-sharing vs. 
not-slot-sharing within the same code snapshot, also relative to the different 
batch shuffle settings. That's really what we want to understand here.
The difference between the slot sharing and not sharing depending on the 
shuffle modes is pretty small here.

{code}
Benchmark                                                                   
Mode  Cnt     Score    Error   Units
SortingBoundedInputBenchmarks.sortedTwoInputNoSlotSharingBlocking          
thrpt   30  1642.628 ± 21.183  ops/ms
SortingBoundedInputBenchmarks.sortedTwoInputSlotSharingBlocking            
thrpt   30  1681.684 ± 17.065  ops/ms

SortingBoundedInputBenchmarks.sortedTwoInputNoSlotSharingPipelined         
thrpt   30  1761.725 ± 18.225  ops/ms
SortingBoundedInputBenchmarks.sortedTwoInputSlotSharingPipelined           
thrpt   30  1731.022 ± 32.813  
{code}

_(Note, I removed the cases with "ForwardPipelined" because it is the same as 
"Blocking" in that benchmark. There are no forward exchanges, the sink is 
chained, the sources connect via keyBy())_

> Performance regression on 15.07.2021
> ------------------------------------
>
>                 Key: FLINK-23593
>                 URL: https://issues.apache.org/jira/browse/FLINK-23593
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Benchmarks
>    Affects Versions: 1.14.0
>            Reporter: Piotr Nowojski
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.14.0
>
>
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedMultiInput&env=2
> http://codespeed.dak8s.net:8000/timeline/?ben=sortedTwoInput&env=2
> {noformat}
> pnowojski@piotr-mbp: [~/flink -  ((no branch, bisect started on pr/16589))] $ 
> git ls f4afbf3e7de..eb8100f7afe
> eb8100f7afe [4 weeks ago] (pn/bad, bad, refs/bisect/bad) 
> [FLINK-22017][coordination] Allow BLOCKING result partition to be 
> individually consumable [Thesharing]
> d2005268b1e [4 weeks ago] (HEAD, pn/bisect-4, bisect-4) 
> [FLINK-22017][coordination] Get the ConsumedPartitionGroup that 
> IntermediateResultPartition and DefaultResultPartition belong to [Thesharing]
> d8b1a6fd368 [3 weeks ago] [FLINK-23372][streaming-java] Disable 
> AllVerticesInSameSlotSharingGroupByDefault in batch mode [Timo Walther]
> 4a78097d038 [3 weeks ago] (pn/bisect-3, bisect-3, 
> refs/bisect/good-4a78097d0385749daceafd8326930c8cc5f26f1a) 
> [FLINK-21928][clients][runtime] Introduce static method constructors of 
> DuplicateJobSubmissionException for better readability. [David Moravek]
> 172b9e32215 [3 weeks ago] [FLINK-21928][clients] JobManager failover should 
> succeed, when trying to resubmit already terminated job in application mode. 
> [David Moravek]
> f483008db86 [3 weeks ago] [FLINK-21928][core] Introduce 
> org.apache.flink.util.concurrent.FutureUtils#handleException method, that 
> allows future to recover from the specied exception. [David Moravek]
> d7ac08c2ac0 [3 weeks ago] (pn/bisect-2, bisect-2, 
> refs/bisect/good-d7ac08c2ac06b9ff31707f3b8f43c07817814d4f) 
> [FLINK-22843][docs-zh] Document and code are inconsistent [ZhiJie Yang]
> 16c3ea427df [3 weeks ago] [hotfix] Split the final checkpoint related tests 
> to a separate test class. [Yun Gao]
> 31b3d37a22c [7 weeks ago] [FLINK-21089][runtime] Skip the execution of new 
> sources if finished on restore [Yun Gao]
> 20fe062e1b5 [3 weeks ago] [FLINK-21089][runtime] Skip execution for the 
> legacy source task if finished on restore [Yun Gao]
> 874c627114b [3 weeks ago] [FLINK-21089][runtime] Skip the lifecycle method of 
> operators if finished on restore [Yun Gao]
> ceaf24b1d88 [3 weeks ago] (pn/bisect-1, bisect-1, 
> refs/bisect/good-ceaf24b1d881c2345a43f305d40435519a09cec9) [hotfix] Fix 
> isClosed() for operator wrapper and proxy operator close to the operator 
> chain [Yun Gao]
> 41ea591a6db [3 weeks ago] [FLINK-22627][runtime] Remove unused slot request 
> protocol [Yangze Guo]
> 489346b60f8 [3 months ago] [FLINK-22627][runtime] Remove PendingSlotRequest 
> [Yangze Guo]
> 8ffb4d2af36 [3 months ago] [FLINK-22627][runtime] Remove TaskManagerSlot 
> [Yangze Guo]
> 72073741588 [3 months ago] [FLINK-22627][runtime] Remove SlotManagerImpl and 
> its related tests [Yangze Guo]
> bdb3b7541b3 [3 months ago] [hotfix][yarn] Remove unused internal options in 
> YarnConfigOptionsInternal [Yangze Guo]
> a6a9b192eac [3 weeks ago] [FLINK-23201][streaming] Reset alignment only for 
> the currently processed checkpoint [Anton Kalashnikov]
> b35701a35c7 [3 weeks ago] [FLINK-23201][streaming] Calculate checkpoint 
> alignment time only for last started checkpoint [Anton Kalashnikov]
> 3abec22c536 [3 weeks ago] [FLINK-23107][table-runtime] Separate 
> implementation of deduplicate rank from other rank functions [Shuo Cheng]
> 1a195f5cc59 [3 weeks ago] [FLINK-16093][docs-zh] Translate "System Functions" 
> page of "Functions" into Chinese (#16348) [ZhiJie Yang]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to