[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334069239
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334069133
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334069013
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334008401
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333995806
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334004815
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333996047
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334000883
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333994742
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r33374
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334001095
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333993395
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334001729
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333994184
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334000122
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333990070
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333988304
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333995702
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333995458
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334001380
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   private static final MemorySize TASK_HEAP_SIZE = 
MemorySize.parse("100m");
+   private static final MemorySize MANAGED_MEM_SIZE = 
MemorySize.parse("200m");
+   private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("800m");
+   private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1g");
+
+   private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   String[] configStrs = dynamicConfigsStr.split(" ");
+   assertThat(configStrs.length % 2, is(0));
+   for (int i = 0; i < configStrs.length; ++i) {
+   String configStr = configStrs[i];
+   if (i % 2 == 0) {
+   assertThat(configStr, is("-D"));
+   } else {
+   String[] configKV = configStr.split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize 

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333993776
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333945358
 
 

 ##
 File path: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 ##
 @@ -96,7 +96,7 @@ private static Configuration getConfig() throws Exception {
 
Configuration config = new Configuration();

config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
-   config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+   config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, 
"4m");
 
 Review comment:
   ok, makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333945440
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.MemorySize;
+
+/**
+ * Describe the specifics of different resource dimensions of the TaskExecutor.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
 
 Review comment:
   Ok, sounds good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333945019
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-11 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333945062
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
+   .noDefaultValue()
+   .withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+   + " containerized setups, this value is 
automatically configured to the size of the container.");
+
+   /**
+* Total Flink Memory size for the TaskExecutors.
+*/
+   @Documentation.CommonOption(position = 
Documentation.CommonOption.POSITION_MEMORY)
+   public static final ConfigOption TOTAL_FLINK_MEMORY =
+   key("taskmanager.memory.total-flink.size")
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333652584
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 ##
 @@ -111,6 +113,7 @@
/**
 * Maximum memory size for network buffers.
 
 Review comment:
   `@deprecated` JavaDoc is missing. When deprecating things, it is always 
helpful to say why and what should be used instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333660886
 
 

 ##
 File path: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 ##
 @@ -96,7 +96,7 @@ private static Configuration getConfig() throws Exception {
 
Configuration config = new Configuration();

config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
-   config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+   config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, 
"4m");
 
 Review comment:
   Should we replace these config keys with the non deprecated version?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333658742
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
 
 Review comment:
   Maybe
   ```suggestion
key("taskmanager.memory.process.size")
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333659120
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
+   .noDefaultValue()
+   .withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+   + " containerized setups, this value is 
automatically configured to the size of the container.");
+
+   /**
+* Total Flink Memory size for the TaskExecutors.
+*/
+   @Documentation.CommonOption(position = 
Documentation.CommonOption.POSITION_MEMORY)
+   public static final ConfigOption TOTAL_FLINK_MEMORY =
+   key("taskmanager.memory.total-flink.size")
 
 Review comment:
   Maybe not, if `flink` alone could be confused with `framework`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333664352
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ * 
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
+   .add(taskExecutorResourceSpec.getShuffleMemSize(

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333658914
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
+   .noDefaultValue()
+   .withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+   + " containerized setups, this value is 
automatically configured to the size of the container.");
+
+   /**
+* Total Flink Memory size for the TaskExecutors.
+*/
+   @Documentation.CommonOption(position = 
Documentation.CommonOption.POSITION_MEMORY)
+   public static final ConfigOption TOTAL_FLINK_MEMORY =
+   key("taskmanager.memory.total-flink.size")
 
 Review comment:
   Maybe
   
   ```suggestion
key("taskmanager.memory.flink.size")
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333655871
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +263,189 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
+   .noDefaultValue()
+   .withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+   + " containerized setups, this value is 
automatically configured to the size of the container.");
 
 Review comment:
   Not sure whether the last sentence makes sense with the `automatically 
configured`. I guess it should say that in a containerized environment, you 
should set this value to the container memory.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-10 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r333661581
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.MemorySize;
+
+/**
+ * Describe the specifics of different resource dimensions of the TaskExecutor.
+ *
+ * A TaskExecutor's memory consists of the following components.
+ * 
+ * Framework Heap Memory
+ * Task Heap Memory
+ * Task Off-Heap Memory
+ * Shuffle Memory
+ * Managed Memory
+ * 
+ * On-Heap Managed Memory
+ * Off-Heap Managed Memory
+ * 
+ * JVM Metaspace
+ * JVM Overhead
+ * 
+ * Among all the components, Framework Heap Memory, Task Heap Memory and 
On-Heap Managed Memory use on heap memory,
+ * while the rest use off heap memory. We use Total Process Memory to refer to 
all the memory components, while Total
+ * Flink Memory refering to all the components except JVM Metaspace and JVM 
Overhead.
+ *
+ * The relationships of TaskExecutor memory components are shown below.
+ * 
+ *   ┌ ─ ─ Total Process Memory  ─ ─ ┐
+ *┌ ─ ─ Total Flink Memory  ─ ─ ┐
+ *   │ ┌───┐ │
+ *││   Framework Heap Memory   ││  ─┐
+ *   │ └───┘ │  │
+ *│┌───┐│   │
+ *   │ │ Task Heap Memory  │ │ ─┤
+ *│└───┘│   │
+ *   │ ┌───┐ │  │
+ *┌─  ││   Task Off-Heap Memory││   │
+ *│  │ └───┘ │  ├─ On-Heap
+ *│   │┌───┐│   │
+ *├─ │ │  Shuffle Memory   │ │  │
+ *│   │└───┘│   │
+ *│  │ ┌─ Managed Memory ──┐ │  │
+ *│   ││┌─┐││   │
+ *│  │ ││ On-Heap Managed Memory  ││ │ ─┘
+ *│   ││├─┤││
+ *  Off-Heap ─┼─ │ ││ Off-Heap Managed Memory ││ │
+ *│   ││└─┘││
+ *│  │ └───┘ │
+ *│   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+ *│  │┌─┐│
+ *├─  │JVM Metaspace│
+ *│  │└─┘│
+ *│   ┌─┐
+ *└─ ││JVM Overhead ││
+ *└─┘
+ *   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
 
 Review comment:
   This is duplicated information. I would keep it in one place and link to it. 
Otherwise I fear that they might go out of sync at some point in time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r330469316
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r330469447
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -37,6 +37,24 @@
 
private TaskExecutorResourceUtils() {}
 
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
 
 Review comment:
   Good point. Thanks for the clarification.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r330468448
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-10-02 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r330467036
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+   static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+   static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+   static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+   static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   for (String configStr : dynamicConfigsStr.split(" ")) {
+   assertThat(configStr, startsWith("-D"));
+   String[] configKV = configStr.substring(2).split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize directSize = null;
+   MemorySize metaspaceSize = null;
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith("-Xmx")) {
+   heapSizeMax = 
MemorySize.par

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329055349
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329067007
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+   static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+   static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+   static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+   static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   for (String configStr : dynamicConfigsStr.split(" ")) {
+   assertThat(configStr, startsWith("-D"));
+   String[] configKV = configStr.substring(2).split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize directSize = null;
+   MemorySize metaspaceSize = null;
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith("-Xmx")) {
+   heapSizeMax = 
MemorySize.par

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329056494
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
 
 Review comment:
   Let's call it then `deriveJVMMetaSpaceAndOverHeadFromTotalFlinkMemo

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329068200
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+   static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+   static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+   static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+   static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   for (String configStr : dynamicConfigsStr.split(" ")) {
+   assertThat(configStr, startsWith("-D"));
+   String[] configKV = configStr.substring(2).split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize directSize = null;
+   MemorySize metaspaceSize = null;
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith("-Xmx")) {
+   heapSizeMax = 
MemorySize.par

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329055935
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329051847
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.MemorySize;
+
+/**
+ * Describe the specifics of different resource dimensions of the TaskExecutor.
+ */
+public class TaskExecutorResourceSpec {
+
+   private MemorySize frameworkHeapSize;
+
+   private MemorySize taskHeapSize;
+
+   private MemorySize taskOffHeapSize;
+
+   private MemorySize shuffleMemSize;
+
+   private MemorySize onHeapManagedMemorySize;
+
+   private MemorySize offHeapManagedMemorySize;
+
+   private MemorySize jvmMetaspaceSize;
+
+   private MemorySize jvmOverheadSize;
 
 Review comment:
   all field should be `final`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329054365
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
 
 Review comment:
   Instead of returning a `Tuple2` here, please return a class where i

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329063421
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
 
 Review comment:
   I think the way we calculate the resource requirements is quite hard to 
understand for a user. Depending on which values the user configured, some have 
precedence over others and some values will silently be ignored. For example, 
if task heap memory and managed memory is configured then Flink does not 
respect the total flink memory size. This is confusing in my opinion.
   
   Instead I would suggest to do the following: We take all configured values 
and compute the missing values based on the input. The user's input should 
always have precedence otherwise it becomes hard to understand. Before 
returning the result, we do a sanity check that things actually add up. If not, 
then we fail and notify the user about the misconfigured setup. For example, if 
the users configures total process memory < total flink memory, then I would 
expect that we fail.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r328688810
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -260,6 +260,178 @@
text("\"ip\" - uses host's ip address 
as binding address"))
.build());
 
+   // 

+   //  Memory Options
+   // 

+
+   /**
+* Total Process Memory size for the TaskExecutors.
+*/
+   public static final ConfigOption TOTAL_PROCESS_MEMORY =
+   key("taskmanager.memory.total-process.size")
+   .noDefaultValue()
+   .withDescription("Total Process Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, consisting of Total 
Flink Memory, JVM Metaspace, and JVM Overhead. On"
+   + " containerized setups, this value is 
automatically configured to the size of the container.");
+
+   /**
+* Total Flink Memory size for the TaskExecutors.
+*/
+   @Documentation.CommonOption(position = 
Documentation.CommonOption.POSITION_MEMORY)
+   public static final ConfigOption TOTAL_FLINK_MEMORY =
+   key("taskmanager.memory.total-flink.size")
+   .noDefaultValue()
+   .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
+   .withDescription("Total Flink Memory size for the 
TaskExecutors. This includes all the memory that a"
+   + " TaskExecutor consumes, except for JVM Metaspace and 
JVM Overhead. It consists of Framework Heap Memory,"
+   + " Task Heap Memory, Task Off-Heap Memory, Managed 
Memory, and Shuffle Memory.");
+
+   /**
+* Framework Heap Memory size for TaskExecutors.
+*/
+   public static final ConfigOption FRAMEWORK_HEAP_MEMORY =
+   key("taskmanager.memory.framework.heap.size")
+   .defaultValue("128m")
+   .withDescription("Framework Heap Memory size for 
TaskExecutors. This is the size of JVM heap memory reserved"
+   + " for TaskExecutor framework, which will not 
be allocated to task slots.");
+
+   /**
+* Task Heap Memory size for TaskExecutors.
+*/
+   public static final ConfigOption TASK_HEAP_MEMORY =
+   key("taskmanager.memory.task.heap.size")
+   .noDefaultValue()
+   .withDescription("Task Heap Memory size for 
TaskExecutors. This is the size of JVM heap memory reserved for"
+   + " user code. If not specified, it will be 
derived as Total Flink Memory minus (On-Heap and Off-Heap)"
+   + " Managed Memory and Shuffle Memory.");
 
 Review comment:
   I guess it should also say that `FRAMEWORK_HEAP_MEMORY` is subtracted from 
`Total Flink Memory`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r328682428
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -200,10 +200,10 @@
" the task manager JVM as specified by 
taskmanager.memory.fraction.");
 
/**
-* Fraction of free memory allocated by the memory manager if {@link 
#MANAGED_MEMORY_SIZE} is
+* Fraction of free memory allocated by the memory manager if {@link 
#LEGACY_MANAGED_MEMORY_SIZE} is
 * not set.
 */
-   public static final ConfigOption MANAGED_MEMORY_FRACTION =
+   public static final ConfigOption LEGACY_MANAGED_MEMORY_FRACTION =
 
 Review comment:
   What about deprecating this `ConfigOption`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329052823
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
 
 Review comment:
   Just a side comment: I think it makes sense to add tests for a newly 
introduced class in the same commit as the new class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329070304
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+   static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+   static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+   static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+   static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   for (String configStr : dynamicConfigsStr.split(" ")) {
+   assertThat(configStr, startsWith("-D"));
+   String[] configKV = configStr.substring(2).split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize directSize = null;
+   MemorySize metaspaceSize = null;
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith("-Xmx")) {
+   heapSizeMax = 
MemorySize.par

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329051310
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
 ##
 @@ -115,6 +115,27 @@ public String toString() {
return bytes + " bytes";
}
 
+   // 

+   //  Calculations
+   // 

+
+   public MemorySize add(MemorySize that) {
+   return new MemorySize(Math.addExact(this.bytes, that.bytes));
+   }
+
+   public MemorySize subtract(MemorySize that) {
+   return new MemorySize(Math.subtractExact(this.bytes, 
that.bytes));
+   }
+
+   public MemorySize multiply(double multiplier) {
+   checkArgument(multiplier >= 0, "multiplier must be >= 0");
+   double product = this.bytes * multiplier;
 
 Review comment:
   I think the error can be up to 9 bits
   
   ```
   final long value = Long.MAX_VALUE - 511;
   
final double result = value * 1.0;
final long longresult = (long) result;
   
System.out.println(value);
System.out.println(result);
System.out.println(longresult);
System.out.println(Math.abs(longresult - value)); // 511
   ```
   
   Here one can see that the difference is `511`. Hence it is not only a single 
byte in difference. Moreover, the result can be bigger than the input if 
multiplied by `1.0`.
   
   However, it should be save to multiple long values which only use the first 
52 bits due to the mantissa of a `double`. Maybe one could add a checkstate or 
we use `BigDecimal` to be on the safe side. Given that this path should not be 
taken a lot it could be a good safety net.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329066092
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -37,6 +37,24 @@
 
private TaskExecutorResourceUtils() {}
 
+   // 

+   //  Generating JVM Parameters
+   // 

+
+   public static String generateJvmParametersStr(final 
TaskExecutorResourceSpec taskExecutorResourceSpec) {
+   final MemorySize jvmHeapSize = 
taskExecutorResourceSpec.getFrameworkHeapSize()
+   .add(taskExecutorResourceSpec.getTaskHeapSize())
+   
.add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+   final MemorySize jvmDirectSize = 
taskExecutorResourceSpec.getTaskOffHeapSize()
 
 Review comment:
   What about `getOffHeapManagedMemorySize`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329060662
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329058300
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329058759
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329064182
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -348,6 +348,18 @@
+ TaskManagerOptions.MEMORY_OFF_HEAP.key() + 
"', to use either all on-heap memory or all off-heap memory"
+ " for Managed Memory.");
 
+   /**
+* Off-Heap Managed Memory size.
+* This config option should only be used for passing derived off-heap 
managed memory size into TaskExecutors. It
+* should not be exposed to users.
+*/
+   @Documentation.ExcludeFromDocumentation
+   public static final ConfigOption MANAGED_MEMORY_OFFHEAP_SIZE =
+   key("taskmanager.memory.managed.off-heap.size")
+   .noDefaultValue()
+   .withDescription("DO NOT USE THIS CONFIG OPTION! This 
is the config option for Off-Heap Managed Memory size."
 
 Review comment:
   Why shouldn't it be used? Can't we do a sanity check and fail if things 
don't add up?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329068246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+   static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+   static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+   static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+   static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+   static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new 
TaskExecutorResourceSpec(
+   MemorySize.parse("1m"),
+   MemorySize.parse("2m"),
+   MemorySize.parse("3m"),
+   MemorySize.parse("4m"),
+   MemorySize.parse("5m"),
+   MemorySize.parse("6m"),
+   MemorySize.parse("7m"),
+   MemorySize.parse("8m"));
+
+   @Test
+   public void testGenerateDynamicConfigurations() {
+   String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
+   Map configs = new HashMap<>();
+   for (String configStr : dynamicConfigsStr.split(" ")) {
+   assertThat(configStr, startsWith("-D"));
+   String[] configKV = configStr.substring(2).split("=");
+   assertThat(configKV.length, is(2));
+   configs.put(configKV[0], configKV[1]);
+   }
+
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
 is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
 is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+   
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
 is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+   }
+
+   @Test
+   public void testGenerateJvmParameters() throws Exception {
+   String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+   MemorySize heapSizeMax = null;
+   MemorySize heapSizeMin = null;
+   MemorySize directSize = null;
+   MemorySize metaspaceSize = null;
+   for (String paramStr : jvmParamsStr.split(" ")) {
+   if (paramStr.startsWith("-Xmx")) {
+   heapSizeMax = 
MemorySize.par

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329071067
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
 
 Review comment:
   What about backwards compatibility tests?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329068779
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##
 @@ -0,0 +1,323 @@
+package org.apache.flink.runtime.clusterframework;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
 
 Review comment:
   There are no tests verifying that a misconfigured cluster fails with the 
appropriate exception.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329058986
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for TaskExecutor memory configurations.
+ */
+public class TaskExecutorResourceUtils {
+
+   private TaskExecutorResourceUtils() {}
+
+   // 

+   //  Memory Configuration Calculations
+   // 

+
+   public static TaskExecutorResourceSpec resourceSpecFromConfig(final 
Configuration config) throws ConfigurationException {
+   if (isTaskHeapMemorySizeExplicitlyConfigured(config) && 
isManagedMemorySizeExplicitlyConfigured(config)) {
+   // both task heap memory and managed memory is 
configured, use these to derive total flink memory
+   return 
deriveResourceSpecWithExplicitTaskAndManagedMemory(config);
+   } else if (isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
+   // total flink memory is configured, but not task heap 
and managed memory, derive from total flink memory
+   return deriveResourceSpecWithTotalFlinkMemory(config);
+   } else if 
(isTotalProcessMemorySizeExplicitlyConfigured(config)) {
+   return deriveResourceSpecWithTotalProcessMemory(config);
+   } else {
+   throw new ConfigurationException("Either Task Heap 
Memory size and Managed Memory size, or Total Flink"
+   + " Memory size, or Total Process Memory size 
need to be configured explicitly.");
+   }
+   }
+
+   private static TaskExecutorResourceSpec 
deriveResourceSpecWithExplicitTaskAndManagedMemory(final Configuration config) {
+   // derive total flink internal memory sizes from explicitly 
configure task heap memory size and managed memory size
+
+   final MemorySize frameworkHeapMemorySize = 
getFrameworkHeapMemorySize(config);
+   final MemorySize taskHeapMemorySize = 
getTaskHeapMemorySize(config);
+   final MemorySize taskOffHeapMemorySize = 
getTaskOffHeapMemorySize(config);
+
+   final MemorySize managedMemorySize = 
getManagedMemorySize(config);
+   final Tuple2 managedMemorySizeTuple2 = 
deriveOnHeapAndOffHeapManagedMemorySizeFromManagedMemorySize(config, 
managedMemorySize);
+   final MemorySize onHeapManagedMemorySize = 
managedMemorySizeTuple2.f0;
+   final MemorySize offHeapManagedMemorySize = 
managedMemorySizeTuple2.f1;
+
+   final MemorySize shuffleMemorySize = 
deriveShuffleMemoryWithInverseFraction(config,
+   
frameworkHeapMemorySize.add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize));
+
+   // derive total flink external memory sizes from derived total 
flink memory size
+
+   final MemorySize totalFlinkMemorySize = frameworkHeapMemorySize
+   .add(taskHeapMemorySize)
+   .add(taskOffHeapMemorySize)
+   .add(shuffleMemorySize)
+   .add(managedMemorySize);
+
+   final Tuple2 
totalFlinkExternalMemorySizeTuple2 = 
deriveTotalFlinkExternalMemorySizes(config, totalFlinkMemorySize);
+
+   return new TaskExecutorResourceSpec(
+   frameworkH

[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r328682276
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -189,9 +189,9 @@
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager. If not
-* set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
+* set, a relative fraction will be allocated, as defined by {@link 
#LEGACY_MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption MANAGED_MEMORY_SIZE =
+   public static final ConfigOption LEGACY_MANAGED_MEMORY_SIZE =
 
 Review comment:
   What about deprecating this `ConfigOption`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics

2019-09-27 Thread GitBox
tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r329051310
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
 ##
 @@ -115,6 +115,27 @@ public String toString() {
return bytes + " bytes";
}
 
+   // 

+   //  Calculations
+   // 

+
+   public MemorySize add(MemorySize that) {
+   return new MemorySize(Math.addExact(this.bytes, that.bytes));
+   }
+
+   public MemorySize subtract(MemorySize that) {
+   return new MemorySize(Math.subtractExact(this.bytes, 
that.bytes));
+   }
+
+   public MemorySize multiply(double multiplier) {
+   checkArgument(multiplier >= 0, "multiplier must be >= 0");
+   double product = this.bytes * multiplier;
 
 Review comment:
   I think the error can be up to 9 bits
   
   ```
   final long value = Long.MAX_VALUE - 511;
   
   final double result = value * 1.0;
   final long longresult = (long) result;
   
   System.out.println(value);
   System.out.println(result);
   System.out.println(longresult);
   System.out.println(Math.abs(longresult - value)); // 511
   ```
   
   Here one can see that the difference is `511`. Hence it is not only a single 
byte in difference. Moreover, the result can be bigger than the input if 
multiplied by `1.0`.
   
   However, it should be save to multiple long values which only use the first 
52 bits due to the mantissa of a `double`. Maybe one could add a checkstate or 
we use `BigDecimal` to be on the safe side. Given that this path should not be 
taken a lot it could be a good safety net.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services