This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 023568b  [FLINK-22172][python] Fix the bug of shared resource among 
Python Operators of the same slot is not released
023568b is described below

commit 023568b83d971b439468d680bddc66d584c6f7c4
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Apr 9 14:33:59 2021 +0800

    [FLINK-22172][python] Fix the bug of shared resource among Python Operators 
of the same slot is not released
    
    This closes #15537.
---
 .../api/runners/python/beam/BeamPythonFunctionRunner.java  |  5 +----
 .../api/runners/python/beam/PythonSharedResources.java     | 14 --------------
 2 files changed, 1 insertion(+), 18 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 4e2e1d2..1e95b67 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -294,10 +294,7 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
 
         try {
             if (sharedResources != null) {
-                if (sharedResources.getResourceHandle().release()) {
-                    // release sharedResources iff there are no more Python 
operators sharing it
-                    sharedResources.close();
-                }
+                sharedResources.close();
             } else {
                 // if sharedResources is not null, the close of 
environmentManager will be managed
                 // in sharedResources,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
index 3f5ec9e..c81e723 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/PythonSharedResources.java
@@ -44,14 +44,10 @@ public final class PythonSharedResources implements 
AutoCloseable {
     /** Keep track of the PythonEnvironmentManagers of the Python operators in 
one slot. */
     private final List<PythonEnvironmentManager> environmentManagers;
 
-    /** Keep track of the number of Python operators sharing this Python 
resource. */
-    private int refCnt;
-
     PythonSharedResources(JobBundleFactory jobBundleFactory, Environment 
environment) {
         this.jobBundleFactory = jobBundleFactory;
         this.environment = environment;
         this.environmentManagers = new ArrayList<>();
-        this.refCnt = 0;
     }
 
     JobBundleFactory getJobBundleFactory() {
@@ -64,16 +60,6 @@ public final class PythonSharedResources implements 
AutoCloseable {
 
     void addPythonEnvironmentManager(PythonEnvironmentManager 
environmentManager) {
         environmentManagers.add(environmentManager);
-        refCnt++;
-    }
-
-    /**
-     * Release a Python operator which shares this Python resource. Returns 
true if there are no
-     * more Python operators sharing this Python resource.
-     */
-    boolean release() {
-        refCnt--;
-        return refCnt == 0;
     }
 
     @Override

Reply via email to