Emre Kartoglu created FLINK-39128:
-------------------------------------
Summary: DynamoDB CDC source connector millisBehindLatest metric
gets stuck after failover
Key: FLINK-39128
URL: https://issues.apache.org/jira/browse/FLINK-39128
Project: Flink
Issue Type: Bug
Components: Connectors / DynamoDB
Affects Versions: aws-connector-5.1.0
Reporter: Emre Kartoglu
Assignee: Emre Kartoglu
The Dynamo DB source connector occasionally fails with the error:
{code:java}
java.lang.RuntimeException: One or more fetchers have encountered exception
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
The security token included in the request is expired (Service:
DynamoDbStreams, Status Code: 400, Request ID: redacted) (SDK Attempt Count: 2)
at
software.amazon.awssdk.services.dynamodb.model.DynamoDbException$BuilderImpl.build(DynamoDbException.java:113)
at
software.amazon.awssdk.services.dynamodb.model.DynamoDbException$BuilderImpl.build(DynamoDbException.java:61)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.retryPolicyDisallowedRetryException(RetryableStageHelper.java:168)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:73)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
at
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at
software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
at
software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
at
software.amazon.awssdk.services.dynamodb.streams.DefaultDynamoDbStreamsClient.getRecords(DefaultDynamoDbStreamsClient.java:316)
at
org.apache.flink.connector.dynamodb.source.proxy.DynamoDbStreamsProxy.getRecords(DynamoDbStreamsProxy.java:222)
at
org.apache.flink.connector.dynamodb.source.proxy.DynamoDbStreamsProxy.getRecords(DynamoDbStreamsProxy.java:102)
at
org.apache.flink.connector.dynamodb.source.reader.PollingDynamoDbStreamsShardSplitReader.fetch(PollingDynamoDbStreamsShardSplitReader.java:126)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more{code}
I don't see any pattern in terms of how long it takes for the connector to have
this error and cause a full app restart. It has happened multiple times a day.
There are also days when it doesn't happen at all.
I'm using the Amazon Managed Service for Apache Flink. The problem could well
be in the managed service, however I only see this issue with the DynamoDB
source connector which makes me believe there might be a bug in the connector
code.
Here is how I'm instantiating the connector:
{code:java}
Configuration sourceConfig = new Configuration();
sourceConfig.setString(AWSConfigConstants.AWS_REGION, "us-west-2");
DynamoDbStreamsSource.<DdbStreamRecord>builder()
.setStreamArn(streamArn)
.setSourceConfig(sourceConfig)
.setDeserializationSchema(new DdbJsonDeserializer())
.build();
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)