Danny Cranmer created FLINK-20630:
-------------------------------------

             Summary: [Kinesis][DynamoDB] DynamoDB Consumer fails to consume 
from Latest
                 Key: FLINK-20630
                 URL: https://issues.apache.org/jira/browse/FLINK-20630
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.12.0
            Reporter: Danny Cranmer
             Fix For: 1.12.1


*Background*
When consuming from `LATEST`, the `KinesisDataFetcher` converts the shard 
iterator type into an `AT_TIMESTAMP` to ensure all shards start from the same 
position. When `LATEST` is used each shared would effectively start from a 
different point in the time.

DynamoDB streams do not support `AT_TIMESTAMP`.

*Scope*
Remove shard iterator type transform for DynamoDB streams consumer. 

*Reproduction Steps*
Create a simple application that consumer from `LATEST` using 
`FlinkDynamoDBStreamsConsumer`

*Expected Results*
Consumer starts reading records from the head of the stream

*Actual Results*
An exception is thrown:

{code}
Caused by: 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException:
 1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' 
failed to satisfy constraint: Member must satisfy enum value set: 
[AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: 
AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; 
Request ID: AFQ8KCJAP74IN5MR5KD2FP0CTBVV4KQNSO5AEMVJF66Q9ASUAAJG)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515)
        at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355)
        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:311)
        at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:302)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:173)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:93)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:85)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:36)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:469)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:107)
        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:540)
        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:348)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to