This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 023568b [FLINK-22172][python] Fix the bug of shared resource among Python Operators of the same slot is not released
023568b is described below

commit 023568b83d971b439468d680bddc66d584c6f7c4
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Apr 9 14:33:59 2021 +0800

    [FLINK-22172][python] Fix the bug of shared resource among Python Operators of the same slot is not released

    This closes #15537.
---
 .../api/runners/python/beam/BeamPythonFunctionRunner.java |  5 +----
 .../api/runners/python/beam/PythonSharedResources.java    | 14 --------------
 2 files changed, 1 insertion(+), 18 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 4e2e1d2..1e95b67 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -294,10 +294,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {

         try {
             if (sharedResources != null) {
-                if (sharedResources.getResourceHandle().release()) {
-                    // release sharedResources iff there are no more Python operators sharing it
-                    sharedResources.close();
-                }
+                sharedResources.close();
             } else {
                 // if sharedResources is not null, the close of environmentManager will be managed
                 // in sharedResources,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
index 3f5ec9e..c81e723 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
@@ -44,14 +44,10 @@ public final class PythonSharedResources implements AutoCloseable {
     /** Keep track of the PythonEnvironmentManagers of the Python operators in one slot. */
     private final List<PythonEnvironmentManager> environmentManagers;

-    /** Keep track of the number of Python operators sharing this Python resource. */
-    private int refCnt;
-
     PythonSharedResources(JobBundleFactory jobBundleFactory, Environment environment) {
         this.jobBundleFactory = jobBundleFactory;
         this.environment = environment;
         this.environmentManagers = new ArrayList<>();
-        this.refCnt = 0;
     }

     JobBundleFactory getJobBundleFactory() {
@@ -64,16 +60,6 @@ public final class PythonSharedResources implements AutoCloseable {

     void addPythonEnvironmentManager(PythonEnvironmentManager environmentManager) {
         environmentManagers.add(environmentManager);
-        refCnt++;
-    }
-
-    /**
-     * Release a Python operator which shares this Python resource. Returns true if there are no
-     * more Python operators sharing this Python resource.
-     */
-    boolean release() {
-        refCnt--;
-        return refCnt == 0;
     }

     @Override