[ https://issues.apache.org/jira/browse/BEAM-7043?focusedWorklogId=237933&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-237933 ]
ASF GitHub Bot logged work on BEAM-7043: ---------------------------------------- Author: ASF GitHub Bot Created on: 06/May/19 17:22 Start Date: 06/May/19 17:22 Worklog Time Spent: 10m Work Description: aromanenko-dev commented on pull request #8390: [BEAM-7043] Add DynamoDBIO URL: https://github.com/apache/beam/pull/8390#discussion_r281275518 ########## File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java ########## @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.function.Predicate; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; +import org.apache.http.HttpStatus; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PTransform}s for writing to <a href="https://aws.amazon.com/dynamodb/">DynamoDB</a>. + * + * <h3>Writing to DynamoDB</h3> + * + * <p>Example usage: + * + * <pre>{@code + * PCollection<BatchWriteItemRequest> data = ...; + * + * data.apply(DynamoDBIO.write() + * .withRetryConfiguration( + * DynamoDBIO.RetryConfiguration.create( + * 4, org.joda.time.Duration.standardSeconds(10))) + * .withAWSClientsProvider(new BasisDynamoDBProvider(accessKey, secretKey, region)) + * .withResultOutputTag(results)); + * }</pre> + * + * <p>As a client, you need to provide at least the following things: + * + * <ul> + * <li>retry configuration + * <li>need to specify AwsClientsProvider. You can pass on the default one BasisDynamoDBProvider + * <li>an output tag where you can get results. Example in DynamoDBIOTest + * </ul> + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public final class DynamoDBIO { + public static Read read() { + return new AutoValue_DynamoDBIO_Read.Builder() + .setNumOfItemPerSegment(Integer.MAX_VALUE) + .setNumOfSplits(1) + .build(); + } + + public static Write write() { + return new AutoValue_DynamoDBIO_Write.Builder().build(); + } + + /** A config object used to construct AmazonDynamoDB client object. */ + @AutoValue + public abstract static class DynamoDBConfiguration implements Serializable { + @Nullable + abstract ValueProvider<String> getRegion(); + + @Nullable + abstract ValueProvider<String> getEndpointUrl(); + + @Nullable + abstract ValueProvider<String> getAwsAccessKey(); + + @Nullable + abstract ValueProvider<String> getAwsSecretKey(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setRegion(ValueProvider<String> region); + + abstract Builder setEndpointUrl(ValueProvider<String> endpointUrl); + + abstract Builder setAwsAccessKey(ValueProvider<String> accessKey); + + abstract Builder setAwsSecretKey(ValueProvider<String> secretKey); + + abstract DynamoDBConfiguration build(); + } + + public static DynamoDBConfiguration create(String region, String accessKey, String secretKey) { + checkArgument(region != null, "region can not be null"); + checkArgument(accessKey != null, "accessKey can not be null"); + checkArgument(secretKey != null, "secretKey can not be null"); + return create( + null, + ValueProvider.StaticValueProvider.of(region), + ValueProvider.StaticValueProvider.of(accessKey), + ValueProvider.StaticValueProvider.of(secretKey)); + } + + public static DynamoDBConfiguration create( + String endpointUrl, String region, String accessKey, String secretKey) { + checkArgument(region != null, "region can not be null"); + checkArgument(accessKey != null, "accessKey can not be null"); + checkArgument(secretKey != null, "secretKey can not be null"); + return create( + ValueProvider.StaticValueProvider.of(endpointUrl), + ValueProvider.StaticValueProvider.of(region), + ValueProvider.StaticValueProvider.of(accessKey), + ValueProvider.StaticValueProvider.of(secretKey)); + } + + public static DynamoDBConfiguration create( + ValueProvider<String> endpointUrl, + ValueProvider<String> region, + ValueProvider<String> accessKey, + ValueProvider<String> secretKey) { + checkArgument(region != null, "region can not be null"); + checkArgument(accessKey != null, "accessKey can not be null"); + checkArgument(secretKey != null, "secretKey can not be null"); + return new AutoValue_DynamoDBIO_DynamoDBConfiguration.Builder() + .setEndpointUrl(endpointUrl) + .setRegion(region) + .setAwsAccessKey(accessKey) + .setAwsSecretKey(secretKey) + .build(); + } + + public DynamoDBConfiguration withRegion(String region) { + return withRegion(ValueProvider.StaticValueProvider.of(region)); + } + + public DynamoDBConfiguration withRegion(ValueProvider<String> region) { + return builder().setRegion(region).build(); + } + + public DynamoDBConfiguration withEndpointUrl(String endpointUrl) { + return withEndpointUrl(ValueProvider.StaticValueProvider.of(endpointUrl)); + } + + public DynamoDBConfiguration withEndpointUrl(ValueProvider<String> endpointUrl) { + return builder().setRegion(endpointUrl).build(); + } + + public DynamoDBConfiguration withAwsAccessKey(String accessKey) { + return withAwsAccessKey(ValueProvider.StaticValueProvider.of(accessKey)); + } + + public DynamoDBConfiguration withAwsAccessKey(ValueProvider<String> accessKey) { + return builder().setAwsAccessKey(accessKey).build(); + } + + public DynamoDBConfiguration withAwsSecretKey(String secretKey) { + return withAwsSecretKey(ValueProvider.StaticValueProvider.of(secretKey)); + } + + public DynamoDBConfiguration withAwsSecretKey(ValueProvider<String> secretKey) { + return builder().setAwsSecretKey(secretKey).build(); + } + + private void populateDisplayData(DisplayData.Builder builder) { + builder.addIfNotNull(DisplayData.item("region", getRegion())); + builder.addIfNotNull(DisplayData.item("accessKey", getAwsAccessKey())); + builder.addIfNotNull(DisplayData.item("secretKey", getAwsSecretKey())); + } + + AmazonDynamoDB buildAmazonDynamoDB() { + AmazonDynamoDBClientBuilder builder = AmazonDynamoDBClientBuilder.standard(); + if (getEndpointUrl() != null) { + builder.setEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(getEndpointUrl().get(), getRegion().get())); + } + if (getEndpointUrl() == null && getRegion() != null) { + builder.setRegion(getRegion().get()); + } + if (getAwsAccessKey() != null && getAwsSecretKey() != null) { + builder.setCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(getAwsAccessKey().get(), getAwsSecretKey().get()))); + } + return builder.build(); + } + } + + /** Read data from DynamoDB and return PCollection<Map<String, AttributeValue>>. */ + @AutoValue + public abstract static class Read + extends PTransform<PBegin, PCollection<Map<String, AttributeValue>>> { + @Nullable + abstract DynamoDBConfiguration getDynamoDBConfiguration(); + + @Nullable + abstract String getTableName(); + + @Nullable + abstract String getFilterExpression(); + + @Nullable + abstract Map<String, AttributeValue> getExpressionAttributeValues(); + + @Nullable + abstract Map<String, String> getExpressionAttributeNames(); + + @Nullable + abstract String getProjectionExpression(); + + abstract int getNumOfItemPerSegment(); + + abstract int getNumOfSplits(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setDynamoDBConfiguration(DynamoDBConfiguration dynamoDBConfiguration); + + abstract Builder setTableName(String tableName); + + abstract Builder setFilterExpression(String filterExpression); + + abstract Builder setExpressionAttributeValues( + Map<String, AttributeValue> filterExpressionMapValue); + + abstract Builder setExpressionAttributeNames(Map<String, String> filterExpressionMapName); + + abstract Builder setProjectionExpression(String projectionExpression); + + abstract Builder setNumOfItemPerSegment(int numOfItemPerSegment); + + abstract Builder setNumOfSplits(int numOfSplits); + + abstract Read build(); + } + + public Read withDynamoDBConfiguration(DynamoDBConfiguration dynamoDBConfiguration) { + return toBuilder().setDynamoDBConfiguration(dynamoDBConfiguration).build(); + } + + public Read withTableName(String tableName) { + return toBuilder().setTableName(tableName).build(); + } + + public Read withFilterExpression(String filterExpression) { + return toBuilder().setFilterExpression(filterExpression).build(); + } + + public Read withExpressionAttributeNames(Map<String, String> filterExpressionMapName) { + return toBuilder().setExpressionAttributeNames(filterExpressionMapName).build(); + } + + public Read withExpressionAttributeValues( + Map<String, AttributeValue> filterExpressionMapValue) { + return toBuilder().setExpressionAttributeValues(filterExpressionMapValue).build(); + } + + public Read withProjectionExpression(String projectionExpression) { + return toBuilder().setProjectionExpression(projectionExpression).build(); + } + + public Read withNumOfItemPerSegment(int numOfItemPerSegment) { + return toBuilder().setNumOfItemPerSegment(numOfItemPerSegment).build(); + } + + public Read withNumOfSplits(int numOfSplits) { + return toBuilder().setNumOfSplits(numOfSplits).build(); + } + + @Override + public PCollection<Map<String, AttributeValue>> expand(PBegin input) { + checkArgument(getTableName() != null, "withTableName() is required"); + checkArgument( + (getDynamoDBConfiguration() != null), "withDynamoDBConfiguration() is required"); + + return input.apply(org.apache.beam.sdk.io.Read.from(new DynamoDBBoundedSource(this, 0))); + } + } + + /** + * A POJO encapsulating a configuration for retry behavior when issuing requests to dynamodb. A + * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes + * first, for any of the following exceptions: + * + * <ul> + * <li>{@link IOException} + * </ul> + */ + @AutoValue + public abstract static class RetryConfiguration implements Serializable { + @VisibleForTesting + static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate(); + + abstract int getMaxAttempts(); + + abstract Duration getMaxDuration(); + + abstract DynamoDBIO.RetryConfiguration.RetryPredicate getRetryPredicate(); + + abstract DynamoDBIO.RetryConfiguration.Builder builder(); + + public static DynamoDBIO.RetryConfiguration create(int maxAttempts, Duration maxDuration) { + checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0"); + checkArgument( + maxDuration != null && maxDuration.isLongerThan(Duration.ZERO), + "maxDuration should be greater than 0"); + return new AutoValue_DynamoDBIO_RetryConfiguration.Builder() + .setMaxAttempts(maxAttempts) + .setMaxDuration(maxDuration) + .setRetryPredicate(DEFAULT_RETRY_PREDICATE) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract DynamoDBIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts); + + abstract DynamoDBIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration); + + abstract DynamoDBIO.RetryConfiguration.Builder setRetryPredicate( + RetryPredicate retryPredicate); + + abstract DynamoDBIO.RetryConfiguration build(); + } + + /** + * An interface used to control if we retry the BatchWriteItemRequest call when a {@link + * Throwable} occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries + * to resend the requests to the Solr server if the {@link RetryConfiguration} permits it. Review comment: Please, change "Solr server" to "DynamoDB server" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 237933) Time Spent: 4h (was: 3h 50m) > Add DynamoDBIO > -------------- > > Key: BEAM-7043 > URL: https://issues.apache.org/jira/browse/BEAM-7043 > Project: Beam > Issue Type: New Feature > Components: io-java-aws > Reporter: Cam Mach > Assignee: Cam Mach > Priority: Minor > Time Spent: 4h > Remaining Estimate: 0h > > Currently we don't have any feature to write data to AWS DynamoDB. This > feature will enable us to send data to DynamoDB -- This message was sent by Atlassian JIRA (v7.6.3#76005)