[ 
https://issues.apache.org/jira/browse/FLINK-28007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-28007:
-------------------------------
    Description: 
h3. Background

The unit and integration tests for the AWS connectors in the Flink repository 
create clients using static helper methods in the AWSServicesTestUtils class in 
flink-connector-aws-base.

These static helper methods create the asynchronous flavour of the clients 
required by the AWS connectors.

*Task*

* Change these helper methods to create the synchronous version of each AWS client.

  was:
h3. Background

AWS SDK v2 async clients use a Netty async client for the Kinesis Data 
Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. The SDK 
creates a shared thread pool for Netty to use for network operations when one 
is not configured. The thread pool is managed by a shared ELG (event loop 
group), which is stored in a static field. We do not configure this for the 
AWS connectors in the Flink codebase.

When threads are spawned within the ELG, they inherit the context classloader 
from the current thread. If the ELG is created from a shared classloader, for 
instance the Flink parent classloader or the MiniCluster parent classloader, 
multiple Flink jobs can share the same ELG. When an ELG thread is spawned from 
a Flink job, it inherits the Flink user classloader. When this job completes or 
fails, the classloader is destroyed; however, the Netty thread still references 
it, which leads to the exception below.
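
The inheritance described above can be reproduced with plain JDK threads. The 
following is a minimal sketch, not code from the Flink or AWS SDK codebases: 
the class and method names are hypothetical, an empty URLClassLoader stands in 
for the Flink user classloader, and a plain Thread stands in for an ELG thread.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it does not exist in Flink or the AWS SDK.
final class ContextClassLoaderInheritance {

    /**
     * Runs a "job" thread whose context classloader is jobLoader, lets it spawn a
     * worker thread (standing in for an ELG thread), and returns the context
     * classloader that the worker observed.
     */
    public static ClassLoader loaderSeenByWorker(ClassLoader jobLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread job = new Thread(() -> {
            // The worker is constructed on the job thread, so by default it
            // inherits the job thread's context classloader.
            Thread worker = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            worker.start();
            join(worker);
        });
        job.setContextClassLoader(jobLoader);
        job.start();
        join(job);
        return seen.get();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the Flink user classloader (assumption for illustration).
        ClassLoader userLoader = new URLClassLoader(new URL[0]);
        // The worker never set a classloader itself, yet it sees userLoader.
        System.out.println(loaderSeenByWorker(userLoader) == userLoader); // prints: true
    }
}
```

If the worker outlives the job, as a thread in a statically held pool would, 
it keeps this reference after the job's classloader has been closed.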

h3. Impact

This issue *does not* impact jobs deployed to a TaskManager (TM) when AWS SDK 
v2 is loaded via the Flink user classloader. This is expected to be the 
standard deployment configuration.

This issue is known to impact:
- Flink mini cluster, for example in integration tests (FLINK-26064)
- Flink cluster loading AWS SDK v2 via parent classloader

h3. Suggested solution

There are a few possible solutions, as discussed in 
https://github.com/apache/flink/pull/18733:
1. Create a separate ELG per Flink job
2. Create a separate ELG per subtask
3. Attach the correct classloader to ELG spawned threads
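
As an illustration of option 3, the sketch below shows a thread factory that 
overrides the inherited context classloader with a fixed one. It uses only the 
JDK; the class name and helper method are hypothetical, and how such a factory 
would be wired into the SDK's Netty event loop group, and which classloader 
counts as the correct one, are left open here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical class; it does not exist in Flink or the AWS SDK.
final class PinnedClassLoaderThreadFactory implements ThreadFactory {

    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final ClassLoader pinned;

    public PinnedClassLoaderThreadFactory(ClassLoader pinned) {
        this.pinned = pinned;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = delegate.newThread(task);
        // Replace whatever classloader was inherited from the creating thread.
        thread.setContextClassLoader(pinned);
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Creates a pool from a thread whose context classloader is callerContext and
     * returns the context classloader that a pool thread actually observes.
     */
    public static ClassLoader loaderSeenInPool(ClassLoader pinned, ClassLoader callerContext) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(callerContext);
        ExecutorService pool =
                Executors.newSingleThreadExecutor(new PinnedClassLoaderThreadFactory(pinned));
        try {
            Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
            return pool.submit(probe).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            current.setContextClassLoader(original);
        }
    }
}
```

With this factory, pool threads see the pinned classloader regardless of which 
job thread happened to trigger their creation.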

h3. Error Stack

(shortened stack trace, as the full trace is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04         at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04         at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04         at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04         at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04         at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04         at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04         at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04         at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
Feb 09 20:05:04         at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
Feb 09 20:05:04         at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
Feb 09 20:05:04         at javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:352)
Feb 09 20:05:04         at java.security.AccessController.doPrivileged(Native Method)
Feb 09 20:05:04         at javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:341)
Feb 09 20:05:04         at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:313)
Feb 09 20:05:04         at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:227)
Feb 09 20:05:04         at javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:154)
Feb 09 20:05:04         at software.amazon.awssdk.protocols.query.unmarshall.XmlDomParser.createXmlInputFactory(XmlDomParser.java:124)
Feb 09 20:05:04         at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284)
Feb 09 20:05:04         at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
Feb 09 20:05:04         at java.lang.ThreadLocal.get(ThreadLocal.java:170)
(...)
{noformat}



> Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
> ----------------------------------------------------------------
>
>                 Key: FLINK-28007
>                 URL: https://issues.apache.org/jira/browse/FLINK-28007
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Connectors / Kinesis
>    Affects Versions: 1.15.0, 1.15.1, 1.15.2
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Minor
>             Fix For: 1.16.0
>
>
> h3. Background
> The unit and integration tests for the AWS connectors in the Flink repository 
> create clients using static helper methods in the AWSServicesTestUtils class 
> in flink-connector-aws-base.
> These static helper methods create the asynchronous flavour of the clients 
> required by the AWS connectors.
> *Task*
> * Change these helper methods to create the synchronous version of each AWS client.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
