This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c354f7b [FLINK-20333][python] Fix the issue that metaspace OOM will
be thrown after submitting PyFlink UDF jobs multiple times to the standalone
cluster
c354f7b is described below
commit c354f7bd679b9fa8c1e0d75feb3827ccca7f317b
Author: Wei Zhong <[email protected]>
AuthorDate: Wed Nov 25 11:59:02 2020 +0800
[FLINK-20333][python] Fix the issue that metaspace OOM will be thrown after
submitting PyFlink UDF jobs multiple times to the standalone cluster
This closes #14205.
---
.../python/AbstractPythonFunctionOperator.java | 8 +++
.../streaming/api/utils/ClassLeakCleaner.java | 80 ++++++++++++++++++++++
2 files changed, 88 insertions(+)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 81ffb76..3d015f5 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -38,6 +38,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
+import static
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
+
/**
* Base class for all stream operators to execute Python functions.
*/
@@ -141,6 +143,12 @@ public abstract class AbstractPythonFunctionOperator<OUT>
invokeFinishBundle();
} finally {
super.close();
+
+ try {
+
cleanUpLeakingClasses(this.getClass().getClassLoader());
+ } catch (Throwable t) {
+ LOG.warn("Failed to clean up the leaking
objects.", t);
+ }
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java
new file mode 100644
index 0000000..63f7bbc
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/ClassLeakCleaner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.utils;
+
+import java.lang.ref.Reference;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+
/**
 * Utilities to clean up the leaking classes.
 *
 * <p>JDK-internal caches (see JDK-8199589) hold references to classes loaded by a user-code class
 * loader, which can keep that class loader, and the classes it defined, reachable after the job
 * that created it has finished. This class evicts such entries reflectively.
 */
public class ClassLeakCleaner {

	/**
	 * Class loaders for which a cleanup has already been attempted; the cleanup only needs to run
	 * once per class loader. Keys are held weakly so that this registry does not itself keep a class
	 * loader alive. Guarded by the class monitor (all access happens in static synchronized code).
	 */
	private static final Map<ClassLoader, Boolean> CLEANED_UP_CLASS_LOADERS =
		new java.util.WeakHashMap<>();

	/**
	 * Clean up the soft references of the classes under the specified class loader.
	 *
	 * <p>The cleanup is attempted at most once per class loader; later calls with the same class
	 * loader return immediately. A failed attempt also counts as an attempt: the reflective access it
	 * relies on is expected to fail in the same way on every retry, so retrying would only repeat the
	 * failure on every call.
	 *
	 * @param classLoader the class loader whose classes, including classes loaded by class loaders
	 *     that have it as an ancestor, are evicted from the caches
	 * @throws ReflectiveOperationException if the JDK-internal cache class or its fields cannot be
	 *     found or read
	 * @throws SecurityException if reflective access is denied
	 * @throws ClassCastException if a cache field is not a {@link Map} in the running JDK
	 */
	public static synchronized void cleanUpLeakingClasses(ClassLoader classLoader)
		throws ReflectiveOperationException, SecurityException, ClassCastException {
		// Record the attempt before doing the work so that a failing cleanup is not retried
		// (and re-reported by the caller) for every subsequent call with the same class loader.
		if (CLEANED_UP_CLASS_LOADERS.put(classLoader, Boolean.TRUE) != null) {
			return;
		}
		try {
			// clear the soft references
			// see https://bugs.openjdk.java.net/browse/JDK-8199589 for more details
			Class<?> caches = Class.forName("java.io.ObjectStreamClass$Caches");
			clearCache(caches, "localDescs", classLoader);
			clearCache(caches, "reflectors", classLoader);
		} finally {
			// Netty uses finalizers heavily, which still hold references to the user class loader
			// even after the job has finished. Trigger garbage collection explicitly so that:
			// 1) the `Finalizer`s of objects created by the finished jobs of this TaskManager run, and
			// 2) the references to the class loader are then released, so that the user class loader
			//    can finally be garbage collected.
			// This does not depend on the cache clearing above, so it runs even if that failed.
			System.gc();
		}
	}

	/**
	 * Removes from the static map field {@code target.mapName} every entry whose key is a
	 * {@link Reference} to a class loaded by {@code classLoader}, or by a class loader that has
	 * {@code classLoader} as an ancestor.
	 */
	private static void clearCache(Class<?> target, String mapName, ClassLoader classLoader)
		throws ReflectiveOperationException, SecurityException, ClassCastException {
		Field field = target.getDeclaredField(mapName);
		field.setAccessible(true);
		Map<?, ?> map = (Map<?, ?>) field.get(null);
		// Removing through the key-set iterator removes the whole entry from the backing map.
		Iterator<?> keys = map.keySet().iterator();
		while (keys.hasNext()) {
			Object key = keys.next();
			if (!(key instanceof Reference)) {
				continue;
			}
			Object referent = ((Reference<?>) key).get();
			if (referent instanceof Class
				&& isSameOrDescendant(((Class<?>) referent).getClassLoader(), classLoader)) {
				keys.remove();
			}
		}
	}

	/** Returns whether {@code candidate} is {@code ancestor} or has it in its parent chain. */
	private static boolean isSameOrDescendant(ClassLoader candidate, ClassLoader ancestor) {
		for (ClassLoader cl = candidate; cl != null; cl = cl.getParent()) {
			if (cl == ancestor) {
				return true;
			}
		}
		return false;
	}
}