[ 
https://issues.apache.org/jira/browse/BEAM-7557?focusedWorklogId=272209&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-272209
 ]

ASF GitHub Bot logged work on BEAM-7557:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jul/19 14:02
            Start Date: 04/Jul/19 14:02
    Worklog Time Spent: 10m 
      Work Description: cmachgodaddy commented on pull request #8987: 
[BEAM-7557] - Migrate DynamoDBIO to AWS SDK for Java 2
URL: https://github.com/apache/beam/pull/8987#discussion_r300412735
 
 

 ##########
 File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
 ##########
 @@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.dynamodb;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+/**
+ * {@link PTransform}s to read/write from/to <a
+ * href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<T> data = ...;
+ * data.apply(
+ *           DynamoDBIO.<WriteRequest>write()
+ *               .withWriteRequestMapperFn(
+ *                   (SerializableFunction<T, KV<String, WriteRequest>>)
+ *                       //Transforming your T data into KV<String, 
WriteRequest>
+ *                       t -> KV.of(tableName, writeRequest))
+ *               .withRetryConfiguration(
+ *                    DynamoDBIO.RetryConfiguration.create(5, 
Duration.standardMinutes(1)))
+ *               .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>Retry configuration
+ *   <li>Specify an AwsClientsProvider. You can use the provided BasicDynamoDbProvider
+ *   <li>A mapper function with a table name to transform your object into KV<tableName,
+ *       writeRequest>
+ * </ul>
+ *
+ * <h3>Reading from DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<List<Map<String, AttributeValue>>> output =
+ *     pipeline.apply(
+ *             DynamoDBIO.<List<Map<String, AttributeValue>>>read()
+ *                 .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region))
+ *                 .withScanRequestFn(
+ *                     (SerializableFunction<Void, ScanRequest>)
+ *                         input ->
+ *                             ScanRequest.builder().tableName(tableName).totalSegments(1).build())
+ *                 .items());
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>Specify an AwsClientsProvider. You can use the provided BasicDynamoDbProvider
+ *   <li>ScanRequestFn, with which you build a ScanRequest object that includes at least the
+ *       table name and the total number of segments. Note: this number should be based on the
+ *       number of your workers
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamoDBIO {
+  public static <T> Read<T> read() {
+    return new AutoValue_DynamoDBIO_Read.Builder().build();
+  }
+
+  public static <T> Write<T> write() {
+    return new AutoValue_DynamoDBIO_Write.Builder().build();
+  }
+
+  /** Read data from DynamoDB and return ScanResult. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+    @Nullable
+    abstract AwsClientsProvider getAwsClientsProvider();
+
+    @Nullable
+    abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();
+
+    @Nullable
+    abstract Integer getSegmentId();
+
+    @Nullable
+    abstract SerializableFunction<ScanResponse, T> getScanResponseMapperFn();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setAwsClientsProvider(AwsClientsProvider 
awsClientsProvider);
+
+      abstract Builder<T> setScanRequestFn(SerializableFunction<Void, 
ScanRequest> fn);
+
+      abstract Builder<T> setSegmentId(Integer segmentId);
+
+      abstract Builder<T> setScanResponseMapperFn(
+          SerializableFunction<ScanResponse, T> scanResponseMapperFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withAwsClientsProvider(AwsClientsProvider 
awsClientsProvider) {
+      return toBuilder().setAwsClientsProvider(awsClientsProvider).build();
+    }
+
+    public Read<T> withAwsClientsProvider(
+        String awsAccessKey, String awsSecretKey, String region, String 
serviceEndpoint) {
+      return withAwsClientsProvider(
+          new BasicDynamoDbProvider(awsAccessKey, awsSecretKey, region, 
serviceEndpoint));
+    }
+
+    public Read<T> withAwsClientsProvider(String awsAccessKey, String 
awsSecretKey, String region) {
+      return withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null);
+    }
+
+    /**
+     * Can't pass ScanRequest object directly from client since this object is 
not full
+     * serializable.
+     */
+    public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> 
fn) {
+      return toBuilder().setScanRequestFn(fn).build();
+    }
+
+    private Read<T> withSegmentId(Integer segmentId) {
+      checkArgument(segmentId != null, "segmentId can not be null");
+      return toBuilder().setSegmentId(segmentId).build();
+    }
+
+    public Read<T> withScanResponseMapperFn(
+        SerializableFunction<ScanResponse, T> scanResultMapperFn) {
+      checkArgument(scanResultMapperFn != null, "scanResultMapper can not be 
null");
+      return toBuilder().setScanResponseMapperFn(scanResultMapperFn).build();
+    }
+
+    public Read<List<Map<String, AttributeValue>>> items() {
+      return withScanResponseMapperFn(new ItemsMapper())
+          .withCoder(ListCoder.of(MapCoder.of(StringUtf8Coder.of(), 
AttributeValueCoder.of())));
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkArgument((getScanRequestFn() != null), "withScanRequestFn() is 
required");
+      checkArgument((getAwsClientsProvider() != null), 
"withAwsClientsProvider() is required");
+      ScanRequest scanRequest = getScanRequestFn().apply(null);
+      checkArgument(
+          (scanRequest.totalSegments() != null && scanRequest.totalSegments() 
> 0),
+          "TotalSegments is required with withScanRequestFn() and greater 
zero");
+
+      PCollection<Read<T>> splits =
+          (PCollection<Read<T>>)
+              input.apply("Create", Create.of(this)).apply("Split", 
ParDo.of(new SplitFn()));
+      splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}));
+
+      PCollection<T> output =
+          (PCollection<T>)
+              splits
+                  .apply("Reshuffle", Reshuffle.viaRandomKey())
+                  .apply("Read", ParDo.of(new ReadFn()));
+      output.setCoder(getCoder());
+      return output;
+    }
+
+    /** A {@link DoFn} to split {@link Read} elements by segment id. */
+    private static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
+      @ProcessElement
+      public void processElement(@Element Read<T> spec, 
OutputReceiver<Read<T>> out) {
+        ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
+        for (int i = 0; i < scanRequest.totalSegments(); i++) {
+          out.output(spec.withSegmentId(i));
+        }
+      }
+    }
+
+    /** A {@link DoFn} executing the ScanRequest to read from DynamoDB. */
+    private static class ReadFn<T> extends DoFn<Read<T>, T> {
+      @ProcessElement
+      public void processElement(@Element Read<T> spec, OutputReceiver<T> out) 
{
+        DynamoDbClient client = spec.getAwsClientsProvider().createDynamoDB();
+
+        ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
+        ScanRequest scanRequestWithSegment =
+            scanRequest.toBuilder().segment(spec.getSegmentId()).build();
+
+        ScanResponse scanResponse = client.scan(scanRequestWithSegment);
+        out.output(spec.getScanResponseMapperFn().apply(scanResponse));
+      }
+    }
+
+    static final class ItemsMapper<T>
+        implements SerializableFunction<ScanResponse, List<Map<String, 
AttributeValue>>> {
+      @Override
+      public List<Map<String, AttributeValue>> apply(@Nullable ScanResponse 
scanResponse) {
+        if (scanResponse == null) {
+          return Collections.emptyList();
+        }
+        return scanResponse.items();
+      }
+    }
+  }
+
+  /**
+   * A POJO encapsulating a configuration for retry behavior when issuing 
requests to dynamodb. A
+   * retry will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes
+   * first, for any of the following exceptions:
+   *
+   * <ul>
+   *   <li>{@link IOException}
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class RetryConfiguration implements Serializable {
+    @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
+
+    abstract int getMaxAttempts();
+
+    abstract Duration getMaxDuration();
+
+    abstract RetryPredicate getRetryPredicate();
+
+    abstract Builder builder();
+
+    public static RetryConfiguration create(int maxAttempts, Duration 
maxDuration) {
 
 Review comment:
   @iemejia , not sure what to remove here? Do you mean to remove the whole 
method `create` ? or remove `CheckArgument` ?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 272209)
    Time Spent: 3h 10m  (was: 3h)

> Migrate DynamoDBIO to AWS SDK for Java 2
> ----------------------------------------
>
>                 Key: BEAM-7557
>                 URL: https://issues.apache.org/jira/browse/BEAM-7557
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-aws
>            Reporter: Ismaël Mejía
>            Assignee: Cam Mach
>            Priority: Minor
>              Labels: backward-incompatible
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> DynamoDBIO uses the old style API ScanRequestAPI, translating into the 
> ScanSpec
>  API can bring us improvements because we can eventually align Scans and 
> Querys
>  in the DynamoDBIO API. Also this is needed for migration to AWS SDK for Java 
> 2.x.
>  We should use for this the new ScanSpec API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to