steveloughran commented on a change in pull request #1576: Hadoop 16520 
dynamodb ms version race refactor. 
URL: https://github.com/apache/hadoop/pull/1576#discussion_r330639780
 
 

 ##########
 File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java
 ##########
 @@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+import com.amazonaws.services.dynamodbv2.model.Tag;
+import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
+import com.amazonaws.waiters.WaiterTimedOutException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.AWSClientIOException;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+import static java.lang.String.valueOf;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
+import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+
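+/**
+ * Table handling for DynamoDB tables, factored out from
+ * DynamoDBMetadataStore: binding to or creating the table, and verifying
+ * or restoring the S3Guard version marker, which is stored both as a table
+ * item and as a table tag.
+ */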
+public class DynamoDBMetadataStoreTableHandler {
+  /** Logger for this class. */
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DynamoDBMetadataStoreTableHandler.class);
+
+  /** Error: version marker not found in table but the table is not empty. */
+  public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY
+      = "S3Guard table lacks version marker, and not empty.";
+
+  /** Error: the version in the table's version marker TAG does not match. */
+  public static final String E_INCOMPATIBLE_TAG_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
TAG.";
+
+  /** Error: the version in the table's version marker ITEM does not match. */
+  public static final String E_INCOMPATIBLE_ITEM_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
ITEM.";
+
+  /** Invoker for IO. Until configured properly, use try-once. */
+  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.NO_OP
+  );
+
+  /** Low-level client: used here for tagging, listing tags and scanning. */
+  final private AmazonDynamoDB amazonDynamoDB;
+  /** Document-API client: used here to get and to create the table. */
+  final private DynamoDB dynamoDB;
+  /** Name of the DynamoDB table to bind to or create. */
+  final private String tableName;
+  /** Region of the table; used in log and exception messages. */
+  final private String region;
+  /** Configuration: table auto-create flag, capacities and table tags. */
+  final private Configuration conf;
+  /** Invoker for read operations, as passed to the constructor. */
+  final private Invoker readOp;
+  /** Retry policy for batch writes, as passed to the constructor. */
+  final private RetryPolicy batchWriteRetryPolicy;
+
+  /** The table; set by initTable() and, on creation, by createTable(). */
+  private Table table;
+  /** ARN of the table, taken from its description in initTable(). */
+  private String tableArn;
+
+  /**
+   * Instantiate a table handler. Only the references are stored here;
+   * no remote calls are made until {@link #initTable()} is invoked.
+   *
+   * @param dynamoDB document-API client used to get or create the table
+   * @param tableName name of the table
+   * @param region region of the table, used in messages
+   * @param amazonDynamoDB low-level client used for tags and scans
+   * @param conf configuration
+   * @param readOp invoker for read operations
+   * @param batchWriteCapacityExceededEvents retry policy stored as the
+   * batch write retry policy
+   */
+  public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB,
+      String tableName,
+      String region,
+      AmazonDynamoDB amazonDynamoDB,
+      Configuration conf,
+      Invoker readOp,
+      RetryPolicy batchWriteCapacityExceededEvents) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.region = region;
+    this.dynamoDB = dynamoDB;
+    this.amazonDynamoDB = amazonDynamoDB;
+    this.readOp = readOp;
+    this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents;
+  }
+
+  /**
+   * Create a table if it does not exist and wait for it to become active.
+   *
+   * If a table with the intended name already exists, then it uses that table
+   * and verifies the compatibility of its version markers.
+   * Otherwise, it will automatically create the table if the config
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
+   * enabled. The DynamoDB table creation API is asynchronous.  This method
+   * waits for the table to become active after sending the creation request,
+   * so overall, this method is synchronous, and the table is guaranteed to
+   * exist after this method returns successfully.
+   *
+   * The wait for a table becoming active is Retry+Translated; it can fail
+   * while a table is not yet ready.
+   *
+   * @return the bound or newly created table.
+   * @throws FileNotFoundException if the table does not exist and
+   * auto-creation is disabled, or if the table is being deleted.
+   * @throws IllegalArgumentException if auto-creation is enabled but the
+   * configured read and write capacities are inconsistent.
+   * @throws IOException on version incompatibility, an unknown table status,
+   * or any other I/O problem.
+   */
+  @VisibleForTesting
+  @Retries.OnceRaw
+  Table initTable() throws IOException {
+    table = dynamoDB.getTable(tableName);
+    try {
+      try {
+        LOG.debug("Binding to table {}", tableName);
+        TableDescription description = table.describe();
+        LOG.debug("Table state: {}", description);
+        tableArn = description.getTableArn();
+        final String status = description.getTableStatus();
+        switch (status) {
+        case "CREATING":
+          LOG.debug("Table {} in region {} is being created/updated. This may"
+                  + " indicate that the table is being operated by another "
+                  + "concurrent thread or process. Waiting for active...",
+              tableName, region);
+          waitForTableActive(table);
+          break;
+        case "DELETING":
+          throw new FileNotFoundException("DynamoDB table "
+              + "'" + tableName + "' is being "
+              + "deleted in region " + region);
+        case "UPDATING":
+          // table being updated; it can still be used.
+          LOG.debug("Table is being updated.");
+          break;
+        case "ACTIVE":
+          break;
+        default:
+          throw new IOException("Unknown DynamoDB table status " + status
+              + ": tableName='" + tableName + "', region=" + region);
+        }
+
+        // existing table: check (and where possible restore) version markers
+        verifyVersionCompatibility();
+        final Item versionMarker = getVersionMarkerItem();
+        Long created = extractCreationTimeFromMarker(versionMarker);
+        LOG.debug("Using existing DynamoDB table {} in region {} created {}",
+            tableName, region, (created != null) ? new Date(created) : null);
+      } catch (ResourceNotFoundException rnfe) {
+        // no such table: create it only if auto-creation is enabled
+        if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
+          long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
+          long writeCapacity = conf.getLong(
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
+          ProvisionedThroughput capacity;
+          if (readCapacity > 0 && writeCapacity > 0) {
+            capacity = new ProvisionedThroughput(
+                readCapacity,
+                writeCapacity);
+          } else {
+            // at least one capacity value is <= 0;
+            // verify they are both exactly zero.
+            // Guava Preconditions only substitutes "%s" placeholders.
+            Preconditions.checkArgument(
+                readCapacity == 0 && writeCapacity == 0,
+                "S3Guard table read capacity %s and write capacity %s"
+                    + " are inconsistent", readCapacity, writeCapacity);
+            // and set the capacity to null for per-request billing.
+            capacity = null;
+          }
+
+          createTable(capacity);
+        } else {
+          throw (FileNotFoundException) new FileNotFoundException(
+              "DynamoDB table '" + tableName + "' does not "
+                  + "exist in region " + region +
+                  "; auto-creation is turned off")
+              .initCause(rnfe);
+        }
+      }
+
+    } catch (AmazonClientException e) {
+      throw translateException("initTable", tableName, e);
+    }
+
+    return table;
+  }
+
+  protected void tagTableWithVersionMarker() {
+    TagResourceRequest tagResourceRequest = new TagResourceRequest()
+        .withResourceArn(table.getDescription().getTableArn())
+        .withTags(newVersionMarkerTag());
+    amazonDynamoDB.tagResource(tagResourceRequest);
+  }
+
+  // TODO: add a test for this method.
+  /**
+   * Build a version marker item from the table's version marker tag.
+   *
+   * The table is described to obtain its ARN. The tags on that resource
+   * are then listed and searched for the {@code VERSION_MARKER} key.
+   *
+   * @param table table to inspect
+   * @param addb low-level client used to list the tags
+   * @return a version marker item built from the tag (timestamp 0), or null
+   * if the table has no version marker tag.
+   * @throws ResourceNotFoundException if the table does not exist.
+   * @throws NumberFormatException if the tag value is not an integer.
+   */
+  protected static Item getVersionMarkerFromTags(Table table,
+      AmazonDynamoDB addb) {
+    final List<Tag> tags;
+    try {
+      final TableDescription description = table.describe();
+      ListTagsOfResourceRequest listTagsOfResourceRequest =
+          new ListTagsOfResourceRequest()
+              .withResourceArn(description.getTableArn());
+      tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags();
+    } catch (ResourceNotFoundException e) {
+      // supply the table name for the "{}" placeholder
+      LOG.error("Table: {} not found.", table.getTableName());
+      throw e;
+    }
+
+    // constant-first comparison tolerates a tag with a null key
+    final Optional<Tag> versionTag = tags.stream()
+        .filter(tag -> VERSION_MARKER.equals(tag.getKey()))
+        .findFirst();
+    return versionTag
+        .map(vmTag -> createVersionMarker(
+            vmTag.getKey(), Integer.parseInt(vmTag.getValue()), 0))
+        .orElse(null);
+  }
+
+  /**
+   * Create a table, wait for it to become active, then add the version
+   * marker item.
+   * Creating and setting up the table isn't wrapped by any retry operations;
+   * the wait for a table to become available is RetryTranslated.
+   * The tags (those from the configuration plus the version marker tag) are
+   * added to the table during creation, not after creation, so we can
+   * assume that tagging and creating the table is a single atomic operation.
+   * If creation fails with {@link ResourceInUseException}, a warning is
+   * logged and the method still waits on the table and adds the marker item.
+   *
+   * @param capacity capacity to provision. If null: create a per-request
+   * table.
+   * @throws IOException on any failure.
+   * @throws InterruptedIOException if the wait was interrupted
+   */
+  @Retries.OnceRaw
+  private void createTable(ProvisionedThroughput capacity) throws IOException {
+    try {
+      String mode;
+      CreateTableRequest request = new CreateTableRequest()
+          .withTableName(tableName)
+          .withKeySchema(keySchema())
+          .withAttributeDefinitions(attributeDefinitions())
+          .withTags(getTableTagsFromConfig());
+      if (capacity != null) {
+        // both capacity units are Longs: format them the same way
+        mode = String.format("with provisioned read capacity %d and"
+                + " write capacity %d",
+            capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits());
+        request.withProvisionedThroughput(capacity);
+      } else {
+        mode = "with pay-per-request billing";
+        request.withBillingMode(BillingMode.PAY_PER_REQUEST);
+      }
+      LOG.info("Creating non-existent DynamoDB table {} in region {} {}",
+          tableName, region, mode);
+      table = dynamoDB.createTable(request);
+      LOG.debug("Awaiting table becoming active");
+    } catch (ResourceInUseException e) {
+      LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+              + "in region {}.  This may indicate that the table was "
+              + "created by another concurrent thread or process.",
+          tableName, region);
+    }
+    waitForTableActive(table);
+    putVersionMarkerToTable();
+  }
+
+  /**
+   * Build the tags to attach to the DynamoDB table when it is created.
+   * This is every configuration property returned by
+   * {@code conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG)}, followed by the
+   * version marker tag.
+   *
+   * @return a new mutable list of tags; the version marker tag is last.
+   */
+  @Retries.OnceRaw
+  public List<Tag> getTableTagsFromConfig() {
+    final Map<String, String> configuredTags =
+        conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
+    final List<Tag> result = new ArrayList<>(configuredTags.size() + 1);
+    configuredTags.forEach((key, value) ->
+        result.add(new Tag().withKey(key).withValue(value)));
+    // the version marker always goes at the end
+    result.add(newVersionMarkerTag());
+    return result;
+  }
+
+  /**
+   * Create a new version marker tag
+   * @return a new version marker tag
+   */
+  private static Tag newVersionMarkerTag() {
+    return new Tag().withKey(VERSION_MARKER).withValue(valueOf(VERSION));
+  }
+
+  /**
+   * Verify that a table version is compatible with this S3Guard client.
+   *
+   * Checks for consistency between the version marker as the item and tag.
+   *
+   * <pre>
+   *   1. If the table lacks both version markers AND it's empty,
+   *      both markers will be added.
+   *      If the table is not empty the check throws {@link IOException}
+   *   2. If there's no version marker ITEM, the compatibility with the TAG
+   *      will be checked, and the version marker ITEM will be added if the
+   *      TAG version is compatible.
+   *      If the TAG version is not compatible, the check throws
+   *      {@link IOException}
+   *   3. If there's no version marker TAG, the compatibility with the ITEM
+   *      version marker will be checked, and the version marker TAG will be
+   *      added if the ITEM version is compatible.
+   *      If the ITEM version is not compatible, the check throws
+   *      {@link IOException}
+   *   4. If the TAG and ITEM versions are both present then both will be
+   *      checked for compatibility. If the ITEM or TAG version marker is not
+   *      compatible, the check throws {@link IOException}
+   * </pre>
+   *
+   * @throws IOException on any incompatibility
+   */
+  @VisibleForTesting
+  protected void verifyVersionCompatibility() throws IOException {
+    final Item versionMarkerItem = getVersionMarkerItem();
+    final Item versionMarkerFromTag =
+        getVersionMarkerFromTags(table, amazonDynamoDB);
+
+    LOG.debug("versionMarkerItem: {};  versionMarkerFromTag: {}",
+        versionMarkerItem, versionMarkerFromTag);
+
+    if (versionMarkerItem == null && versionMarkerFromTag == null) {
+      // case 1: no markers at all; only an empty table may be (re)marked.
+      if (!isEmptyTable(tableName, amazonDynamoDB)) {
+        LOG.error("Table is not empty but missing the version marker. "
+            + "Failing.");
+        throw new IOException(E_NO_VERSION_MARKER_AND_NOT_EMPTY
+            + " Table: " + tableName);
+      }
+
+      LOG.info("Table {} contains no version marker item or tag. " +
+              "The table is empty, so the version marker will be added " +
+              "as TAG and ITEM.", tableName);
+
+      tagTableWithVersionMarker();
+      putVersionMarkerToTable();
+    } else if (versionMarkerItem == null) {
+      // case 2: only the TAG is present; restore the ITEM if compatible.
+      final int tagVersionMarker =
+          extractVersionFromMarker(versionMarkerFromTag);
+      throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
+          E_INCOMPATIBLE_TAG_VERSION);
+
+      LOG.info("Table {} contains no version marker ITEM but contains " +
+              "compatible version marker TAG. Restoring the version marker " +
+              "item from tag.", tableName);
+
+      putVersionMarkerToTable();
+    } else if (versionMarkerFromTag == null) {
+      // case 3: only the ITEM is present; restore the TAG if compatible.
+      final int itemVersionMarker =
+          extractVersionFromMarker(versionMarkerItem);
+      throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
+          E_INCOMPATIBLE_ITEM_VERSION);
+
+      LOG.info("Table {} contains no version marker TAG but contains " +
+          "compatible version marker ITEM. Restoring the version marker " +
+          "tag from item.", tableName);
+
+      tagTableWithVersionMarker();
+    } else {
+      // case 4: both present; both must match this client's version.
+      final int tagVersionMarker =
+          extractVersionFromMarker(versionMarkerFromTag);
+      final int itemVersionMarker =
+          extractVersionFromMarker(versionMarkerItem);
+
+      throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
+          E_INCOMPATIBLE_TAG_VERSION);
+      throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
+          E_INCOMPATIBLE_ITEM_VERSION);
+
+      LOG.info("Table {} contains correct version marker TAG and ITEM.",
+          tableName);
+    }
+  }
+
+  /**
+   * Check whether a table holds any items by scanning it with a limit of one.
+   *
+   * @param tableName table to scan
+   * @param aadb low-level client used for the scan
+   * @return true if the single-item scan returned no items.
+   */
+  private static boolean isEmptyTable(String tableName, AmazonDynamoDB aadb) {
+    final ScanResult firstPage = aadb.scan(
+        new ScanRequest().withTableName(tableName).withLimit(1));
+    return firstPage.getCount() == 0;
+  }
+
+  private static void throwExceptionOnVersionMismatch(int actual,
+      String tableName,
+      String exMsg) throws IOException {
+
+    if (VERSION != actual) {
+      throw new IOException(exMsg + " Table " + tableName
 
 Review comment:
   Or a PathIOException with the table as the Path? I'm never a fan of base IOEs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to