zhuzhurk commented on a change in pull request #12370:
URL: https://github.com/apache/flink/pull/12370#discussion_r433648255



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala
##########
@@ -150,6 +152,64 @@ trait CommonPythonBase {
     }
     realEnv
   }
+
+  private def isPythonWorkerUsingManagedMemory(config: Configuration): Boolean 
= {
+    val clazz = loadClass("org.apache.flink.python.PythonOptions")
+    config.getBoolean(clazz.getField("USE_MANAGED_MEMORY").get(null)
+      .asInstanceOf[ConfigOption[java.lang.Boolean]])
+  }
+
+  private def getPythonWorkerMemory(config: Configuration): MemorySize = {
+    val clazz = loadClass("org.apache.flink.python.PythonOptions")
+    val pythonFrameworkMemorySize = MemorySize.parse(
+      config.getString(
+        clazz.getField("PYTHON_FRAMEWORK_MEMORY_SIZE").get(null)
+          .asInstanceOf[ConfigOption[String]]))
+    val pythonBufferMemorySize = MemorySize.parse(
+      config.getString(
+        clazz.getField("PYTHON_DATA_BUFFER_MEMORY_SIZE").get(null)
+          .asInstanceOf[ConfigOption[String]]))
+    pythonFrameworkMemorySize.add(pythonBufferMemorySize)
+  }
+
+  private def checkPythonWorkerMemory(
+      config: Configuration, env: StreamExecutionEnvironment = null): Unit = {
+    if (!isPythonWorkerUsingManagedMemory(config)) {
+      val taskOffHeapMemory = 
config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY)
+      val requiredPythonWorkerOffHeapMemory = getPythonWorkerMemory(config)
+      if (taskOffHeapMemory.compareTo(requiredPythonWorkerOffHeapMemory) < 0) {
+        throw new TableException(String.format("The configured Task Off-Heap 
Memory %s is less " +
+          "than the least required Python worker Memory %s. The Task Off-Heap 
Memory can be " +
+          "configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.",
+          taskOffHeapMemory, requiredPythonWorkerOffHeapMemory))
+      }
+    } else if (env != null && isRocksDbUsingManagedMemory(env)) {
+      throw new TableException("Currently it doesn't support to use Managed 
Memory for both " +
+        "RocksDB state backend and Python worker at the same time. You can 
either configure " +
+        "RocksDB state backend to use Task Off-Heap Memory via the 
configuration key " +
+        "'state.backend.rocksdb.memory.managed' or configure Python worker to 
use " +
+        "Task Off-Heap Memory via the configuration key " +
+        "'python.fn-execution.memory.managed'.")
+    }
+  }
+
+  private def isRocksDbUsingManagedMemory(env: StreamExecutionEnvironment): 
Boolean = {

Review comment:
       I see.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to