This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new b29d1055f92 [hotfix][python] Use off-heap memory if managed memory fraction is 0 b29d1055f92 is described below commit b29d1055f9225d189603b5aeeacf3199e4c98636 Author: Dian Fu <dia...@apache.org> AuthorDate: Tue Dec 13 14:14:33 2022 +0800 [hotfix][python] Use off-heap memory if managed memory fraction is 0 --- .../python/beam/BeamPythonFunctionRunner.java | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java index 474b633b24d..510d77723a1 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java @@ -248,14 +248,10 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions); - if (memoryManager != null && config.get(USE_MANAGED_MEMORY)) { - Preconditions.checkArgument( - managedMemoryFraction > 0 && managedMemoryFraction <= 1.0, - String.format( - "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. 
" - + "It may be because the consumer type \"Python\" was missing or set to 0 for the config option \"taskmanager.memory.managed.consumer-weights\".", - managedMemoryFraction)); - + if (memoryManager != null + && config.get(USE_MANAGED_MEMORY) + && managedMemoryFraction > 0 + && managedMemoryFraction <= 1.0) { final LongFunctionWithException<PythonSharedResources, Exception> initializer = (size) -> new PythonSharedResources( @@ -274,6 +270,15 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner { sharedResources.getResourceHandle().getEnvironment(); stageBundleFactory = createStageBundleFactory(jobBundleFactory, environment); } else { + if (memoryManager != null + && config.get(USE_MANAGED_MEMORY) + && (managedMemoryFraction <= 0 || managedMemoryFraction > 1.0)) { + LOG.warn( + String.format( + "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead." + + "Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.", + managedMemoryFraction)); + } // there is no way to access the MemoryManager for the batch job of old planner, // fallback to the way that spawning a Python process for each Python operator jobBundleFactory = createJobBundleFactory(pipelineOptions);