This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c354f7b  [FLINK-20333][python] Fix the issue that metaspace OOM will 
be thrown after submitting PyFlink UDF jobs multiple times to the standalone 
cluster
c354f7b is described below

commit c354f7bd679b9fa8c1e0d75feb3827ccca7f317b
Author: Wei Zhong <[email protected]>
AuthorDate: Wed Nov 25 11:59:02 2020 +0800

    [FLINK-20333][python] Fix the issue that metaspace OOM will be thrown after 
submitting PyFlink UDF jobs multiple times to the standalone cluster
    
    This closes #14205.
---
 .../python/AbstractPythonFunctionOperator.java     |  8 +++
 .../streaming/api/utils/ClassLeakCleaner.java      | 80 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 81ffb76..3d015f5 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -38,6 +38,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.ScheduledFuture;
 
+import static 
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
+
 /**
  * Base class for all stream operators to execute Python functions.
  */
@@ -141,6 +143,12 @@ public abstract class AbstractPythonFunctionOperator<OUT>
                        invokeFinishBundle();
                } finally {
                        super.close();
+
+                       try {
+                               
cleanUpLeakingClasses(this.getClass().getClassLoader());
+                       } catch (Throwable t) {
+                               LOG.warn("Failed to clean up the leaking 
objects.", t);
+                       }
                }
        }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java
new file mode 100644
index 0000000..63f7bbc
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.utils;
+
+import java.lang.ref.Reference;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Utilities to clean up the leaking classes.
+ *
+ * <p>The clean-up removes the entries that the JDK-internal caches of
+ * {@code java.io.ObjectStreamClass} keep for classes of a given class loader and then
+ * requests a garbage collection.
+ */
+public class ClassLeakCleaner {
+
+       /**
+        * Whether the clean-up has already been attempted. The flag is static, so the
+        * clean-up runs at most once per loaded copy of this class.
+        * NOTE(review): presumably this class is loaded once per user class loader, which
+        * would make this "once per class loader" - confirm against the deployment.
+        */
+       private static boolean leakedClassesCleanedUp = false;
+
+       /**
+        * Clean up the soft references of the classes under the specified class loader.
+        *
+        * <p>Only the first invocation does any work; later invocations return immediately,
+        * even if the first one failed. The failures declared below do not go away on a
+        * retry, so repeating the attempt would only repeat the error. The garbage
+        * collection is requested even when clearing the caches fails.
+        *
+        * @param classLoader the class loader whose classes (and the classes of its child
+        *                    class loaders) are removed from the caches
+        * @throws ReflectiveOperationException if the JDK-internal cache class or one of its
+        *                                      fields cannot be found or read
+        * @throws SecurityException if reflective access to the caches is denied
+        * @throws ClassCastException if a cache field does not hold a {@link Map}
+        */
+       public static synchronized void cleanUpLeakingClasses(ClassLoader classLoader)
+               throws ReflectiveOperationException, SecurityException, ClassCastException {
+               if (leakedClassesCleanedUp) {
+                       return;
+               }
+               // Mark the attempt before doing the work so that a failure is reported once
+               // instead of on every subsequent call.
+               leakedClassesCleanedUp = true;
+
+               try {
+                       // clear the soft references
+                       // see https://bugs.openjdk.java.net/browse/JDK-8199589 for more details
+                       Class<?> clazz = Class.forName("java.io.ObjectStreamClass$Caches");
+                       clearCache(clazz, "localDescs", classLoader);
+                       clearCache(clazz, "reflectors", classLoader);
+               } finally {
+                       // Netty uses finalizers heavily, and they still hold references to the
+                       // user class loader even after the job has finished.
+                       // So, trigger garbage collection explicitly to:
+                       // 1) trigger the execution of the `Finalizer`s of objects created by
+                       //    the finished jobs of this TaskManager
+                       // 2) release the references to the class loader so that the user
+                       //    class loader can finally be garbage collected
+                       // This step does not depend on the cache clean-up above, hence it
+                       // also runs when that clean-up fails.
+                       System.gc();
+               }
+       }
+
+       /**
+        * Removes from the static map field {@code mapName} of {@code target} every entry
+        * whose key is a {@link Reference} to a class loaded by {@code classLoader} or by
+        * one of its child class loaders.
+        *
+        * @param target the class declaring the static cache field
+        * @param mapName the name of the static field holding the cache map
+        * @param classLoader the class loader whose classes are removed from the cache
+        */
+       private static void clearCache(Class<?> target, String mapName, ClassLoader classLoader)
+               throws ReflectiveOperationException, SecurityException, ClassCastException {
+               Field f = target.getDeclaredField(mapName);
+               f.setAccessible(true);
+               // The field is static, hence the null receiver.
+               Map<?, ?> map = (Map<?, ?>) f.get(null);
+               Iterator<?> keys = map.keySet().iterator();
+               while (keys.hasNext()) {
+                       Object key = keys.next();
+                       if (key instanceof Reference) {
+                               // A cleared reference yields null and is skipped by the check below.
+                               Object clazz = ((Reference<?>) key).get();
+                               if (clazz instanceof Class
+                                       && isLoadedByOrBelow((Class<?>) clazz, classLoader)) {
+                                       keys.remove();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Returns whether {@code classLoader} is the defining class loader of {@code clazz}
+        * or one of the ancestors of that class loader. Classes whose class loader is
+        * {@code null} never match.
+        */
+       private static boolean isLoadedByOrBelow(Class<?> clazz, ClassLoader classLoader) {
+               for (ClassLoader cl = clazz.getClassLoader(); cl != null; cl = cl.getParent()) {
+                       if (cl == classLoader) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+}

Reply via email to