This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new b29d1055f92 [hotfix][python] Use off-heap memory if managed memory fraction is 0
b29d1055f92 is described below

commit b29d1055f9225d189603b5aeeacf3199e4c98636
Author: Dian Fu <dia...@apache.org>
AuthorDate: Tue Dec 13 14:14:33 2022 +0800

    [hotfix][python] Use off-heap memory if managed memory fraction is 0
---
 .../python/beam/BeamPythonFunctionRunner.java       | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 474b633b24d..510d77723a1 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -248,14 +248,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 
         Struct pipelineOptions = 
PipelineOptionsTranslation.toProto(portableOptions);
 
-        if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) {
-            Preconditions.checkArgument(
-                    managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
-                    String.format(
-                            "The configured managed memory fraction for Python 
worker process must be within (0, 1], was: %s. "
-                                    + "It may be because the consumer type 
\"Python\" was missing or set to 0 for the config option 
\"taskmanager.memory.managed.consumer-weights\".",
-                            managedMemoryFraction));
-
+        if (memoryManager != null
+                && config.get(USE_MANAGED_MEMORY)
+                && managedMemoryFraction > 0
+                && managedMemoryFraction <= 1.0) {
             final LongFunctionWithException<PythonSharedResources, Exception> 
initializer =
                     (size) ->
                             new PythonSharedResources(
@@ -274,6 +270,15 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                     sharedResources.getResourceHandle().getEnvironment();
             stageBundleFactory = createStageBundleFactory(jobBundleFactory, 
environment);
         } else {
+            if (memoryManager != null
+                    && config.get(USE_MANAGED_MEMORY)
+                    && (managedMemoryFraction <= 0 || managedMemoryFraction > 
1.0)) {
+                LOG.warn(
+                        String.format(
+                                "The configured managed memory fraction for 
Python worker process must be within (0, 1], was: %s, use off-heap memory 
instead."
+                                        + "Please see config option 
\"taskmanager.memory.managed.consumer-weights\" for more details.",
+                                managedMemoryFraction));
+            }
             // there is no way to access the MemoryManager for the batch job 
of old planner,
             // fallback to the way that spawning a Python process for each 
Python operator
             jobBundleFactory = createJobBundleFactory(pipelineOptions);

Reply via email to