Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-29 Thread Xiaolong Wang
Thanks, this looks similar. I'll try the workaround.

On Thu, Feb 29, 2024 at 5:15 PM Aleksandr Pilipenko 
wrote:

> Based on the stack trace, this looks like the issue described here:
> https://issues.apache.org/jira/browse/FLINK-32964
> Is your configuration similar to the one described in the ticket? If so,
> you can work around this issue by explicitly specifying the credentials
> provider for the connector, which avoids using
> DefaultCredentialsProvider (AUTO).
>
> Caused by: java.lang.IllegalStateException: Connection pool shut down
> ...
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*WebIdentityTokenFileCredentialsProvider*.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) ~[?:?]
> ...
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*DefaultCredentialsProvider*.resolveCredentials(DefaultCredentialsProvider.java:128) ~[?:?]

Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-29 Thread Aleksandr Pilipenko
Based on the stack trace, this looks like the issue described here:
https://issues.apache.org/jira/browse/FLINK-32964
Is your configuration similar to the one described in the ticket? If so,
you can work around this issue by explicitly specifying the credentials
provider for the connector, which avoids using
DefaultCredentialsProvider (AUTO).

Caused by: java.lang.IllegalStateException: Connection pool shut down
...
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*WebIdentityTokenFileCredentialsProvider*.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) ~[?:?]
...
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*DefaultCredentialsProvider*.resolveCredentials(DefaultCredentialsProvider.java:128) ~[?:?]
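
For illustration, the workaround could look roughly like the sketch below. It only builds the consumer properties with plain java.util.Properties, so it runs without Flink on the classpath. The key names are the string values I believe the connector's AWSConfigConstants and ConsumerConfigConstants use; please verify them against your connector version. The choice of WEB_IDENTITY_TOKEN is an assumption based on the stack trace above, where AUTO ends up in WebIdentityTokenFileCredentialsProvider. The class name, region, and consumer name are hypothetical.

```java
import java.util.Properties;

public class KinesisEfoConsumerProps {

    // Builds consumer properties that name a credentials provider explicitly
    // instead of falling back to the default AUTO provider chain.
    public static Properties build(String region, String efoConsumerName) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        // Assumption: WEB_IDENTITY_TOKEN matches the environment; pick the
        // provider that fits your deployment (for example ASSUME_ROLE or BASIC).
        props.setProperty("aws.credentials.provider", "WEB_IDENTITY_TOKEN");
        // Consume through enhanced fan-out (EFO), as in the failing job.
        props.setProperty("flink.stream.recordpublisher", "EFO");
        props.setProperty("flink.stream.efo.consumername", efoConsumerName);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical region and consumer name, for demonstration only.
        Properties props = build("us-east-1", "my-efo-consumer");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In a DataStream job these properties would be passed to the FlinkKinesisConsumer constructor together with the stream name and deserialization schema. For a SQL table, the same 'aws.credentials.provider' key should be usable in the WITH clause, but I have not checked this against the ticket.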

Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-28 Thread Xiaolong Wang
Sorry, I just attached a wrong file. Let me paste the error log:

java.lang.RuntimeException: Maximum retries exceeded for SubscribeToShard. Failed 10 times.
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:211) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:130) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Connection pool shut down
    at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
    at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:67) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) ~[?:?]
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipelin

Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-28 Thread Aleksandr Pilipenko
Hi,

Could you please provide more information on the error you are observing?
The attached file does not contain anything related to Kinesis or any errors.

Best,
Aleksandr

On Wed, 28 Feb 2024 at 02:28, Xiaolong Wang
 wrote:

> Hi,
>
> I used the flink-connector-kinesis (4.2.0-1.18) to consume from Kinesis.
> The job starts but fails within 1 hour. A detailed error log
> is attached.
>
> When I changed the version of the flink-connector-kinesis to `1.15.2`,
> everything worked.
>
> Any idea how to fix it?
>
>


Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-27 Thread Xiaolong Wang
Hi,

I used the flink-connector-kinesis (4.2.0-1.18) to consume from Kinesis.
The job starts but fails within 1 hour. A detailed error log
is attached.

When I changed the version of the flink-connector-kinesis to `1.15.2`,
everything worked.

Any idea how to fix it?

create table kafka_event_v1 (
  `timestamp` bigint,
  serverTimestamp bigint,

  name string,
  data string,
  app string,
  `user` string,
  context string,
  browser row<
    url string,
    referrer string,
    userAgent string,
    `language` string,
    title string,
    viewportWidth int,
    viewportHeight int,
    contentWidth int,
    contentHeight int,
    cookies map,
    name string,
    version string,
    device row<
      model string,
      type string,
      vendor string
    >,
    engine row<
      name string,
      version string
    >,
    os row<
      name string,
      version string
    >
  >,
  abtests map,
  apikey string,
  lifecycleId string,
  sessionId string,
  instanceId string,
  requestId string,
  eventId string,
  `trigger` string,

  virtualId string,
  accountId string,
  ip string,

  serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
  watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
  'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',

  'properties.group.id' = 'event_v2',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SNTOKEN',
  'properties.sasl.login.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
  'properties.sasl.login.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
  'properties.sasl.client.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
  'properties.sasl.jaas.config' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required 
username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
  'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://dev-hive-metastore.smartnews.internal:9083',
  'warehouse'='s3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2 /*+ options(
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true'
) */
select
  `timestamp`,
  serverTimestamp,
  data,
  app,
  `user`,
  context,
  browser,
  abtests,
  lifecycleId,
  sessionId,
  instanceId,
  requestId,
  eventId,
  `trigger`,

  virtualId,
  accountId,
  ip,

  date_format(serverTimestampLtz, 'yyyy-MM-dd') dt,
  apikey,
  name
from
  default_catalog.default_database.kafka_event_v1
;