This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fdf691  [BEAM-7043] Add DynamoDBIO
     new 33d7cd0  Merge pull request #8390: [BEAM-7043] Add DynamoDBIO
3fdf691 is described below

commit 3fdf691763201a48deab21a9279c27375b63559e
Author: Cam Mach <cm...@godaddy.com>
AuthorDate: Mon Apr 15 16:19:18 2019 -0700

    [BEAM-7043] Add DynamoDBIO
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 sdks/java/io/amazon-web-services/build.gradle      |   2 +
 .../sdk/io/aws/dynamodb/AttributeValueCoder.java   | 166 +++++++
 .../AttributeValueCoderProviderRegistrar.java      |  37 ++
 .../sdk/io/aws/dynamodb/AwsClientsProvider.java    |  34 ++
 .../sdk/io/aws/dynamodb/BasicDynamoDBProvider.java |  75 +++
 .../beam/sdk/io/aws/dynamodb/DynamoDBIO.java       | 536 +++++++++++++++++++++
 .../beam/sdk/io/aws/dynamodb/package-info.java     |  19 +
 .../io/aws/dynamodb/AttributeValueCoderTest.java   | 211 ++++++++
 .../io/aws/dynamodb/AwsClientsProviderMock.java    |  46 ++
 .../beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java   | 213 ++++++++
 .../sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java  | 168 +++++++
 12 files changed, 1508 insertions(+), 1 deletion(-)
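
For context, a minimal sketch of the write path added by this commit, assembled from the Javadoc and tests in the diff below. The table name, credentials, and region are placeholders, and the elements are pre-built WriteRequest objects as in the new DynamoDBIOTest:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class DynamoDBWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // One WriteRequest per element; "Music" is a placeholder table name.
    WriteRequest request =
        new WriteRequest()
            .withPutRequest(
                new PutRequest().addItemEntry("Artist", new AttributeValue("Prince")));

    pipeline
        .apply(Create.of(request))
        .apply(
            DynamoDBIO.<WriteRequest>write()
                .withWriteRequestMapperFn(
                    (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
                        r -> KV.of("Music", r))
                .withRetryConfiguration(
                    DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
                // Placeholder credentials; this overload wraps BasicDynamoDBProvider.
                .withAwsClientsProvider("accessKey", "secretKey", Regions.US_WEST_2));

    pipeline.run().waitUntilFinish();
  }
}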

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 57c1ccf..6bc456f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -424,6 +424,7 @@ class BeamModulePlugin implements Plugin<Project> {
         avro_tests                                  : 
"org.apache.avro:avro:1.8.2:tests",
         aws_java_sdk_cloudwatch                     : 
"com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version",
         aws_java_sdk_core                           : 
"com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version",
+        aws_java_sdk_dynamodb                       : 
"com.amazonaws:aws-java-sdk-dynamodb:$aws_java_sdk_version",
         aws_java_sdk_kinesis                        : 
"com.amazonaws:aws-java-sdk-kinesis:$aws_java_sdk_version",
         aws_java_sdk_s3                             : 
"com.amazonaws:aws-java-sdk-s3:$aws_java_sdk_version",
         aws_java_sdk_sns                            : 
"com.amazonaws:aws-java-sdk-sns:$aws_java_sdk_version",
@@ -565,7 +566,6 @@ class BeamModulePlugin implements Plugin<Project> {
         url(project.properties['distMgmtSnapshotsUrl'] ?: isRelease(project)
                 ? 
'https://repository.apache.org/service/local/staging/deploy/maven2'
                 : 
'https://repository.apache.org/content/repositories/snapshots')
-
         // We attempt to find and load credentials from ~/.m2/settings.xml 
file that a user
         // has configured with the Apache release and snapshot staging 
credentials.
         // <settings>
diff --git a/sdks/java/io/amazon-web-services/build.gradle 
b/sdks/java/io/amazon-web-services/build.gradle
index 560e4c5..0bf33dc 100644
--- a/sdks/java/io/amazon-web-services/build.gradle
+++ b/sdks/java/io/amazon-web-services/build.gradle
@@ -29,6 +29,7 @@ dependencies {
   compile project(path: ":sdks:java:core", configuration: "shadow")
   compile library.java.aws_java_sdk_cloudwatch
   compile library.java.aws_java_sdk_core
+  compile library.java.aws_java_sdk_dynamodb
   compile library.java.aws_java_sdk_s3
   compile library.java.aws_java_sdk_sns
   compile library.java.aws_java_sdk_sqs
@@ -47,6 +48,7 @@ dependencies {
   testCompile library.java.mockito_core
   testCompile library.java.junit
   testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1'
+  testCompile 'org.testcontainers:localstack:1.11.2'
   testRuntimeOnly library.java.slf4j_jdk14
   testRuntimeOnly project(":runners:direct-java")
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java
new file mode 100644
index 0000000..4bdf8b5
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/** A {@link Coder} that serializes and deserializes {@link AttributeValue} objects. */
+public class AttributeValueCoder extends AtomicCoder<AttributeValue> {
+
+  /** The data type tag for each value stored in an {@link AttributeValue} object. */
+  private enum AttributeValueType {
+    s, // for String
+    n, // for Number
+    b, // for Byte
+    sS, // for List of String
+    nS, // for List of Number
+    bS, // for List of Byte
+    m, // for Map of String and AttributeValue
+    l, // for list of AttributeValue
+    bOOL, // for Boolean
+    nULLValue, // for null
+  }
+
+  private static final AttributeValueCoder INSTANCE = new 
AttributeValueCoder();
+
+  private static final ListCoder<String> LIST_STRING_CODER = 
ListCoder.of(StringUtf8Coder.of());
+  private static final ListCoder<byte[]> LIST_BYTE_CODER = 
ListCoder.of(ByteArrayCoder.of());
+
+  private static final ListCoder<AttributeValue> LIST_ATTRIBUTE_CODER =
+      ListCoder.of(AttributeValueCoder.of());
+  private static final MapCoder<String, AttributeValue> MAP_ATTRIBUTE_CODER =
+      MapCoder.of(StringUtf8Coder.of(), AttributeValueCoder.of());
+
+  private AttributeValueCoder() {}
+
+  public static AttributeValueCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(AttributeValue value, OutputStream outStream) throws 
IOException {
+
+    if (value.getS() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.s.toString(), outStream);
+      StringUtf8Coder.of().encode(value.getS(), outStream);
+    } else if (value.getN() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.n.toString(), outStream);
+      StringUtf8Coder.of().encode(value.getN(), outStream);
+    } else if (value.getBOOL() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.bOOL.toString(), 
outStream);
+      BooleanCoder.of().encode(value.getBOOL(), outStream);
+    } else if (value.getB() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.b.toString(), outStream);
+      ByteArrayCoder.of().encode(convertToByteArray(value.getB()), outStream);
+    } else if (value.getSS() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.sS.toString(), outStream);
+      LIST_STRING_CODER.encode(value.getSS(), outStream);
+    } else if (value.getNS() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.nS.toString(), outStream);
+      LIST_STRING_CODER.encode(value.getNS(), outStream);
+    } else if (value.getBS() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.bS.toString(), outStream);
+      LIST_BYTE_CODER.encode(convertToListByteArray(value.getBS()), outStream);
+    } else if (value.getL() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.l.toString(), outStream);
+      LIST_ATTRIBUTE_CODER.encode(value.getL(), outStream);
+    } else if (value.getM() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.m.toString(), outStream);
+      MAP_ATTRIBUTE_CODER.encode(value.getM(), outStream);
+    } else if (value.getNULL() != null) {
+      StringUtf8Coder.of().encode(AttributeValueType.nULLValue.toString(), 
outStream);
+      BooleanCoder.of().encode(value.getNULL(), outStream);
+    } else {
+      throw new CoderException("Unknown Type");
+    }
+  }
+
+  @Override
+  public AttributeValue decode(InputStream inStream) throws IOException {
+    AttributeValue attrValue = new AttributeValue();
+
+    String type = StringUtf8Coder.of().decode(inStream);
+    AttributeValueType attrType = AttributeValueType.valueOf(type);
+
+    switch (attrType) {
+      case s:
+        attrValue.setS(StringUtf8Coder.of().decode(inStream));
+        break;
+      case n:
+        attrValue.setN(StringUtf8Coder.of().decode(inStream));
+        break;
+      case bOOL:
+        attrValue.setBOOL(BooleanCoder.of().decode(inStream));
+        break;
+      case b:
+        attrValue.setB(ByteBuffer.wrap(ByteArrayCoder.of().decode(inStream)));
+        break;
+      case sS:
+        attrValue.setSS(LIST_STRING_CODER.decode(inStream));
+        break;
+      case nS:
+        attrValue.setNS(LIST_STRING_CODER.decode(inStream));
+        break;
+      case bS:
+        
attrValue.setBS(convertToListByteBuffer(LIST_BYTE_CODER.decode(inStream)));
+        break;
+      case l:
+        attrValue.setL(LIST_ATTRIBUTE_CODER.decode(inStream));
+        break;
+      case m:
+        attrValue.setM(MAP_ATTRIBUTE_CODER.decode(inStream));
+        break;
+      case nULLValue:
+        attrValue.setNULL(BooleanCoder.of().decode(inStream));
+        break;
+      default:
+        throw new CoderException("Unknown Type");
+    }
+
+    return attrValue;
+  }
+
+  private List<byte[]> convertToListByteArray(List<ByteBuffer> listByteBuffer) 
{
+    return 
listByteBuffer.stream().map(this::convertToByteArray).collect(Collectors.toList());
+  }
+
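+  // Copies the buffer's remaining bytes, then restores its read position so the buffer is left
+  // unchanged for the caller.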
+  private byte[] convertToByteArray(ByteBuffer buffer) {
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes);
+    buffer.position(buffer.position() - bytes.length);
+    return bytes;
+  }
+
+  private List<ByteBuffer> convertToListByteBuffer(List<byte[]> listByteArr) {
+    return 
listByteArr.stream().map(ByteBuffer::wrap).collect(Collectors.toList());
+  }
+}
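
A quick round-trip sketch of the coder above, mirroring the unit tests later in this commit (the sample value is arbitrary):

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.io.aws.dynamodb.AttributeValueCoder;

public class AttributeValueRoundTrip {
  public static void main(String[] args) throws IOException {
    AttributeValue value = new AttributeValue().withN("42");

    AttributeValueCoder coder = AttributeValueCoder.of();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(value, out); // writes the type tag, then the payload

    AttributeValue decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(value.equals(decoded)); // prints true
  }
}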
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java
new file mode 100644
index 0000000..fc4c909
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.google.auto.service.AutoService;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+/** A {@link CoderProviderRegistrar} for standard types used with {@link 
DynamoDBIO}. */
+@AutoService(CoderProviderRegistrar.class)
+public class AttributeValueCoderProviderRegistrar implements 
CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(AttributeValue.class), 
AttributeValueCoder.of()));
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java
new file mode 100644
index 0000000..8d1c267
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import java.io.Serializable;
+
+/**
+ * Provides instances of AWS clients.
+ *
+ * <p>Please note that any instance of {@link AwsClientsProvider} must be {@link Serializable} to
+ * ensure it can be sent to worker machines.
+ */
+public interface AwsClientsProvider extends Serializable {
+  AmazonCloudWatch getCloudWatchClient();
+
+  AmazonDynamoDB createDynamoDB();
+}
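
Because AwsClientsProvider is just a Serializable interface, tests or local setups can supply their own implementation. A hypothetical sketch pointing at a local DynamoDB endpoint (the endpoint, credentials, and region here are made up):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import org.apache.beam.sdk.io.aws.dynamodb.AwsClientsProvider;

public class LocalDynamoDBProvider implements AwsClientsProvider {
  @Override
  public AmazonCloudWatch getCloudWatchClient() {
    return AmazonCloudWatchClientBuilder.defaultClient();
  }

  @Override
  public AmazonDynamoDB createDynamoDB() {
    // Point the client at a local emulator instead of the AWS endpoint.
    return AmazonDynamoDBClientBuilder.standard()
        .withCredentials(
            new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "us-west-2"))
        .build();
  }
}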
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java
new file mode 100644
index 0000000..bcb2508
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import javax.annotation.Nullable;
+
+/** Basic implementation of {@link AwsClientsProvider} used by the credential-based overloads in {@link DynamoDBIO}. */
+public class BasicDynamoDBProvider implements AwsClientsProvider {
+  private final String accessKey;
+  private final String secretKey;
+  private final Regions region;
+  @Nullable private final String serviceEndpoint;
+
+  BasicDynamoDBProvider(
+      String accessKey, String secretKey, Regions region, @Nullable String 
serviceEndpoint) {
+    checkArgument(accessKey != null, "accessKey can not be null");
+    checkArgument(secretKey != null, "secretKey can not be null");
+    checkArgument(region != null, "region can not be null");
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+    this.region = region;
+    this.serviceEndpoint = serviceEndpoint;
+  }
+
+  private AWSCredentialsProvider getCredentialsProvider() {
+    return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, 
secretKey));
+  }
+
+  @Override
+  public AmazonCloudWatch getCloudWatchClient() {
+    AmazonCloudWatchClientBuilder clientBuilder =
+        
AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider());
+    if (serviceEndpoint == null) {
+      clientBuilder.withRegion(region);
+    } else {
+      clientBuilder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, 
region.getName()));
+    }
+    return clientBuilder.build();
+  }
+
+  @Override
+  public AmazonDynamoDB createDynamoDB() {
+    AmazonDynamoDBClientBuilder clientBuilder =
+        AmazonDynamoDBClientBuilder.standard().withCredentials(getCredentialsProvider());
+    if (serviceEndpoint == null) {
+      clientBuilder.withRegion(region);
+    } else {
+      clientBuilder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName()));
+    }
+    return clientBuilder.build();
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
new file mode 100644
index 0000000..cc28137
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s to read from and write to <a
+ * href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<T> data = ...;
+ * data.apply(
+ *           DynamoDBIO.<T>write()
+ *               .withWriteRequestMapperFn(
+ *                   (SerializableFunction<T, KV<String, WriteRequest>>)
+ *                       // Transform each T element into a KV of table name and WriteRequest
+ *                       t -> KV.of(tableName, writeRequest))
+ *               .withRetryConfiguration(
+ *                   DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
+ *               .withAwsClientsProvider(accessKey, secretKey, region));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following:
+ *
+ * <ul>
+ *   <li>A retry configuration
+ *   <li>An AwsClientsProvider; the credential-based {@code withAwsClientsProvider} overloads use
+ *       the provided {@link BasicDynamoDBProvider}
+ *   <li>A mapper function that transforms each element into a {@code KV<tableName, writeRequest>}
+ * </ul>
+ *
+ * <h3>Reading from DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<List<Map<String, AttributeValue>>> output =
+ *     pipeline.apply(
+ *             DynamoDBIO.<List<Map<String, AttributeValue>>>read()
+ *                 .withAwsClientsProvider(accessKey, secretKey, region)
+ *                 .withScanRequestFn(
+ *                     (SerializableFunction<Void, ScanRequest>)
+ *                         input -> new 
ScanRequest(tableName).withTotalSegments(1))
+ *                 .items());
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following:
+ *
+ * <ul>
+ *   <li>An AwsClientsProvider; the credential-based {@code withAwsClientsProvider} overloads use
+ *       the provided {@link BasicDynamoDBProvider}
+ *   <li>A ScanRequestFn that builds a ScanRequest with at least a table name and the total number
+ *       of segments; base this number on the number of your workers
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamoDBIO {
+  public static <T> Read<T> read() {
+    return new AutoValue_DynamoDBIO_Read.Builder().build();
+  }
+
+  public static <T> Write<T> write() {
+    return new AutoValue_DynamoDBIO_Write.Builder().build();
+  }
+
+  /** Reads data from DynamoDB and outputs the mapped {@link ScanResult}s. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+    @Nullable
+    abstract AwsClientsProvider getAwsClientsProvider();
+
+    @Nullable
+    abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();
+
+    @Nullable
+    abstract Integer getSegmentId();
+
+    @Nullable
+    abstract SerializableFunction<ScanResult, T> getScanResultMapperFn();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setAwsClientsProvider(AwsClientsProvider 
awsClientsProvider);
+
+      abstract Builder<T> setScanRequestFn(SerializableFunction<Void, 
ScanRequest> fn);
+
+      abstract Builder<T> setSegmentId(Integer segmentId);
+
+      abstract Builder<T> setScanResultMapperFn(
+          SerializableFunction<ScanResult, T> scanResultMapperFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withAwsClientsProvider(AwsClientsProvider 
awsClientsProvider) {
+      return toBuilder().setAwsClientsProvider(awsClientsProvider).build();
+    }
+
+    public Read<T> withAwsClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region, String 
serviceEndpoint) {
+      return withAwsClientsProvider(
+          new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, 
serviceEndpoint));
+    }
+
+    public Read<T> withAwsClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region) {
+      return withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null);
+    }
+
+    /**
+     * A ScanRequest object cannot be passed in directly from the client since it is not fully
+     * serializable, so a function that produces one is used instead.
+     */
+    public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> 
fn) {
+      return toBuilder().setScanRequestFn(fn).build();
+    }
+
+    private Read<T> withSegmentId(Integer segmentId) {
+      checkArgument(segmentId != null, "segmentId can not be null");
+      return toBuilder().setSegmentId(segmentId).build();
+    }
+
+    public Read<T> withScanResultMapperFn(SerializableFunction<ScanResult, T> 
scanResultMapperFn) {
+      checkArgument(scanResultMapperFn != null, "scanResultMapper can not be 
null");
+      return toBuilder().setScanResultMapperFn(scanResultMapperFn).build();
+    }
+
+    public Read<List<Map<String, AttributeValue>>> items() {
+      return withScanResultMapperFn(new DynamoDBIO.Read.ItemsMapper())
+          .withCoder(ListCoder.of(MapCoder.of(StringUtf8Coder.of(), 
AttributeValueCoder.of())));
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkArgument((getScanRequestFn() != null), "withScanRequestFn() is 
required");
+      checkArgument((getAwsClientsProvider() != null), 
"withAwsClientsProvider() is required");
+      ScanRequest scanRequest = getScanRequestFn().apply(null);
+      checkArgument(
+          (scanRequest.getTotalSegments() != null && scanRequest.getTotalSegments() > 0),
+          "TotalSegments is required with withScanRequestFn() and greater than zero");
+
+      PCollection<Read<T>> splits =
+          (PCollection<Read<T>>)
+              input.apply("Create", Create.of(this)).apply("Split", 
ParDo.of(new SplitFn()));
+      splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}));
+
+      PCollection<T> output =
+          (PCollection<T>)
+              splits
+                  .apply("Reshuffle", Reshuffle.viaRandomKey())
+                  .apply("Read", ParDo.of(new ReadFn()));
+      output.setCoder(getCoder());
+      return output;
+    }
+
+    /** A {@link DoFn} to split {@link Read} elements by segment id. */
+    private static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
+      @ProcessElement
+      public void processElement(@Element Read<T> spec, 
OutputReceiver<Read<T>> out) {
+        ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
+        for (int i = 0; i < scanRequest.getTotalSegments(); i++) {
+          out.output(spec.withSegmentId(i));
+        }
+      }
+    }
+
+    /** A {@link DoFn} executing the ScanRequest to read from DynamoDB. */
+    private static class ReadFn<T> extends DoFn<Read<T>, T> {
+      @ProcessElement
+      public void processElement(@Element Read<T> spec, OutputReceiver<T> out) 
{
+        AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB();
+        ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
+        scanRequest.setSegment(spec.getSegmentId());
+        ScanResult scanResult = client.scan(scanRequest);
+        out.output(spec.getScanResultMapperFn().apply(scanResult));
+      }
+    }
+
+    static final class ItemsMapper<T>
+        implements SerializableFunction<ScanResult, List<Map<String, 
AttributeValue>>> {
+      @Override
+      public List<Map<String, AttributeValue>> apply(@Nullable ScanResult 
scanResult) {
+        if (scanResult == null) {
+          return Collections.emptyList();
+        }
+        return scanResult.getItems();
+      }
+    }
+  }
+
+  /**
+   * A POJO encapsulating a configuration for retry behavior when issuing requests to DynamoDB. A
+   * retry will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes
+   * first, for any of the following exceptions:
+   *
+   * <ul>
+   *   <li>{@link IOException}
+   *   <li>{@link AmazonDynamoDBException}
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class RetryConfiguration implements Serializable {
+    @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
+
+    abstract int getMaxAttempts();
+
+    abstract Duration getMaxDuration();
+
+    abstract DynamoDBIO.RetryConfiguration.RetryPredicate getRetryPredicate();
+
+    abstract DynamoDBIO.RetryConfiguration.Builder builder();
+
+    public static DynamoDBIO.RetryConfiguration create(int maxAttempts, 
Duration maxDuration) {
+      checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
+      checkArgument(
+          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+          "maxDuration should be greater than 0");
+      return new AutoValue_DynamoDBIO_RetryConfiguration.Builder()
+          .setMaxAttempts(maxAttempts)
+          .setMaxDuration(maxDuration)
+          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+          .build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract DynamoDBIO.RetryConfiguration.Builder setMaxAttempts(int 
maxAttempts);
+
+      abstract DynamoDBIO.RetryConfiguration.Builder setMaxDuration(Duration 
maxDuration);
+
+      abstract DynamoDBIO.RetryConfiguration.Builder setRetryPredicate(
+          RetryPredicate retryPredicate);
+
+      abstract DynamoDBIO.RetryConfiguration build();
+    }
+
+    /**
+     * An interface used to control if we retry the BatchWriteItemRequest call 
when a {@link
+     * Throwable} occurs. If {@link RetryPredicate#test(Object)} returns true, 
{@link Write} tries
+     * to resend the requests to the DynamoDB server if the {@link RetryConfiguration} permits it.
+     */
+    @FunctionalInterface
+    interface RetryPredicate extends Predicate<Throwable>, Serializable {}
+
+    private static class DefaultRetryPredicate implements RetryPredicate {
+      private static final ImmutableSet<Integer> ELIGIBLE_CODES =
+          ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
+
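+      // As written, this predicate retries on IOException and on any AmazonDynamoDBException,
+      // so the status-code check below never narrows the result.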
+      @Override
+      public boolean test(Throwable throwable) {
+        return (throwable instanceof IOException
+            || (throwable instanceof AmazonDynamoDBException)
+            || (throwable instanceof AmazonDynamoDBException
+                && ELIGIBLE_CODES.contains(((AmazonDynamoDBException) 
throwable).getStatusCode())));
+      }
+    }
+  }
+
+  /** Writes a {@code PCollection<T>} to DynamoDB. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, 
PCollection<Void>> {
+
+    @Nullable
+    abstract AwsClientsProvider getAwsClientsProvider();
+
+    @Nullable
+    abstract RetryConfiguration getRetryConfiguration();
+
+    @Nullable
+    abstract SerializableFunction<T, KV<String, WriteRequest>> 
getWriteItemMapperFn();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setAwsClientsProvider(AwsClientsProvider 
awsClientsProvider);
+
+      abstract Builder<T> setRetryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Builder<T> setWriteItemMapperFn(
+          SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn);
+
+      abstract Write<T> build();
+    }
+
+    public Write<T> withAwsClientsProvider(AwsClientsProvider 
awsClientsProvider) {
+      return builder().setAwsClientsProvider(awsClientsProvider).build();
+    }
+
+    public Write<T> withAwsClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region, String 
serviceEndpoint) {
+      return withAwsClientsProvider(
+          new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, 
serviceEndpoint));
+    }
+
+    public Write<T> withAwsClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region) {
+      return withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null);
+    }
+
+    /**
+     * Provides configuration to retry a failed request to publish a set of 
records to DynamoDB.
+     * Users should consider that retrying might compound the underlying 
problem which caused the
+     * initial failure. Users should also be aware that once retrying is 
exhausted the error is
+     * surfaced to the runner which <em>may</em> then opt to retry the current 
partition in entirety
+     * or abort if the runner's maximum number of retries is reached. Retrying uses an
+     * exponential backoff algorithm, with a minimum backoff of 5 seconds, and surfaces the error
+     * once the maximum number of retries or the maximum configured duration is exceeded.
+     *
+     * <p>Example use:
+     *
+     * <pre>{@code
+     * DynamoDBIO.write()
+     *   .withRetryConfiguration(DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
+     *   ...
+     * }</pre>
+     *
+     * @param retryConfiguration the rules which govern the retry behavior
+     * @return the {@link DynamoDBIO.Write} with retrying configured
+     */
+    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
+      checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
+      return builder().setRetryConfiguration(retryConfiguration).build();
+    }
+
+    public Write<T> withWriteRequestMapperFn(
+        SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn) {
+      return builder().setWriteItemMapperFn(writeItemMapperFn).build();
+    }
+
+    @Override
+    public PCollection<Void> expand(PCollection<T> input) {
+      return input.apply(ParDo.of(new WriteFn<>(this)));
+    }
+
+    static class WriteFn<T> extends DoFn<T, Void> {
+      @VisibleForTesting
+      static final String RETRY_ATTEMPT_LOG = "Error writing to DynamoDB. 
Retry attempt[%d]";
+
+      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
+      private transient FluentBackoff retryBackoff; // defaults to no retries
+      private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
+      private static final Counter DYNAMO_DB_WRITE_FAILURES =
+          Metrics.counter(WriteFn.class, "DynamoDB_Write_Failures");
+
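+      // DynamoDB's BatchWriteItem API accepts at most 25 write requests per call.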
+      private static final int BATCH_SIZE = 25;
+      private transient AmazonDynamoDB client;
+      private final DynamoDBIO.Write spec;
+      private List<KV<String, WriteRequest>> batch;
+
+      WriteFn(DynamoDBIO.Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        client = spec.getAwsClientsProvider().createDynamoDB();
+        retryBackoff =
+            FluentBackoff.DEFAULT
+                .withMaxRetries(0) // default to no retrying
+                .withInitialBackoff(RETRY_INITIAL_BACKOFF);
+        if (spec.getRetryConfiguration() != null) {
+          retryBackoff =
+              retryBackoff
+                  
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
+                  
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
+        }
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext context) {
+        batch = new ArrayList<>();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        final KV<String, WriteRequest> writeRequest =
+            (KV<String, WriteRequest>) 
spec.getWriteItemMapperFn().apply(context.element());
+        batch.add(writeRequest);
+        if (batch.size() >= BATCH_SIZE) {
+          flushBatch();
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(FinishBundleContext context) throws Exception {
+        flushBatch();
+      }
+
+      private void flushBatch() throws IOException, InterruptedException {
+        if (batch.isEmpty()) {
+          return;
+        }
+
+        try {
+          // Since each element is a KV<tableName, writeRequest> in the batch, 
we need to group them
+          // by tableName
+          Map<String, List<WriteRequest>> mapTableRequest =
+              batch.stream()
+                  .collect(
+                      Collectors.groupingBy(
+                          KV::getKey, Collectors.mapping(KV::getValue, 
Collectors.toList())));
+
+          BatchWriteItemRequest batchRequest = new BatchWriteItemRequest();
+          mapTableRequest
+              .entrySet()
+              .forEach(
+                  entry -> batchRequest.addRequestItemsEntry(entry.getKey(), 
entry.getValue()));
+
+          Sleeper sleeper = Sleeper.DEFAULT;
+          BackOff backoff = retryBackoff.backoff();
+          int attempt = 0;
+          while (true) {
+            attempt++;
+            try {
+              client.batchWriteItem(batchRequest);
+              break;
+            } catch (Exception ex) {
+              // Fail right away if there is no retry configuration
+              if (spec.getRetryConfiguration() == null
+                  || 
!spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
+                DYNAMO_DB_WRITE_FAILURES.inc();
+                LOG.info(
+                    "Unable to write batch items {} due to {} ",
+                    batchRequest.getRequestItems().entrySet(),
+                    ex);
+                throw new IOException("Error writing to DyanmoDB (no attempt 
made to retry)", ex);
+              }
+
+              if (!BackOffUtils.next(sleeper, backoff)) {
+                throw new IOException(
+                    String.format(
+                        "Error writing to DyanmoDB after %d attempt(s). No 
more attempts allowed",
+                        attempt),
+                    ex);
+              } else {
+                // Note: this log message is used in test cases to verify retry behavior
+                LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
+              }
+            }
+          }
+        } finally {
+          batch.clear();
+        }
+      }
+
+      @Teardown
+      public void tearDown() {
+        if (client != null) {
+          client.shutdown();
+          client = null;
+        }
+      }
+    }
+  }
+}
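
A matching read-path sketch (placeholder table, credentials, and region); withTotalSegments controls how many parallel scan splits SplitFn emits:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class DynamoDBReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Each output element is the list of items from one scanned segment.
    PCollection<List<Map<String, AttributeValue>>> items =
        pipeline.apply(
            DynamoDBIO.<List<Map<String, AttributeValue>>>read()
                .withAwsClientsProvider("accessKey", "secretKey", Regions.US_WEST_2)
                .withScanRequestFn(
                    (SerializableFunction<Void, ScanRequest>)
                        input -> new ScanRequest("Music").withTotalSegments(4))
                .items());

    pipeline.run().waitUntilFinish();
  }
}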
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java
new file mode 100644
index 0000000..0a7ea55
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Defines IO connectors for Amazon Web Services DynamoDB. */
+package org.apache.beam.sdk.io.aws.dynamodb;
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java
new file mode 100644
index 0000000..86fbbe3
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit test cases for each type of AttributeValue to test encoding and 
decoding. */
+public class AttributeValueCoderTest {
+
+  @Test
+  public void shouldPassForStringType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setS("testing");
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForNumberType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setN("123");
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForBooleanType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setBOOL(false);
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForByteArray() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setB(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForListOfString() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setSS(ImmutableList.of("foo", "bar"));
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForOneListOfNumber() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setNS(ImmutableList.of("123", "456"));
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForOneListOfByteArray() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setBS(
+        ImmutableList.of(
+            ByteBuffer.wrap("mylistbyte1".getBytes(StandardCharsets.UTF_8)),
+            ByteBuffer.wrap("mylistbyte2".getBytes(StandardCharsets.UTF_8))));
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForListType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+
+    List<AttributeValue> listAttr = new ArrayList<>();
+    listAttr.add(new AttributeValue("innerMapValue1"));
+    listAttr.add(new AttributeValue().withN("8976234"));
+
+    expected.setL(listAttr);
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForMapType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+
+    Map<String, AttributeValue> attrMap = new HashMap<>();
+    attrMap.put("innerMapAttr1", new AttributeValue("innerMapValue1"));
+    attrMap.put(
+        "innerMapAttr2",
+        new 
AttributeValue().withB(ByteBuffer.wrap("8976234".getBytes(StandardCharsets.UTF_8))));
+
+    expected.setM(attrMap);
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void shouldPassForNullType() throws IOException {
+    AttributeValue expected = new AttributeValue();
+    expected.setNULL(true);
+
+    AttributeValueCoder coder = AttributeValueCoder.of();
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+
+    AttributeValue actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java
new file mode 100644
index 0000000..dfcf302
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import org.mockito.Mockito;
+
+/** A mock {@link AwsClientsProvider} for tests. */
+public class AwsClientsProviderMock implements AwsClientsProvider {
+
+  private static AwsClientsProviderMock instance = new 
AwsClientsProviderMock();
+  private static AmazonDynamoDB db;
+
+  private AwsClientsProviderMock() {}
+
+  public static AwsClientsProviderMock of(AmazonDynamoDB dynamoDB) {
+    db = dynamoDB;
+    return instance;
+  }
+
+  @Override
+  public AmazonCloudWatch getCloudWatchClient() {
+    return Mockito.mock(AmazonCloudWatch.class);
+  }
+
+  @Override
+  public AmazonDynamoDB createDynamoDB() {
+    return db;
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
new file mode 100644
index 0000000..3bd27ed
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+/** Test coverage for {@link DynamoDBIO}. */
+public class DynamoDBIOTest implements Serializable {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public final transient ExpectedLogs expectedLogs = 
ExpectedLogs.none(DynamoDBIO.class);
+
+  private static final String tableName = "TaskA";
+  private static final int numOfItems = 10;
+
+  private static List<Map<String, AttributeValue>> expected;
+
+  @BeforeClass
+  public static void setup() {
+    DynamoDBIOTestHelper.startServerClient();
+    DynamoDBIOTestHelper.createTestTable(tableName);
+    expected = DynamoDBIOTestHelper.generateTestData(tableName, numOfItems);
+  }
+
+  @AfterClass
+  public static void destroy() {
+    DynamoDBIOTestHelper.stopServerClient(tableName);
+  }
+
+  // Test cases for Reader.
+  @Test
+  public void testReadScanResult() {
+    PCollection<List<Map<String, AttributeValue>>> actual =
+        pipeline.apply(
+            DynamoDBIO.<List<Map<String, AttributeValue>>>read()
+                .withAwsClientsProvider(
+                    
AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))
+                .withScanRequestFn(
+                    (SerializableFunction<Void, ScanRequest>)
+                        input -> new 
ScanRequest(tableName).withTotalSegments(1))
+                .items());
+    PAssert.that(actual).containsInAnyOrder(expected);
+    pipeline.run().waitUntilFinish();
+  }
+
+  // Test cases for Reader's arguments.
+  @Test
+  public void testMissingScanRequestFn() {
+    thrown.expectMessage("withScanRequestFn() is required");
+    pipeline.apply(
+        DynamoDBIO.read()
+            .withAwsClientsProvider(
+                
AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
+    try {
+      pipeline.run().waitUntilFinish();
+      fail("withScanRequestFn() is required");
+    } catch (IllegalArgumentException ex) {
+      assertEquals("withScanRequestFn() is required", ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testMissingAwsClientsProvider() {
+    thrown.expectMessage("withAwsClientsProvider() is required");
+    pipeline.apply(
+        DynamoDBIO.read()
+            .withScanRequestFn(
+                (SerializableFunction<Void, ScanRequest>)
+                    input -> new ScanRequest(tableName).withTotalSegments(3)));
+    try {
+      pipeline.run().waitUntilFinish();
+      fail("withAwsClientsProvider() is required");
+    } catch (IllegalArgumentException ex) {
+      assertEquals("withAwsClientsProvider() is required", ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testMissingTotalSegments() {
+    thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
+    pipeline.apply(
+        DynamoDBIO.read()
+            .withScanRequestFn(
+                (SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName))
+            .withAwsClientsProvider(
+                AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
+    try {
+      pipeline.run().waitUntilFinish();
+      fail("TotalSegments is required with withScanRequestFn()");
+    } catch (IllegalArgumentException ex) {
+      assertEquals("TotalSegments is required with withScanRequestFn()", 
ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testNegativeTotalSegments() {
+    thrown.expectMessage("TotalSegments is required with withScanRequestFn() 
and greater zero");
+    pipeline.apply(
+        DynamoDBIO.read()
+            .withScanRequestFn(
+                (SerializableFunction<Void, ScanRequest>)
+                    input -> new ScanRequest(tableName).withTotalSegments(-1))
+            .withAwsClientsProvider(
+                AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
+    try {
+      pipeline.run().waitUntilFinish();
+      fail("withTotalSegments() is expected and greater than zero");
+    } catch (IllegalArgumentException ex) {
+      assertEquals(
+          "TotalSegments is required with withScanRequestFn() and greater 
zero", ex.getMessage());
+    }
+  }
+
+  // Test cases for Writer.
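+  // Writes the generated requests and expects an empty output collection on success.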
+  @Test
+  public void testWriteDataToDynamo() {
+    final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
+
+    final PCollection<Void> output =
+        pipeline
+            .apply(Create.of(writeRequests))
+            .apply(
+                DynamoDBIO.<WriteRequest>write()
+                    .withWriteRequestMapperFn(
+                        (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
+                            writeRequest -> KV.of(tableName, writeRequest))
+                    .withRetryConfiguration(
+                        DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
+                    .withAwsClientsProvider(
+                        AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
+
+    final PCollection<Long> publishedResultsSize = output.apply(Count.globally());
+    PAssert.that(publishedResultsSize).containsInAnyOrder(0L);
+
+    pipeline.run().waitUntilFinish();
+  }
+
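+  // The mocked client fails every batch write, so the writer should log each retry attempt
+  // and eventually surface the error.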
+  @Test
+  public void testRetries() throws Throwable {
+    thrown.expectMessage("Error writing to DynamoDB");
+
+    final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
+
+    AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class);
+    Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class)))
+        .thenThrow(new AmazonDynamoDBException("Service unavailable"));
+
+    pipeline
+        .apply(Create.of(writeRequests))
+        .apply(
+            DynamoDBIO.<WriteRequest>write()
+                .withWriteRequestMapperFn(
+                    (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
+                        writeRequest -> KV.of(tableName, writeRequest))
+                .withRetryConfiguration(
+                    DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10)))
+                .withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock)));
+
+    try {
+      pipeline.run().waitUntilFinish();
+    } catch (final Pipeline.PipelineExecutionException e) {
+      // Check that 3 retries were attempted by inspecting the logs before rethrowing the cause.
+      expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
+      expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2));
+      expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 3));
+      throw e.getCause();
+    }
+    fail("Pipeline is expected to fail because we were unable to write to 
DynamoDB.");
+  }
+}
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java
new file mode 100644
index 0000000..6043994
--- /dev/null
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.PutRequest;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+/** A utility to generate test table and data for {@link DynamoDBIOTest}. */
+class DynamoDBIOTestHelper implements Serializable {
+
+  // The container lifecycle is driven manually via startServerClient()/stopServerClient();
+  // JUnit's @Rule does not apply to a private static field, so the annotation is omitted.
+  private static LocalStackContainer localStackContainer =
+      new LocalStackContainer().withServices(LocalStackContainer.Service.DYNAMODB);
+
+  private static AmazonDynamoDB dynamoDBClient;
+
+  static final String ATTR_NAME_1 = "hashKey1";
+  static final String ATTR_NAME_2 = "rangeKey2";
+
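+  /** Starts the LocalStack DynamoDB container and builds the shared client against it. */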
+  static void startServerClient() {
+    localStackContainer.start();
+
+    if (dynamoDBClient == null) {
+      dynamoDBClient =
+          AmazonDynamoDBClientBuilder.standard()
+              .withEndpointConfiguration(
+                  localStackContainer.getEndpointConfiguration(
+                      LocalStackContainer.Service.DYNAMODB))
+              .withCredentials(localStackContainer.getDefaultCredentialsProvider())
+              .build();
+    }
+  }
+
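+  /** Deletes the given table, shuts down the shared client, and stops the container. */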
+  static void stopServerClient(String tableName) {
+    if (dynamoDBClient != null) {
+      dynamoDBClient.deleteTable(tableName);
+      dynamoDBClient.shutdown();
+    }
+    localStackContainer.stop();
+  }
+
+  static AmazonDynamoDB getDynamoDBClient() {
+    // Note: each test case needs its own DynamoDB client object; sharing one across
+    // test cases runs into connection pool issues.
+    return AmazonDynamoDBClientBuilder.standard()
+        .withEndpointConfiguration(
+            localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.DYNAMODB))
+        .withCredentials(localStackContainer.getDefaultCredentialsProvider())
+        .build();
+  }
+
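+  /** Batch-writes {@code numOfItems} generated items and returns them as read back by a scan. */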
+  static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
+    BatchWriteItemRequest batchWriteItemRequest =
+        generateBatchWriteItemRequest(tableName, numOfItems);
+
+    dynamoDBClient.batchWriteItem(batchWriteItemRequest);
+    ScanResult scanResult = dynamoDBClient.scan(new ScanRequest().withTableName(tableName));
+
+    List<Map<String, AttributeValue>> items = scanResult.getItems();
+    Assert.assertEquals(numOfItems, items.size());
+    return items;
+  }
+
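+  /** Wraps the generated write requests for the given table into a single batch request. */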
+  static BatchWriteItemRequest generateBatchWriteItemRequest(String tableName, int numOfItems) {
+    BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
+    batchWriteItemRequest.addRequestItemsEntry(tableName, generateWriteRequests(numOfItems));
+    return batchWriteItemRequest;
+  }
+
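+  /** Builds put-style write requests with sequential hash and range key values. */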
+  static List<WriteRequest> generateWriteRequests(int numOfItem) {
+    List<WriteRequest> writeRequests = new ArrayList<>();
+    for (int i = 1; i <= numOfItem; i++) {
+      WriteRequest writeRequest = new WriteRequest();
+      writeRequest.setPutRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i));
+      writeRequests.add(writeRequest);
+    }
+    return writeRequests;
+  }
+
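+  /** Builds a PutRequest with a string hash key and a numeric range key. */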
+  private static PutRequest generatePutRequest(String hashKeyData, String rangeKeyData) {
+    PutRequest putRequest = new PutRequest();
+    putRequest.addItemEntry(ATTR_NAME_1, new AttributeValue(hashKeyData));
+    putRequest.addItemEntry(ATTR_NAME_2, new AttributeValue().withN(rangeKeyData));
+    return putRequest;
+  }
+
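+  /** Creates the test table and sanity-checks the resulting table description. */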
+  static void createTestTable(String tableName) {
+    CreateTableResult res = createDynamoTable(tableName);
+
+    TableDescription tableDesc = res.getTableDescription();
+
+    Assert.assertEquals(tableName, tableDesc.getTableName());
+    Assert.assertTrue(tableDesc.getKeySchema().toString().contains(ATTR_NAME_1));
+    Assert.assertTrue(tableDesc.getKeySchema().toString().contains(ATTR_NAME_2));
+
+    Assert.assertEquals(
+        Long.valueOf(1000), tableDesc.getProvisionedThroughput().getReadCapacityUnits());
+    Assert.assertEquals(
+        Long.valueOf(1000), tableDesc.getProvisionedThroughput().getWriteCapacityUnits());
+    Assert.assertEquals("ACTIVE", tableDesc.getTableStatus());
+    Assert.assertEquals(
+        "arn:aws:dynamodb:us-east-1:000000000000:table/" + tableName, 
tableDesc.getTableArn());
+
+    ListTablesResult tables = dynamoDBClient.listTables();
+    Assert.assertEquals(1, tables.getTableNames().size());
+  }
+
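+  /** Issues the CreateTable call with a hash/range key schema and fixed provisioned throughput. */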
+  private static CreateTableResult createDynamoTable(String tableName) {
+
+    ImmutableList<AttributeDefinition> attributeDefinitions =
+        ImmutableList.of(
+            new AttributeDefinition(ATTR_NAME_1, ScalarAttributeType.S),
+            new AttributeDefinition(ATTR_NAME_2, ScalarAttributeType.N));
+
+    ImmutableList<KeySchemaElement> ks =
+        ImmutableList.of(
+            new KeySchemaElement(ATTR_NAME_1, KeyType.HASH),
+            new KeySchemaElement(ATTR_NAME_2, KeyType.RANGE));
+
+    ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(1000L, 1000L);
+    CreateTableRequest request =
+        new CreateTableRequest()
+            .withTableName(tableName)
+            .withAttributeDefinitions(attributeDefinitions)
+            .withKeySchema(ks)
+            .withProvisionedThroughput(provisionedThroughput);
+
+    return dynamoDBClient.createTable(request);
+  }
+}
