Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer
Thanks, this looks similar. I'll try the workaround.

On Thu, Feb 29, 2024 at 5:15 PM Aleksandr Pilipenko wrote:

> Based on the stacktrace, this looks like an issue described here:
> https://issues.apache.org/jira/browse/FLINK-32964
> [...]
Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer
Based on the stacktrace, this looks like the issue described here:
https://issues.apache.org/jira/browse/FLINK-32964

Is your configuration similar to the one described in the ticket? If so, you can work around this issue by explicitly specifying the credentials provider for the connector, which avoids using DefaultCredentialsProvider (AUTO).

Caused by: java.lang.IllegalStateException: Connection pool shut down
    ...
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*WebIdentityTokenFileCredentialsProvider*.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) ~[?:?]
    ...
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*DefaultCredentialsProvider*.resolveCredentials(DefaultCredentialsProvider.java:128) ~[?:?]

On Thu, 29 Feb 2024 at 02:24, Xiaolong Wang wrote:

> Sorry, I just attached a wrong file. Let me paste the error log:
> [...]
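For readers who define the source in Flink SQL, the workaround described above (naming the credentials provider explicitly instead of falling back to AUTO) might look roughly like the sketch below. This is not taken from the thread: the table name, column, stream, region, consumer name, role ARN, and token file path are all hypothetical placeholders. The option names are the ones I recall from the Kinesis SQL connector documentation, so check them against the connector version you run. WEB_IDENTITY_TOKEN is chosen here only because the stack trace passes through WebIdentityTokenFileCredentialsProvider; another explicit provider may fit your environment better.

```sql
-- Sketch only: every name and value below is a hypothetical placeholder.
create table kinesis_events (
  data string
) with (
  'connector' = 'kinesis',
  'stream' = 'my-stream',
  'aws.region' = 'us-east-1',
  -- Consume through enhanced fan-out (EFO), as in the failing job.
  'scan.stream.recordpublisher' = 'EFO',
  'scan.stream.efo.consumername' = 'my-efo-consumer',
  -- Name the provider explicitly so the connector does not use AUTO
  -- (DefaultCredentialsProvider).
  'aws.credentials.provider' = 'WEB_IDENTITY_TOKEN',
  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/my-role',
  'aws.credentials.role.sessionName' = 'flink-kinesis-efo',
  'aws.credentials.webIdentityToken.file' = '/var/run/secrets/eks.amazonaws.com/serviceaccount/token',
  'format' = 'json'
);
```

Whether this avoids the "Connection pool shut down" failure in a given setup still needs to be confirmed against the configuration described in FLINK-32964.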
Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer
Sorry, I just attached a wrong file. Let me paste the error log:

java.lang.RuntimeException: Maximum retries exceeded for SubscribeToShard. Failed 10 times.
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:211) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:130) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Connection pool shut down
    at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:67) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipelin
Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer
Hi,

Could you please provide more information on the error you are observing? The attached file does not contain anything related to Kinesis or any errors.

Best,
Aleksandr

On Wed, 28 Feb 2024 at 02:28, Xiaolong Wang wrote:

> Hi,
>
> I used flink-connector-kinesis (4.2.0-1.18) to consume from Kinesis.
> The job starts but fails within 1 hour. The detailed error log
> is attached.
>
> When I changed the version of flink-connector-kinesis to `1.15.2`,
> everything worked.
>
> Any idea how to fix it?
Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer
Hi,

I used flink-connector-kinesis (4.2.0-1.18) to consume from Kinesis. The job starts but fails within 1 hour. The detailed error log is attached.

When I changed the version of flink-connector-kinesis to `1.15.2`, everything worked.

Any idea how to fix it?

create table kafka_event_v1 (
  `timestamp` bigint,
  serverTimestamp bigint,
  name string,
  data string,
  app string,
  `user` string,
  context string,
  browser row<
    url string,
    referrer string,
    userAgent string,
    `language` string,
    title string,
    viewportWidth int,
    viewportHeight int,
    contentWidth int,
    contentHeight int,
    cookies map,
    name string,
    version string,
    device row<
      model string,
      type string,
      vendor string
    >,
    engine row<
      name string,
      version string
    >,
    os row<
      name string,
      version string
    >
  >,
  abtests map,
  apikey string,
  lifecycleId string,
  sessionId string,
  instanceId string,
  requestId string,
  eventId string,
  `trigger` string,
  virtualId string,
  accountId string,
  ip string,
  serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
  watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
  'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'properties.group.id' = 'event_v2',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SNTOKEN',
  'properties.sasl.login.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
  'properties.sasl.login.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
  'properties.sasl.client.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
  'properties.sasl.jaas.config' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
  'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://dev-hive-metastore.smartnews.internal:9083',
  'warehouse'='s3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2
/*+ options(
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true'
) */
select
  `timestamp`,
  serverTimestamp,
  data,
  app,
  `user`,
  context,
  browser,
  abtests,
  lifecycleId,
  sessionId,
  instanceId,
  requestId,
  eventId,
  `trigger`,
  virtualId,
  accountId,
  ip,
  date_format(serverTimestampLtz, '-MM-dd') dt,
  apikey,
  name
from default_catalog.default_database.kafka_event_v1
;