Classloading in worker thread context
Hi,

Due to limitations with Flink's metrics API*, I'm using Micrometer to collect supplemental application metrics in my Flink 1.13 application (hosted on Kinesis Data Analytics) and publish them to AWS CloudWatch. For the most part this works well. However, when the worker thread handles errors from CloudWatch, it hits java.lang.NoClassDefFoundError, even though the classes in question are included in the application JAR (I confirmed this by dumping the JAR contents).

I hold the Micrometer registry in a "private static final" field, which may be part of the issue. I want the registry to be a singleton in the JVM to minimize the costs of publishing to CloudWatch. (Micrometer is fully thread-safe.)

I know this metrics approach isn't very standard in the Flink world, but is there perhaps a way to make it work? (Note: because this is hosted on Kinesis Data Analytics, the Flink configuration generally cannot be changed.)

Exception 1:
java.lang.NoClassDefFoundError: software/amazon/awssdk/core/internal/util/ThrowableUtils
    at software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$doExecute$4(BaseAsyncClientHandler.java:222) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
    ... 42 more
Wrapped by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: software/amazon/awssdk/core/internal/util/ThrowableUtils
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:54) ~[?:?]
    ...
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]

Exception 2:
java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:169) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:193) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1196) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator$1.run(ServiceLoader.java:1268) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator$1.run(ServiceLoader.java:1267) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1270) ~[?:?]
    at java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300) ~[?:?]
    at java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385) ~[?:?]
    at javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:348) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:337) ~[?:?]
    at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:309) ~[?:?]
    at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:222) ~[?:?]
    at javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:161) ~[?:?]
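One possible direction, sketched under assumptions rather than offered as a confirmed fix: the "Trying to access closed classloader" message indicates that code was still running after Flink had closed the job's user-code classloader, which would also explain why a lazily loaded class such as ThrowableUtils can no longer be resolved on the error path. A static singleton that is reference-counted from each operator's open()/close() would let the last operator shut the shared resource (and its background threads) down first. SharedResource below is a hypothetical, JDK-only helper; the Micrometer registry and the CloudWatch client would have to be wrapped by the caller in some AutoCloseable that closes both.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: a lazily created, reference-counted shared resource.
 * Each user calls acquire() when it starts and release() when it stops; the
 * resource is created by the first acquire() and closed by the last release().
 */
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;   // guarded by "this"
    private int refCount; // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it if nobody currently holds it. */
    synchronized T acquire() {
        if (refCount == 0) {
            instance = factory.get(); // if this throws, refCount stays 0
        }
        refCount++;
        return instance;
    }

    /** Drops one reference and closes the instance when the count reaches zero. */
    synchronized void release() throws Exception {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = instance;
            instance = null; // a later acquire() creates a fresh instance
            toClose.close();
        }
    }

    /** Number of current holders, mainly useful for tests and logging. */
    synchronized int refCount() {
        return refCount;
    }
}
```

In a RichFunction, the static field would hold the SharedResource, open() would call acquire() and close() would call release(). Because the static field belongs to a class loaded by the user-code classloader, a restarted job gets a fresh SharedResource in its new classloader, while the old instance has already been closed, provided close() actually ran on every operator.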
Metrics outside of RichFunction context
Hi,

I'm using StreamingFileSink to write records into Avro files. Inside my BulkWriter implementation I perform some operations (such as dynamic schema lookup) that I want to record metrics about. However, the BulkWriter API as defined does not receive a RuntimeContext or MetricGroup from the StreamingFileSink, so I don't appear to have access to the metrics API. StreamingFileSink also doesn't look particularly extensible.

What alternatives do I have, besides forking StreamingFileSink or bypassing the Flink metrics API entirely?

Thanks,
John
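One alternative that still reports through Flink's metric system, offered as an untested sketch: the BulkWriter updates plain JDK counters in a static holder, and a rich function running in the same TaskManager JVM exposes them as gauges. SideChannelCounters and the counter names are hypothetical; only Flink's MetricGroup.gauge(...) and Gauge interface, used in the note after the code, are real API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical side channel: code without a RuntimeContext (such as a
 * BulkWriter) increments named counters here, and a rich function in the
 * same JVM and classloader can read them back and expose them as gauges.
 */
final class SideChannelCounters {
    // Shared by all code loaded by the same classloader in this JVM.
    private static final Map<String, LongAdder> COUNTERS = new ConcurrentHashMap<>();

    private SideChannelCounters() {}

    /** Adds delta to the named counter, creating it on first use. */
    static void add(String name, long delta) {
        COUNTERS.computeIfAbsent(name, k -> new LongAdder()).add(delta);
    }

    /** Current value of the named counter, or 0 if it was never incremented. */
    static long get(String name) {
        LongAdder adder = COUNTERS.get(name);
        return adder == null ? 0L : adder.sum();
    }
}
```

For instance, the writer could call SideChannelCounters.add("schemaLookups", 1) after each lookup, and an operator chained directly in front of the sink could register getRuntimeContext().getMetricGroup().gauge("schemaLookups", (Gauge<Long>) () -> SideChannelCounters.get("schemaLookups")) in its open(). The values are totals for all sink subtasks in that JVM rather than per-subtask counts, so summing the gauge across subtasks would over-count.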