davidradl commented on code in PR #27025:
URL: https://github.com/apache/flink/pull/27025#discussion_r2372562497


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java:
##########
@@ -186,15 +186,25 @@ public void configCompactFilter(
 
     private static class ListElementFilterFactory<T>
             implements FlinkCompactionFilter.ListElementFilterFactory {
+        // The classloader that should pass to the filter instances created by 
this factory.
+        // This classloader is the user code classloader when the factory is 
created.
+        private final ClassLoader classLoader;
         private final TypeSerializer<T> serializer;
 
         private ListElementFilterFactory(TypeSerializer<T> serializer) {
             this.serializer = serializer;
+            ClassLoader contextClassLoader = null;
+            try {
+                contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+            } catch (Throwable e) {
+                LOG.info("Cannot get context classloader for list state's 
compaction filter.", e);
+            }
+            this.classLoader = contextClassLoader;
         }
 
         @Override
         public FlinkCompactionFilter.ListElementFilter 
createListElementFilter() {
-            return new ListElementFilter<>(serializer);
+            return new ListElementFilter<>(serializer, classLoader);

Review Comment:
   could we have a unit test for this method also please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to