[ 
https://issues.apache.org/jira/browse/FLINK-30304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-30304:
----------------------------------
    Description: 
AWS Sinks based on Async Sink can enter a deadlock situation if the AWS async 
client throws error outside of the future. We observed this with a local 
application:
{code:java}
java.lang.NullPointerException
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at 
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at 
org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829){code}
Related AWS SDK issues that can cause this:
 * [https://github.com/aws/aws-sdk-java-v2/issues/3435]
 * [https://github.com/aws/aws-sdk-java-v2/issues/1812]

If an error is thrown and not handled by the future then the AsyncSink will 
never decrement {{inFlightRequestCount}}. the job will hang while trying flush 
for checkpoint

  was:
AWS Sinks based on Async Sink can enter a deadlock situation if the AWS async 
client throws error outside of the future. We observed this with a local 
application:
{code:java}
java.lang.NullPointerException
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
at 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at 
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at 
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at 
org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829){code}
Related AWS SDK issues that can cause this:
 * [https://github.com/aws/aws-sdk-java-v2/issues/3435]
 * [https://github.com/aws/aws-sdk-java-v2/issues/1812]

If an error is thrown from this 
[line|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java#L177],
 then the AsyncSink will never decrement {{inFlightRequestCount}}. the job will 
hang while trying flush for checkpoint


> Possible Deadlock in Kinesis/Firehose/DynamoDB Connector
> --------------------------------------------------------
>
>                 Key: FLINK-30304
>                 URL: https://issues.apache.org/jira/browse/FLINK-30304
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB, Connectors / Firehose, Connectors 
> / Kinesis
>    Affects Versions: 1.16.0, 1.15.3, aws-connector-3.0.0
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>             Fix For: 1.17.0, 1.16.1, 1.15.4, aws-connector-4.0.0, 
> aws-connector-3.1.0
>
>
> AWS Sinks based on Async Sink can enter a deadlock situation if the AWS async 
> client throws error outside of the future. We observed this with a local 
> application:
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> at 
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at 
> org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.base/java.lang.Thread.run(Thread.java:829){code}
> Related AWS SDK issues that can cause this:
>  * [https://github.com/aws/aws-sdk-java-v2/issues/3435]
>  * [https://github.com/aws/aws-sdk-java-v2/issues/1812]
> If an error is thrown and not handled by the future then the AsyncSink will 
> never decrement {{inFlightRequestCount}}. the job will hang while trying 
> flush for checkpoint



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to