[ https://issues.apache.org/jira/browse/FLINK-14123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16942428#comment-16942428 ]

Xintong Song edited comment on FLINK-14123 at 10/2/19 1:59 AM:
---------------------------------------------------------------

[~StephanEwen], thanks for pulling me in.

Actually, there is a discussion in this 
[PR|https://github.com/apache/flink/pull/9760] regarding whether we should stay 
backwards compatible with 'taskmanager.memory.fraction'. The problem is that the 
definition of the managed memory fraction becomes different (a small numeric 
sketch follows the list below).
 * In the current codebase, 'taskmanager.memory.fraction' = managed memory / (total 
flink memory - network memory).
 * According to FLIP-49, the new config option 
'taskmanager.memory.managed.fraction' = managed memory / total flink memory.
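
To make the difference concrete, here is a minimal sketch (not Flink code; the 
4096 MB total flink memory and 410 MB network memory are made-up example numbers, 
and 0.7 is simply the legacy default applied under both definitions):

{code:java}
public class ManagedFractionSketch {
    public static void main(String[] args) {
        double totalFlinkMemoryMb = 4096;  // assumed total flink memory
        double networkMemoryMb = 410;      // assumed network memory
        double fraction = 0.7;             // the same configured value under both definitions

        // Legacy definition: fraction of (total flink memory - network memory).
        double legacyManagedMb = fraction * (totalFlinkMemoryMb - networkMemoryMb);

        // FLIP-49 definition: fraction of total flink memory.
        double flip49ManagedMb = fraction * totalFlinkMemoryMb;

        System.out.printf("legacy: %.0f MB, FLIP-49: %.0f MB%n", legacyManagedMb, flip49ManagedMb);
        // prints: legacy: 2580 MB, FLIP-49: 2867 MB
    }
}
{code}

So a user who simply keeps their old fraction value would silently end up with a 
larger managed memory (and correspondingly less heap) after the switch.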

If we support backwards compatibility for the key 'taskmanager.memory.fraction', 
it could be confusing for users that the same configured fraction now leads to a 
different managed memory size. Not supporting the legacy fraction might be a good 
way to draw users' attention to the definition change.

If we decide not to support the legacy fraction, there is no need to change its 
default value. If we decide to be backwards compatible, it makes more sense to 
decrease the default value because of the definition change (a rough conversion is 
sketched below). For reference, the default value of the new fraction according to 
FLIP-49 is 0.5.
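
As a rough sketch of why an equivalent value is smaller under the new definition 
(the 0.1 network share below is only an assumed example, not necessarily what a 
given setup uses):

{code:java}
public class EquivalentFractionSketch {
    public static void main(String[] args) {
        // managed = legacyFraction * (total - network) = newFraction * total
        //   => newFraction = legacyFraction * (1 - network / total)
        double legacyFraction = 0.7;       // old default of 'taskmanager.memory.fraction'
        double networkShareOfTotal = 0.1;  // assumed example share of total flink memory
        double equivalentNewFraction = legacyFraction * (1 - networkShareOfTotal);
        System.out.printf("%.2f%n", equivalentNewFraction);  // prints: 0.63
    }
}
{code}

So any backwards-compatible mapping of the legacy value ends up below the legacy 
number, and the FLIP-49 default of 0.5 is lower still.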


> Change taskmanager.memory.fraction default value to 0.6
> -------------------------------------------------------
>
>                 Key: FLINK-14123
>                 URL: https://issues.apache.org/jira/browse/FLINK-14123
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Configuration
>    Affects Versions: 1.9.0
>            Reporter: liupengcheng
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, we are testing Flink batch jobs such as terasort; however, the job 
> ran only for a while and then failed due to OOM.
>  
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: a807e1d635bd4471ceea4282477f8850)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>       at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>       at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
>       at 
> com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>       at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>       at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1007)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1080)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1080)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>       at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>       ... 23 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: GC 
> overhead limit exceeded
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
>       at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
>       at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
> due to an exception: GC overhead limit exceeded
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>       at 
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:84)
>       at 
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:33)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:121)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
>       at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>       at 
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>       at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
>       at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> {code}
> After careful debugging, I found that this is because the default value of the 
> JVM option `NewRatio` is 2, which means the old generation occupies 2/3 (~0.66) 
> of the heap memory, while we allocate 0.7 of the heap memory (the default value 
> of `taskmanager.memory.fraction`) in `UnilateralSortMerger`, which is a bit 
> aggressive (see the rough calculation after the GC log and JVM flags below).
> DEBUG log:
> {code:java}
> 2019-09-18 13:39:53,869 DEBUG 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Instantiating 
> sorter with 8060 pages of sorting memory (=264110080 bytes total) divided 
> over 2 sort buffers (4030 pages per buffer). Using 4 buffers for writing 
> sorted results and merging maximally 128 streams at once. Using 0 memory 
> segments for large record spilling
> {code}
> gc.log:
> {code:java}
> 2019-09-18T13:41:37.503+0800: 119.744: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39766K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285450K(327680K), [Metaspace: 56311K->56311K(1099776K)], 0.0963178 
> secs] [Times: user=0.22 sys=0.00, real=0.10 secs] 
> 2019-09-18T13:41:37.599+0800: 119.841: Total time for which application 
> threads were stopped: 0.0967768 seconds, Stopping threads took: 0.0001084 
> seconds
> 2019-09-18T13:41:37.602+0800: 119.843: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39770K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285453K(327680K), [Metaspace: 56311K->56311K(1099776K)], 0.2078965 
> secs] [Times: user=0.56 sys=0.06, real=0.21 secs] 
> 2019-09-18T13:41:37.810+0800: 120.051: Total time for which application 
> threads were stopped: 0.2084774 seconds, Stopping threads took: 0.0001217 
> seconds
> 2019-09-18T13:41:37.811+0800: 120.053: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39766K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285450K(327680K), [Metaspace: 56312K->56312K(1099776K)], 0.0893081 
> secs] [Times: user=0.19 sys=0.00, real=0.09 secs] 
> 2019-09-18T13:41:37.901+0800: 120.143: Total time for which application 
> threads were stopped: 0.0898797 seconds, Stopping threads took: 0.0001453 
> seconds
> 2019-09-18T13:41:37.903+0800: 120.145: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39767K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285451K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.0862173 
> secs] [Times: user=0.19 sys=0.01, real=0.09 secs] 
> 2019-09-18T13:41:37.989+0800: 120.231: Total time for which application 
> threads were stopped: 0.0870664 seconds, Stopping threads took: 0.0005005 
> seconds
> 2019-09-18T13:41:37.991+0800: 120.232: Total time for which application 
> threads were stopped: 0.0003551 seconds, Stopping threads took: 0.0001456 
> seconds
> 2019-09-18T13:41:37.991+0800: 120.233: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39767K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285451K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.0860513 
> secs] [Times: user=0.19 sys=0.00, real=0.08 secs] 
> 2019-09-18T13:41:38.077+0800: 120.319: Total time for which application 
> threads were stopped: 0.0864708 seconds, Stopping threads took: 0.0001602 
> seconds
> 2019-09-18T13:41:38.079+0800: 120.320: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39772K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285455K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.0926346 
> secs] [Times: user=0.20 sys=0.00, real=0.09 secs] 
> 2019-09-18T13:41:38.171+0800: 120.413: Total time for which application 
> threads were stopped: 0.0932214 seconds, Stopping threads took: 0.0001315 
> seconds
> 2019-09-18T13:41:38.172+0800: 120.414: Total time for which application 
> threads were stopped: 0.0003511 seconds, Stopping threads took: 0.0001082 
> seconds
> 2019-09-18T13:41:38.177+0800: 120.418: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39661K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285344K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.0964601 
> secs] [Times: user=0.21 sys=0.00, real=0.10 secs] 
> 2019-09-18T13:41:38.273+0800: 120.515: Total time for which application 
> threads were stopped: 0.0970543 seconds, Stopping threads took: 0.0001037 
> seconds
> 2019-09-18T13:41:38.276+0800: 120.517: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39662K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285345K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.1040062 
> secs] [Times: user=0.22 sys=0.00, real=0.11 secs] 
> 2019-09-18T13:41:38.380+0800: 120.621: Total time for which application 
> threads were stopped: 0.1046574 seconds, Stopping threads took: 0.0000903 
> seconds
> 2019-09-18T13:41:38.382+0800: 120.624: [Full GC (Ergonomics) [PSYoungGen: 
> 40960K->39663K(81920K)] [ParOldGen: 245683K->245683K(245760K)] 
> 286643K->285346K(327680K), [Metaspace: 56313K->56313K(1099776K)], 0.0943451 
> secs] [Times: user=0.19 sys=0.01, real=0.09 secs] 
> 2019-09-18T13:41:38.477+0800: 120.718: Total time for which application 
> threads were stopped: 0.0949320 seconds, Stopping threads took: 0.0000874 
> seconds
> {code}
> {code:java}
>  
> CommandLine flags: -XX:GCLogFileSize=10485760 -XX:+HeapDumpOnOutOfMemoryError 
> -XX:HeapDumpPath=./flink-tm-heapdump.hprof -XX:InitialHeapSize=377487360 
> -XX:MaxDirectMemorySize=696254464 -XX:MaxHeapSize=377487360
> {code}
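>
> For illustration, a rough calculation putting the numbers above together 
> (assuming the old generation is sized at 2/3 of the heap under `NewRatio=2`, 
> which matches the ParOldGen capacity of 245760K in the GC log):
> {code:java}
> public class HeapVsSortMemorySketch {
>     public static void main(String[] args) {
>         long maxHeap = 377_487_360L;             // -XX:MaxHeapSize from the flags above (~360 MB)
>         long oldGen = maxHeap * 2 / 3;           // NewRatio=2 => old gen = 251658240 bytes = 245760K
>         long sortMemory = 264_110_080L;          // sorting memory from the DEBUG log, ~0.7 of the heap
>         System.out.println(sortMemory > oldGen); // true: the long-lived sort buffers alone exceed the old generation
>     }
> }
> {code}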
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
