dianfu commented on a change in pull request #12370:
URL: https://github.com/apache/flink/pull/12370#discussion_r433644752



##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala
##########
@@ -150,6 +152,64 @@ trait CommonPythonBase {
     }
     realEnv
   }
+
+  private def isPythonWorkerUsingManagedMemory(config: Configuration): Boolean 
= {
+    val clazz = loadClass("org.apache.flink.python.PythonOptions")
+    config.getBoolean(clazz.getField("USE_MANAGED_MEMORY").get(null)
+      .asInstanceOf[ConfigOption[java.lang.Boolean]])
+  }
+
+  private def getPythonWorkerMemory(config: Configuration): MemorySize = {
+    val clazz = loadClass("org.apache.flink.python.PythonOptions")
+    val pythonFrameworkMemorySize = MemorySize.parse(
+      config.getString(
+        clazz.getField("PYTHON_FRAMEWORK_MEMORY_SIZE").get(null)
+          .asInstanceOf[ConfigOption[String]]))
+    val pythonBufferMemorySize = MemorySize.parse(
+      config.getString(
+        clazz.getField("PYTHON_DATA_BUFFER_MEMORY_SIZE").get(null)
+          .asInstanceOf[ConfigOption[String]]))
+    pythonFrameworkMemorySize.add(pythonBufferMemorySize)
+  }
+
+  private def checkPythonWorkerMemory(
+      config: Configuration, env: StreamExecutionEnvironment = null): Unit = {
+    if (!isPythonWorkerUsingManagedMemory(config)) {
+      val taskOffHeapMemory = 
config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY)
+      val requiredPythonWorkerOffHeapMemory = getPythonWorkerMemory(config)
+      if (taskOffHeapMemory.compareTo(requiredPythonWorkerOffHeapMemory) < 0) {
+        throw new TableException(String.format("The configured Task Off-Heap 
Memory %s is less " +
+          "than the least required Python worker Memory %s. The Task Off-Heap 
Memory can be " +
+          "configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.",
+          taskOffHeapMemory, requiredPythonWorkerOffHeapMemory))
+      }
+    } else if (env != null && isRocksDbUsingManagedMemory(env)) {
+      throw new TableException("Currently it doesn't support to use Managed 
Memory for both " +
+        "RocksDB state backend and Python worker at the same time. You can 
either configure " +
+        "RocksDB state backend to use Task Off-Heap Memory via the 
configuration key " +
+        "'state.backend.rocksdb.memory.managed' or configure Python worker to 
use " +
+        "Task Off-Heap Memory via the configuration key " +
+        "'python.fn-execution.memory.managed'.")
+    }
+  }
+
+  private def isRocksDbUsingManagedMemory(env: StreamExecutionEnvironment): 
Boolean = {

Review comment:
       Do you mean the duplicate methods between the old planner and the blink 
planner? If so, I'm afraid it's currently not possible as there is no way to 
share code between them(this is also by design as we are not planning to 
maintain two planners in the long term). Although it allows to share the code 
technically by moving the shared code to flink-table-common, I think it's not a 
good idea to do that for this case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to