steveloughran commented on a change in pull request #1576: Hadoop 16520 
dynamodb ms version race refactor. 
URL: https://github.com/apache/hadoop/pull/1576#discussion_r330657047
 
 

 ##########
 File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java
 ##########
 @@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+import com.amazonaws.services.dynamodbv2.model.Tag;
+import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
+import com.amazonaws.waiters.WaiterTimedOutException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.AWSClientIOException;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+import static java.lang.String.valueOf;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
+import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+
+/**
+ * Table handling for DynamoDB tables, factored out from DynamoDBMetadataStore.
+ * Mainly covers binding to or creating the table and handling its version
+ * marker.
+ */
+public class DynamoDBMetadataStoreTableHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DynamoDBMetadataStoreTableHandler.class);
+
+  /** Error: version marker not found in table but the table is not empty. */
+  public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY
+      = "S3Guard table lacks version marker, and not empty.";
+
+  /** Error: version mismatch, detected from the table's version TAG. */
+  public static final String E_INCOMPATIBLE_TAG_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
TAG.";
+
+  /** Error: version mismatch, detected from the table's version marker ITEM. */
+  public static final String E_INCOMPATIBLE_ITEM_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
ITEM.";
+
+  /** Invoker for IO. Until configured properly, use try-once. */
+  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.NO_OP
+  );
+
+  /** Low-level DynamoDB client; used for the tag requests on the table. */
+  final private AmazonDynamoDB amazonDynamoDB;
+  /** DynamoDB document client; the table is looked up through it. */
+  final private DynamoDB dynamoDB;
+  /** Name of the table this handler works with. */
+  final private String tableName;
+  /** Region name; used in log and error messages. */
+  final private String region;
+  /** Configuration; read for the auto-create flag and table capacities. */
+  final private Configuration conf;
+  /** Invoker supplied by the creator of this handler for read operations. */
+  final private Invoker readOp;
+  /** Retry policy supplied by the creator of this handler. */
+  final private RetryPolicy batchWriteRetryPolicy;
+
+  /** The table; assigned when {@code initTable()} is called. */
+  private Table table;
+  /** ARN of the table; taken from the table description in initTable(). */
+  private String tableArn;
+
+  /**
+   * Build a table handler from already-constructed clients and settings.
+   * Only stores the references; nothing is looked up or created here.
+   *
+   * @param dynamoDB DynamoDB document client.
+   * @param tableName name of the table to work with.
+   * @param region region name, as used in log and error messages.
+   * @param amazonDynamoDB low-level DynamoDB client.
+   * @param conf configuration to read table options from.
+   * @param readOp invoker kept for read operations.
+   * @param batchWriteCapacityExceededEvents retry policy kept as the
+   * batch write retry policy.
+   */
+  public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB,
+      String tableName,
+      String region,
+      AmazonDynamoDB amazonDynamoDB,
+      Configuration conf,
+      Invoker readOp,
+      RetryPolicy batchWriteCapacityExceededEvents) {
+    // table identity
+    this.tableName = tableName;
+    this.region = region;
+    // clients
+    this.amazonDynamoDB = amazonDynamoDB;
+    this.dynamoDB = dynamoDB;
+    // configuration and retry handling
+    this.conf = conf;
+    this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents;
+    this.readOp = readOp;
+  }
+
+  /**
+   * Create a table if it does not exist and wait for it to become active.
+   *
+   * If a table with the intended name already exists, then it uses that table.
+   * Otherwise, it will automatically create the table if the config
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
+   * enabled. The DynamoDB table creation API is asynchronous.  This method
+   * waits
+   * for the table to become active after sending the creation request, so
+   * overall, this method is synchronous, and the table is guaranteed to exist
+   * after this method returns successfully.
+   *
+   * The wait for a table becoming active is Retry+Translated; it can fail
+   * while a table is not yet ready.
+   *
+   * @throws IOException if table does not exist and auto-creation is disabled;
+   * or table is being deleted, or any other I/O exception occurred.
+   */
+  @VisibleForTesting
+  @Retries.OnceRaw
+  Table initTable() throws IOException {
+    table = dynamoDB.getTable(tableName);
+    try {
+      try {
+        LOG.debug("Binding to table {}", tableName);
+        // A missing table surfaces as ResourceNotFoundException, which is
+        // handled by the create-if-enabled branch further down.
+        TableDescription description = table.describe();
+        LOG.debug("Table state: {}", description);
+        tableArn = description.getTableArn();
+        final String status = description.getTableStatus();
+        switch (status) {
+        case "CREATING":
+          LOG.debug("Table {} in region {} is being created/updated. This may"
+                  + " indicate that the table is being operated by another "
+                  + "concurrent thread or process. Waiting for active...",
+              tableName, region);
+          waitForTableActive(table);
+          break;
+        case "DELETING":
+          throw new FileNotFoundException("DynamoDB table "
+              + "'" + tableName + "' is being "
+              + "deleted in region " + region);
+        case "UPDATING":
+          // table being updated; it can still be used.
+          LOG.debug("Table is being updated.");
+          break;
+        case "ACTIVE":
+          break;
+        default:
+          throw new IOException("Unknown DynamoDB table status " + status
+              + ": tableName='" + tableName + "', region=" + region);
+        }
+
+        // The table exists and is usable: check its version before use.
+        verifyVersionCompatibility();
+        final Item versionMarker = getVersionMarkerItem();
+        Long created = extractCreationTimeFromMarker(versionMarker);
+        LOG.debug("Using existing DynamoDB table {} in region {} created {}",
+            tableName, region, (created != null) ? new Date(created) : null);
+      } catch (ResourceNotFoundException rnfe) {
+        if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
+          long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
+          long writeCapacity = conf.getLong(
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
+          ProvisionedThroughput capacity;
+          if (readCapacity > 0 && writeCapacity > 0) {
+            capacity = new ProvisionedThroughput(
+                readCapacity,
+                writeCapacity);
+          } else {
+            // at least one capacity value is <= 0
+            // verify they are both exactly zero
+            // Guava's Preconditions only substitutes %s placeholders, so
+            // %s (not %d) is needed for the values to appear in the message.
+            Preconditions.checkArgument(
+                readCapacity == 0 && writeCapacity == 0,
+                "S3Guard table read capacity %s and write capacity %s"
+                    + " are inconsistent", readCapacity, writeCapacity);
+            // and set the capacity to null for per-request billing.
+            capacity = null;
+          }
+
+          createTable(capacity);
+        } else {
+          // keep the SDK exception as the cause of the translated failure
+          throw (FileNotFoundException) new FileNotFoundException(
+              "DynamoDB table '" + tableName + "' does not "
+                  + "exist in region " + region +
+                  "; auto-creation is turned off")
+              .initCause(rnfe);
+        }
+      }
+
+    } catch (AmazonClientException e) {
+      // any other SDK failure is converted to an IOException
+      throw translateException("initTable", tableName, e);
+    }
+
+    return table;
+  }
+
+  protected void tagTableWithVersionMarker() {
+    TagResourceRequest tagResourceRequest = new TagResourceRequest()
+        .withResourceArn(table.getDescription().getTableArn())
+        .withTags(newVersionMarkerTag());
+    amazonDynamoDB.tagResource(tagResourceRequest);
+  }
+
+  // todo test
+  /**
+   * Look for the version marker among the tags of a table.
+   * The table is described to obtain its ARN, the tags of that ARN are
+   * listed, and the first tag whose key equals {@code VERSION_MARKER} is
+   * converted into a version marker item.
+   *
+   * @param table table to examine.
+   * @param addb low-level DynamoDB client used to list the tags.
+   * @return a version marker item built from the tag (its third argument,
+   * presumably the creation time, is always 0), or null if no such tag
+   * exists.
+   * @throws ResourceNotFoundException if the table is not found; the
+   * failure is logged and rethrown unchanged.
+   * @throws NumberFormatException if the tag value is not an integer.
+   */
+  protected static Item getVersionMarkerFromTags(Table table,
+      AmazonDynamoDB addb) {
+    final List<Tag> tags;
+    try {
+      final TableDescription description = table.describe();
+      ListTagsOfResourceRequest listTagsOfResourceRequest =
+          new ListTagsOfResourceRequest()
+              .withResourceArn(description.getTableArn());
+      tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags();
+    } catch (ResourceNotFoundException e) {
+      // supply the argument for the {} placeholder so the name is logged
+      LOG.error("Table: {} not found.", table.getTableName());
+      throw e;
+    }
+
+    final Optional<Tag> first = tags.stream()
+        .filter(tag -> tag.getKey().equals(VERSION_MARKER)).findFirst();
+    if (first.isPresent()) {
+      final Tag vmTag = first.get();
+      return createVersionMarker(
+          vmTag.getKey(), Integer.valueOf(vmTag.getValue()), 0
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Create a table, wait for it to become active, then add the version
+   * marker.
+   * Creating and setting up the table isn't wrapped by any retry operations;
+   * the wait for a table to become available is RetryTranslated.
+   * The tags are added to the table during creation, not after creation.
+   * We can assume that tagging and creating the table is a single atomic
+   * operation.
+   *
+   * @param capacity capacity to provision. If null: create a per-request
+   * table.
+   * @throws IOException on any failure.
+   * @throws InterruptedIOException if the wait was interrupted
+   */
+  @Retries.OnceRaw
 
 Review comment:
   OnceMixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to