This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 09995159a0 [Bugfix][AmazonDynamoDB] Fix the problem that all table
data cannot be obtained (#5146)
09995159a0 is described below
commit 09995159a0e47a6475f69c13e24b5d0ca6f3498f
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Aug 9 12:27:01 2023 +0800
[Bugfix][AmazonDynamoDB] Fix the problem that all table data cannot be
obtained (#5146)
---
.../source/AmazonDynamoDBSourceReader.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index afaafa3f8a..c25f8b0e0b 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -31,11 +31,13 @@ import
software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import java.io.IOException;
import java.net.URI;
+import java.util.Map;
@Slf4j
public class AmazonDynamoDBSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
@@ -78,18 +80,25 @@ public class AmazonDynamoDBSourceReader extends
AbstractSingleSplitReader<SeaTun
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- ScanResponse scan =
- dynamoDbClient.scan(
- ScanRequest.builder()
-
.tableName(amazondynamodbSourceOptions.getTable())
- .build());
- if (scan.hasItems()) {
- scan.items()
- .forEach(
- item -> {
-
output.collect(seaTunnelRowDeserializer.deserialize(item));
- });
- }
+ Map<String, AttributeValue> lastKeyEvaluated = null;
+
+ ScanResponse scan;
+ do {
+ scan =
+ dynamoDbClient.scan(
+ ScanRequest.builder()
+
.tableName(amazondynamodbSourceOptions.getTable())
+ .exclusiveStartKey(lastKeyEvaluated)
+ .build());
+ if (scan.hasItems()) {
+ scan.items()
+ .forEach(
+ item -> {
+
output.collect(seaTunnelRowDeserializer.deserialize(item));
+ });
+ }
+ lastKeyEvaluated = scan.lastEvaluatedKey();
+ } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
context.signalNoMoreElement();
}
}