Classloading in worker thread context

2021-11-23 Thread John Karp
Hi,

Due to limitations with Flink's metrics API* I'm using Micrometer to
collect supplemental application metrics in my Flink 1.13 application
(hosted on Kinesis Data Analytics), and publish them to AWS
CloudWatch. For the most part it works well. However, when handling
errors from CloudWatch, the worker thread hits
java.lang.NoClassDefFoundError.

This is despite the classes in question being included in the
application JAR (which I confirmed by dumping the JAR contents). I
hold the Micrometer registry in a "private static final" field, which
may be part of the issue; I want the metric registry to be a singleton
in the JVM to minimize costs incurred publishing to CloudWatch.
(Micrometer is fully thread-safe.)
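
One mechanism that may be relevant to a setup like this (an assumption, not something confirmed in this thread): a thread created by a JVM-wide singleton inherits the context classloader of whichever thread created it, and keeps that reference after the loader is closed. The sketch below uses only the JDK. The class name, the "publisher" thread, and the empty URLClassLoader standing in for a job's user-code classloader are all hypothetical; it does not use Micrometer, the AWS SDK, or Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaticPoolClassLoaderDemo {

    /** True if the worker still references the first "job" loader after that loader is closed. */
    static boolean workerKeepsFirstLoader() throws Exception {
        // Hypothetical stand-in for the worker pool owned by a long-lived singleton.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            // A new Thread inherits the context classloader of the thread that creates it.
            Thread t = new Thread(r, "publisher");
            t.setDaemon(true);
            return t;
        });
        Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();

        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // Stand-in for a per-job user-code classloader (assumption, not Flink's class).
        URLClassLoader jobLoader = new URLClassLoader(new URL[0], original);
        try {
            current.setContextClassLoader(jobLoader);
            ClassLoader first = pool.submit(probe).get(); // worker thread is created here
            current.setContextClassLoader(original);
            jobLoader.close(); // simulates the job's classloader being released
            ClassLoader later = pool.submit(probe).get(); // same worker thread, reused
            return first == jobLoader && later == jobLoader;
        } finally {
            current.setContextClassLoader(original);
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("worker keeps first loader: " + workerKeepsFirstLoader());
    }
}
```

If this is what happens here, a thread factory that sets the context classloader explicitly on each new thread would be one thing to experiment with. Whether that fits the Micrometer and AWS SDK thread pools is untested.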

I know this metrics approach isn't very standard in the Flink world,
but is there perhaps a way to make it work? (Note: this is
hosted on Kinesis Data Analytics where the Flink configuration cannot
generally be changed.)

Exception 1:

java.lang.NoClassDefFoundError: software/amazon/awssdk/core/internal/util/ThrowableUtils
    at software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$doExecute$4(BaseAsyncClientHandler.java:222) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
    ... 42 more
Wrapped by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: software/amazon/awssdk/core/internal/util/ThrowableUtils
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:54) ~[?:?]
    ...
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]

Exception 2:

java.lang.IllegalStateException: Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in
static fields. If the stacktrace suggests that the leak occurs in a
third party library and cannot be fixed immediately, you can disable
this check with the configuration
'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:169) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:193) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1196) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator$1.run(ServiceLoader.java:1268) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator$1.run(ServiceLoader.java:1267) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1270) ~[?:?]
    at java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300) ~[?:?]
    at java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385) ~[?:?]
    at javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:348) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:337) ~[?:?]
    at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:309) ~[?:?]
    at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:222) ~[?:?]
    at javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:161) ~[?:?]
 

Metrics outside of RichFunction context

2021-08-19 Thread John Karp
Hi,

I'm using StreamingFileSink to collect records into Avro files. Inside
the BulkWriter implementation, I have to do some operations (such as
dynamic schema lookup) which I want to record metrics about. However, the
BulkWriter API, as it is defined, does not accept a RuntimeContext or
MetricGroup from the StreamingFileSink, so I seemingly don't have access
to the metrics API. And it doesn't look like StreamingFileSink is
particularly extensible. What alternatives do I have besides forking
StreamingFileSink, or bypassing the Flink metrics API entirely?
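
To make the "bypassing" option concrete, here is a minimal sketch of one shape it could take: the writer increments counters in a JVM-local holder, and some other component that does have a MetricGroup reads them. WriterMetrics and the counter name are hypothetical and not part of Flink; the sketch uses only the JDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical JVM-local counter holder; not part of Flink. */
public final class WriterMetrics {
    // Static state is visible only within one JVM and one user-code classloader.
    private static final Map<String, LongAdder> COUNTERS = new ConcurrentHashMap<>();

    private WriterMetrics() {}

    /** Write side: called from inside the BulkWriter, e.g. once per schema lookup. */
    public static void increment(String name) {
        COUNTERS.computeIfAbsent(name, k -> new LongAdder()).increment();
    }

    /** Read side: returns the current count, or 0 if the counter was never incremented. */
    public static long get(String name) {
        LongAdder adder = COUNTERS.get(name);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        increment("schemaLookups");
        System.out.println("schemaLookups = " + get("schemaLookups"));
    }
}
```

A RichFunction chained directly in front of the sink could, in principle, expose such a value through getRuntimeContext().getMetricGroup().gauge(...). That only works if both run in the same JVM and classloader, and the counts would not be separated per subtask, so it is a workaround rather than a real answer.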

Thanks,
John