This is an automated email from the ASF dual-hosted git repository.

gabota pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a700c2  HADOOP-16520. Race condition in DDB table init and waiting 
threads.  (#1576). Contributed by Gabor Bota.
4a700c2 is described below

commit 4a700c20d553dc5336ee881719bcf189fc46bfbf
Author: Gabor Bota <gabor.b...@cloudera.com>
AuthorDate: Fri Oct 11 12:08:47 2019 +0200

    HADOOP-16520. Race condition in DDB table init and waiting threads.  
(#1576). Contributed by Gabor Bota.
    
    Fixes HADOOP-16349. DynamoDBMetadataStore.getVersionMarkerItem() to log at 
info/warn on retry
    
    Change-Id: Ia83e92b9039ccb780090c99c41b4f71ef7539d35
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   2 +-
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java      | 450 +------------
 .../s3guard/DynamoDBMetadataStoreTableManager.java | 693 +++++++++++++++++++++
 .../s3guard/PathMetadataDynamoDBTranslation.java   |   2 +-
 .../hadoop/fs/s3a/s3guard/S3GuardTableAccess.java  |   6 +-
 .../src/site/markdown/tools/hadoop-aws/s3guard.md  |  33 +-
 .../fs/s3a/s3guard/ITestDynamoDBMetadataStore.java | 173 +++--
 .../s3guard/ITestDynamoDBMetadataStoreScale.java   |   4 +-
 .../fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java   |   7 +-
 .../fs/s3a/s3guard/TestDynamoDBMiscOperations.java |   2 +-
 .../TestPathMetadataDynamoDBTranslation.java       |   6 +-
 11 files changed, 892 insertions(+), 486 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index fdbdf37..9f120b8 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -176,7 +176,7 @@ public final class Constants {
 
   // number of times we should retry errors
   public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
-  public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
+  public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
 
   // seconds until we give up trying to establish a connection to s3
   public static final String ESTABLISH_TIMEOUT =
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 92f04bf..044f3a5 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import javax.annotation.Nullable;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,9 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.SdkBaseException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
@@ -62,17 +58,9 @@ import 
com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
 import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
 import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
 import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
-import com.amazonaws.services.dynamodbv2.model.BillingMode;
-import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
-import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
 import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
-import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
-import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
-import com.amazonaws.services.dynamodbv2.model.Tag;
-import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
 import com.amazonaws.services.dynamodbv2.model.WriteRequest;
-import com.amazonaws.waiters.WaiterTimedOutException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -89,7 +77,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.s3a.AWSClientIOException;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
 import org.apache.hadoop.fs.s3a.Constants;
@@ -233,19 +220,14 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
       OPERATIONS_LOG_NAME);
 
   /** parent/child name to use in the version marker. */
-  public static final String VERSION_MARKER = "../VERSION";
+  public static final String VERSION_MARKER_ITEM_NAME = "../VERSION";
+
+  /** parent/child name to use in the version marker. */
+  public static final String VERSION_MARKER_TAG_NAME = "s3guard_version";
 
   /** Current version number. */
   public static final int VERSION = 100;
 
-  /** Error: version marker not found in table. */
-  public static final String E_NO_VERSION_MARKER
-      = "S3Guard table lacks version marker.";
-
-  /** Error: version mismatch. */
-  public static final String E_INCOMPATIBLE_VERSION
-      = "Database table is from an incompatible S3Guard version.";
-
   @VisibleForTesting
   static final String BILLING_MODE
       = "billing-mode";
@@ -305,14 +287,14 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
   private String region;
   private Table table;
   private String tableName;
-  private String tableArn;
   private Configuration conf;
   private String username;
 
   /**
    * This policy is mostly for batched writes, not for processing
    * exceptions in invoke() calls.
-   * It also has a role purpose in {@link #getVersionMarkerItem()};
+   * It also has a role purpose in
+   * {@link DynamoDBMetadataStoreTableManager#getVersionMarkerItem()};
    * look at that method for the details.
    */
   private RetryPolicy batchWriteRetryPolicy;
@@ -359,6 +341,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
    */
   private ITtlTimeProvider ttlTimeProvider;
 
+  private DynamoDBMetadataStoreTableManager tableHandler;
+
   /**
    * A utility function to create DynamoDB instance.
    * @param conf the file system configuration
@@ -437,7 +421,11 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
     );
 
     this.ttlTimeProvider = ttlTp;
-    initTable();
+
+    tableHandler = new DynamoDBMetadataStoreTableManager(
+        dynamoDB, tableName, region, amazonDynamoDB, conf, readOp,
+        batchWriteRetryPolicy);
+    this.table = tableHandler.initTable();
 
     instrumentation.initialized();
   }
@@ -494,6 +482,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
     conf = config;
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
+
     Preconditions.checkArgument(!StringUtils.isEmpty(tableName),
         "No DynamoDB table name configured");
     region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
@@ -518,7 +507,11 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
         "s3a-ddb-" + tableName);
     initDataAccessRetries(conf);
     this.ttlTimeProvider = ttlTp;
-    initTable();
+
+    tableHandler = new DynamoDBMetadataStoreTableManager(
+        dynamoDB, tableName, region, amazonDynamoDB, conf, readOp,
+        batchWriteRetryPolicy);
+    this.table = tableHandler.initTable();
   }
 
   /**
@@ -1438,32 +1431,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
   @Override
   @Retries.RetryTranslated
   public void destroy() throws IOException {
-    if (table == null) {
-      LOG.info("In destroy(): no table to delete");
-      return;
-    }
-    LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
-    Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
-    try {
-      invoker.retry("delete", null, true,
-          () -> table.delete());
-      table.waitForDelete();
-    } catch (IllegalArgumentException ex) {
-      throw new TableDeleteTimeoutException(tableName,
-          "Timeout waiting for the table " + tableArn + " to be deleted",
-          ex);
-    } catch (FileNotFoundException rnfe) {
-      LOG.info("FileNotFoundException while deleting DynamoDB table {} in "
-              + "region {}.  This may indicate that the table does not exist, "
-              + "or has been deleted by another concurrent thread or process.",
-          tableName, region);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
-          tableName, ie);
-      throw new InterruptedIOException("Table " + tableName
-          + " in region " + region + " has not been deleted");
-    }
+    tableHandler.destroy();
   }
 
   @Retries.RetryTranslated
@@ -1688,29 +1656,6 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
     }
   }
 
-  /**
-   *  Add tags from configuration to the existing DynamoDB table.
-   */
-  @Retries.OnceRaw
-  public void tagTable() {
-    List<Tag> tags = new ArrayList<>();
-    Map <String, String> tagProperties =
-        conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
-    for (Map.Entry<String, String> tagMapEntry : tagProperties.entrySet()) {
-      Tag tag = new Tag().withKey(tagMapEntry.getKey())
-          .withValue(tagMapEntry.getValue());
-      tags.add(tag);
-    }
-    if (tags.isEmpty()) {
-      return;
-    }
-
-    TagResourceRequest tagResourceRequest = new TagResourceRequest()
-        .withResourceArn(table.getDescription().getTableArn())
-        .withTags(tags);
-    getAmazonDynamoDB().tagResource(tagResourceRequest);
-  }
-
   @VisibleForTesting
   public AmazonDynamoDB getAmazonDynamoDB() {
     return amazonDynamoDB;
@@ -1721,7 +1666,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
     return getClass().getSimpleName() + '{'
         + "region=" + region
         + ", tableName=" + tableName
-        + ", tableArn=" + tableArn
+        + ", tableArn=" + tableHandler.getTableArn()
         + '}';
   }
 
@@ -1735,276 +1680,21 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
   @Override
   public List<RoleModel.Statement> listAWSPolicyRules(
       final Set<AccessLevel> access) {
-    Preconditions.checkState(tableArn != null, "TableARN not known");
+    Preconditions.checkState(tableHandler.getTableArn() != null,
+        "TableARN not known");
     if (access.isEmpty()) {
       return Collections.emptyList();
     }
     RoleModel.Statement stat;
     if (access.contains(AccessLevel.ADMIN)) {
-      stat = allowAllDynamoDBOperations(tableArn);
+      stat = allowAllDynamoDBOperations(tableHandler.getTableArn());
     } else {
-      stat = allowS3GuardClientOperations(tableArn);
+      stat = allowS3GuardClientOperations(tableHandler.getTableArn());
     }
     return Lists.newArrayList(stat);
   }
 
   /**
-   * Create a table if it does not exist and wait for it to become active.
-   *
-   * If a table with the intended name already exists, then it uses that table.
-   * Otherwise, it will automatically create the table if the config
-   * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
-   * enabled. The DynamoDB table creation API is asynchronous.  This method 
wait
-   * for the table to become active after sending the creation request, so
-   * overall, this method is synchronous, and the table is guaranteed to exist
-   * after this method returns successfully.
-   *
-   * The wait for a table becoming active is Retry+Translated; it can fail
-   * while a table is not yet ready.
-   *
-   * @throws IOException if table does not exist and auto-creation is disabled;
-   * or table is being deleted, or any other I/O exception occurred.
-   */
-  @VisibleForTesting
-  @Retries.OnceRaw
-  void initTable() throws IOException {
-    table = dynamoDB.getTable(tableName);
-    try {
-      try {
-        LOG.debug("Binding to table {}", tableName);
-        TableDescription description = table.describe();
-        LOG.debug("Table state: {}", description);
-        tableArn = description.getTableArn();
-        final String status = description.getTableStatus();
-        switch (status) {
-        case "CREATING":
-          LOG.debug("Table {} in region {} is being created/updated. This may"
-                  + " indicate that the table is being operated by another "
-                  + "concurrent thread or process. Waiting for active...",
-              tableName, region);
-          waitForTableActive(table);
-          break;
-        case "DELETING":
-          throw new FileNotFoundException("DynamoDB table "
-              + "'" + tableName + "' is being "
-              + "deleted in region " + region);
-        case "UPDATING":
-          // table being updated; it can still be used.
-          LOG.debug("Table is being updated.");
-          break;
-        case "ACTIVE":
-          break;
-        default:
-          throw new IOException("Unknown DynamoDB table status " + status
-              + ": tableName='" + tableName + "', region=" + region);
-        }
-
-        final Item versionMarker = getVersionMarkerItem();
-        verifyVersionCompatibility(tableName, versionMarker);
-        Long created = extractCreationTimeFromMarker(versionMarker);
-        LOG.debug("Using existing DynamoDB table {} in region {} created {}",
-            tableName, region, (created != null) ? new Date(created) : null);
-      } catch (ResourceNotFoundException rnfe) {
-        if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
-          long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
-              S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
-          long writeCapacity = conf.getLong(
-              S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
-              S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
-          ProvisionedThroughput capacity;
-          if (readCapacity > 0 && writeCapacity > 0) {
-            capacity = new ProvisionedThroughput(
-                readCapacity,
-                writeCapacity);
-          } else {
-            // at least one capacity value is <= 0
-            // verify they are both exactly zero
-            Preconditions.checkArgument(
-                readCapacity == 0 && writeCapacity == 0,
-                "S3Guard table read capacity %d and and write capacity %d"
-                    + " are inconsistent", readCapacity, writeCapacity);
-            // and set the capacity to null for per-request billing.
-            capacity = null;
-          }
-
-          createTable(capacity);
-        } else {
-          throw (FileNotFoundException)new FileNotFoundException(
-              "DynamoDB table '" + tableName + "' does not "
-              + "exist in region " + region + "; auto-creation is turned off")
-              .initCause(rnfe);
-        }
-      }
-
-    } catch (AmazonClientException e) {
-      throw translateException("initTable", tableName, e);
-    }
-  }
-
-  /**
-   * Get the version mark item in the existing DynamoDB table.
-   *
-   * As the version marker item may be created by another concurrent thread or
-   * process, we sleep and retry a limited number times if the lookup returns
-   * with a null value.
-   * DDB throttling is always retried.
-   */
-  @VisibleForTesting
-  @Retries.RetryTranslated
-  Item getVersionMarkerItem() throws IOException {
-    final PrimaryKey versionMarkerKey =
-        createVersionMarkerPrimaryKey(VERSION_MARKER);
-    int retryCount = 0;
-    // look for a version marker, with usual throttling/failure retries.
-    Item versionMarker = queryVersionMarker(versionMarkerKey);
-    while (versionMarker == null) {
-      // The marker was null.
-      // Two possibilities
-      // 1. This isn't a S3Guard table.
-      // 2. This is a S3Guard table in construction; another thread/process
-      //    is about to write/actively writing the version marker.
-      // So that state #2 is handled, batchWriteRetryPolicy is used to manage
-      // retries.
-      // This will mean that if the cause is actually #1, failure will not
-      // be immediate. As this will ultimately result in a failure to
-      // init S3Guard and the S3A FS, this isn't going to be a performance
-      // bottleneck -simply a slightly slower failure report than would 
otherwise
-      // be seen.
-      // "if your settings are broken, performance is not your main issue"
-      try {
-        RetryPolicy.RetryAction action = 
batchWriteRetryPolicy.shouldRetry(null,
-            retryCount, 0, true);
-        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-          break;
-        } else {
-          LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
-          Thread.sleep(action.delayMillis);
-        }
-      } catch (Exception e) {
-        throw new IOException("initTable: Unexpected exception " + e, e);
-      }
-      retryCount++;
-      versionMarker = queryVersionMarker(versionMarkerKey);
-    }
-    return versionMarker;
-  }
-
-  /**
-   * Issue the query to get the version marker, with throttling for overloaded
-   * DDB tables.
-   * @param versionMarkerKey key to look up
-   * @return the marker
-   * @throws IOException failure
-   */
-  @Retries.RetryTranslated
-  private Item queryVersionMarker(final PrimaryKey versionMarkerKey)
-      throws IOException {
-    return readOp.retry("getVersionMarkerItem",
-        VERSION_MARKER, true,
-        () -> table.getItem(versionMarkerKey));
-  }
-
-  /**
-   * Verify that a table version is compatible with this S3Guard client.
-   * @param tableName name of the table (for error messages)
-   * @param versionMarker the version marker retrieved from the table
-   * @throws IOException on any incompatibility
-   */
-  @VisibleForTesting
-  static void verifyVersionCompatibility(String tableName,
-      Item versionMarker) throws IOException {
-    if (versionMarker == null) {
-      LOG.warn("Table {} contains no version marker", tableName);
-      throw new IOException(E_NO_VERSION_MARKER
-      + " Table: " + tableName);
-    } else {
-      final int version = extractVersionFromMarker(versionMarker);
-      if (VERSION != version) {
-        // version mismatch. Unless/until there is support for
-        // upgrading versions, treat this as an incompatible change
-        // and fail.
-        throw new IOException(E_INCOMPATIBLE_VERSION
-            + " Table "+  tableName
-            + " Expected version " + VERSION + " actual " + version);
-      }
-    }
-  }
-
-  /**
-   * Wait for table being active.
-   * @param t table to block on.
-   * @throws IOException IO problems
-   * @throws InterruptedIOException if the wait was interrupted
-   * @throws IllegalArgumentException if an exception was raised in the waiter
-   */
-  @Retries.RetryTranslated
-  private void waitForTableActive(Table t) throws IOException {
-    invoker.retry("Waiting for active state of table " + tableName,
-        null,
-        true,
-        () -> {
-          try {
-            t.waitForActive();
-          } catch (IllegalArgumentException ex) {
-            throw translateTableWaitFailure(tableName, ex);
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for table {} in region {}"
-                    + " active",
-                tableName, region, e);
-            Thread.currentThread().interrupt();
-            throw (InterruptedIOException)
-                new InterruptedIOException("DynamoDB table '"
-                    + tableName + "' is not active yet in region " + region)
-                    .initCause(e);
-          }
-        });
-  }
-
-  /**
-   * Create a table, wait for it to become active, then add the version
-   * marker.
-   * Creating an setting up the table isn't wrapped by any retry operations;
-   * the wait for a table to become available is RetryTranslated.
-   * @param capacity capacity to provision. If null: create a per-request
-   * table.
-   * @throws IOException on any failure.
-   * @throws InterruptedIOException if the wait was interrupted
-   */
-  @Retries.OnceRaw
-  private void createTable(ProvisionedThroughput capacity) throws IOException {
-    try {
-      String mode;
-      CreateTableRequest request = new CreateTableRequest()
-          .withTableName(tableName)
-          .withKeySchema(keySchema())
-          .withAttributeDefinitions(attributeDefinitions());
-      if (capacity != null) {
-        mode = String.format("with provisioned read capacity %d and"
-                + " write capacity %s",
-            capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits());
-        request.withProvisionedThroughput(capacity);
-      } else {
-        mode = "with pay-per-request billing";
-        request.withBillingMode(BillingMode.PAY_PER_REQUEST);
-      }
-      LOG.info("Creating non-existent DynamoDB table {} in region {} {}",
-          tableName, region, mode);
-      table = dynamoDB.createTable(request);
-      LOG.debug("Awaiting table becoming active");
-    } catch (ResourceInUseException e) {
-      LOG.warn("ResourceInUseException while creating DynamoDB table {} "
-              + "in region {}.  This may indicate that the table was "
-              + "created by another concurrent thread or process.",
-          tableName, region);
-    }
-    waitForTableActive(table);
-    final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
-        System.currentTimeMillis());
-    putItem(marker);
-    tagTable();
-  }
-
-  /**
    * PUT a single item to the table.
    * @param item item to put
    * @return the outcome.
@@ -2015,47 +1705,6 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
     return table.putItem(item);
   }
 
-  /**
-   * Provision the table with given read and write capacity units.
-   * Call will fail if the table is busy, or the new values match the current
-   * ones.
-   * <p>
-   * Until the AWS SDK lets us switch a table to on-demand, an attempt to
-   * set the I/O capacity to zero will fail.
-   * @param readCapacity read units: must be greater than zero
-   * @param writeCapacity write units: must be greater than zero
-   * @throws IOException on a failure
-   */
-  @Retries.RetryTranslated
-  void provisionTable(Long readCapacity, Long writeCapacity)
-      throws IOException {
-
-    if (readCapacity == 0 || writeCapacity == 0) {
-      // table is pay on demand
-      throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY);
-    }
-    final ProvisionedThroughput toProvision = new ProvisionedThroughput()
-        .withReadCapacityUnits(readCapacity)
-        .withWriteCapacityUnits(writeCapacity);
-    invoker.retry("ProvisionTable", tableName, true,
-        () -> {
-          final ProvisionedThroughputDescription p =
-              table.updateTable(toProvision).getProvisionedThroughput();
-          LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
-                  + "writeCapacityUnits={}",
-              tableName, region, p.getReadCapacityUnits(),
-              p.getWriteCapacityUnits());
-        });
-  }
-
-  @Retries.RetryTranslated
-  @VisibleForTesting
-  void provisionTableBlocking(Long readCapacity, Long writeCapacity)
-      throws IOException {
-    provisionTable(readCapacity, writeCapacity);
-    waitForTableActive(table);
-  }
-
   @VisibleForTesting
   Table getTable() {
     return table;
@@ -2175,7 +1824,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
           currentRead, currentWrite);
       LOG.info("Changing capacity of table to read: {}, write: {}",
           newRead, newWrite);
-      provisionTableBlocking(newRead, newWrite);
+      tableHandler.provisionTableBlocking(newRead, newWrite);
     } else {
       LOG.info("Table capacity unchanged at read: {}, write: {}",
           newRead, newWrite);
@@ -2375,48 +2024,6 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
   }
 
   /**
-   * Take an {@code IllegalArgumentException} raised by a DDB operation
-   * and if it contains an inner SDK exception, unwrap it.
-   * @param ex exception.
-   * @return the inner AWS exception or null.
-   */
-  public static SdkBaseException extractInnerException(
-      IllegalArgumentException ex) {
-    if (ex.getCause() instanceof  SdkBaseException) {
-      return (SdkBaseException) ex.getCause();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Handle a table wait failure by extracting any inner cause and
-   * converting it, or, if unconvertable by wrapping
-   * the IllegalArgumentException in an IOE.
-   *
-   * @param name name of the table
-   * @param e exception
-   * @return an IOE to raise.
-   */
-  @VisibleForTesting
-  static IOException translateTableWaitFailure(
-      final String name, IllegalArgumentException e) {
-    final SdkBaseException ex = extractInnerException(e);
-    if (ex != null) {
-      if (ex instanceof WaiterTimedOutException) {
-        // a timeout waiting for state change: extract the
-        // message from the outer exception, but translate
-        // the inner one for the throttle policy.
-        return new AWSClientIOException(e.getMessage(), ex);
-      } else {
-        return translateException(e.getMessage(), name, ex);
-      }
-    } else {
-      return new IOException(e);
-    }
-  }
-
-  /**
    * Log a PUT into the operations log at debug level.
    * @param state optional ancestor state.
    * @param items items which have been PUT
@@ -2691,4 +2298,9 @@ public class DynamoDBMetadataStore implements 
MetadataStore,
       return stateStr;
     }
   }
+
+  protected DynamoDBMetadataStoreTableManager getTableHandler() {
+    Preconditions.checkNotNull(tableHandler, "Not initialized");
+    return tableHandler;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java
new file mode 100644
index 0000000..d9f297c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.amazonaws.services.dynamodbv2.model.ScanResult;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+import com.amazonaws.services.dynamodbv2.model.Tag;
+import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
+import com.amazonaws.waiters.WaiterTimedOutException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.AWSClientIOException;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+import static java.lang.String.valueOf;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG;
+import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_TAG_NAME;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker;
+import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+
+/**
+ * Managing dynamo tables for S3Guard dynamodb based metadatastore.
+ * Factored out from DynamoDBMetadataStore.
+ */
+public class DynamoDBMetadataStoreTableManager {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DynamoDBMetadataStoreTableManager.class);
+
+  /** Error: version marker not found in table but the table is not empty. */
+  public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY
+      = "S3Guard table lacks version marker, and it is not empty.";
+
+  /** Error: version mismatch. */
+  public static final String E_INCOMPATIBLE_TAG_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
TAG.";
+
+  /** Error: version mismatch. */
+  public static final String E_INCOMPATIBLE_ITEM_VERSION
+      = "Database table is from an incompatible S3Guard version based on table 
ITEM.";
+
+  /** Invoker for IO. Until configured properly, use try-once. */
+  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.NO_OP
+  );
+
+  final private AmazonDynamoDB amazonDynamoDB;
+  final private DynamoDB dynamoDB;
+  final private String tableName;
+  final private String region;
+  final private Configuration conf;
+  final private Invoker readOp;
+  final private RetryPolicy batchWriteRetryPolicy;
+
+  private Table table;
+  private String tableArn;
+
+  public DynamoDBMetadataStoreTableManager(DynamoDB dynamoDB,
+      String tableName,
+      String region,
+      AmazonDynamoDB amazonDynamoDB,
+      Configuration conf,
+      Invoker readOp,
+      RetryPolicy batchWriteCapacityExceededEvents) {
+    this.dynamoDB = dynamoDB;
+    this.amazonDynamoDB = amazonDynamoDB;
+    this.tableName = tableName;
+    this.region = region;
+    this.conf = conf;
+    this.readOp = readOp;
+    this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents;
+  }
+
+  /**
+   * Create a table if it does not exist and wait for it to become active.
+   *
+   * If a table with the intended name already exists, then it uses that table.
+   * Otherwise, it will automatically create the table if the config
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
+   * enabled. The DynamoDB table creation API is asynchronous.  This method 
wait
+   * for the table to become active after sending the creation request, so
+   * overall, this method is synchronous, and the table is guaranteed to exist
+   * after this method returns successfully.
+   *
+   * The wait for a table becoming active is Retry+Translated; it can fail
+   * while a table is not yet ready.
+   *
+   * @throws IOException if table does not exist and auto-creation is disabled;
+   * or table is being deleted, or any other I/O exception occurred.
+   */
+  @VisibleForTesting
+  @Retries.RetryTranslated
+  Table initTable() throws IOException {
+    table = dynamoDB.getTable(tableName);
+    try {
+      try {
+        LOG.debug("Binding to table {}", tableName);
+        TableDescription description = table.describe();
+        LOG.debug("Table state: {}", description);
+        tableArn = description.getTableArn();
+        final String status = description.getTableStatus();
+        switch (status) {
+        case "CREATING":
+          LOG.debug("Table {} in region {} is being created/updated. This may"
+                  + " indicate that the table is being operated by another "
+                  + "concurrent thread or process. Waiting for active...",
+              tableName, region);
+          waitForTableActive(table);
+          break;
+        case "DELETING":
+          throw new FileNotFoundException("DynamoDB table "
+              + "'" + tableName + "' is being "
+              + "deleted in region " + region);
+        case "UPDATING":
+          // table being updated; it can still be used.
+          LOG.debug("Table is being updated.");
+          break;
+        case "ACTIVE":
+          break;
+        default:
+          throw new IOException("Unknown DynamoDB table status " + status
+              + ": tableName='" + tableName + "', region=" + region);
+        }
+
+        verifyVersionCompatibility();
+        final Item versionMarker = getVersionMarkerItem();
+        Long created = extractCreationTimeFromMarker(versionMarker);
+        LOG.debug("Using existing DynamoDB table {} in region {} created {}",
+            tableName, region, (created != null) ? new Date(created) : null);
+      } catch (ResourceNotFoundException rnfe) {
+        if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
+          long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT);
+          long writeCapacity = conf.getLong(
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+              S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT);
+          ProvisionedThroughput capacity;
+          if (readCapacity > 0 && writeCapacity > 0) {
+            capacity = new ProvisionedThroughput(
+                readCapacity,
+                writeCapacity);
+          } else {
+            // at least one capacity value is <= 0
+            // verify they are both exactly zero
+            Preconditions.checkArgument(
+                readCapacity == 0 && writeCapacity == 0,
+                "S3Guard table read capacity %d and and write capacity %d"
+                    + " are inconsistent", readCapacity, writeCapacity);
+            // and set the capacity to null for per-request billing.
+            capacity = null;
+          }
+
+          createTable(capacity);
+        } else {
+          throw (FileNotFoundException) new FileNotFoundException(
+              "DynamoDB table '" + tableName + "' does not "
+                  + "exist in region " + region +
+                  "; auto-creation is turned off")
+              .initCause(rnfe);
+        }
+      }
+
+    } catch (AmazonClientException e) {
+      throw translateException("initTable", tableName, e);
+    }
+
+    return table;
+  }
+
+  protected void tagTableWithVersionMarker() {
+    try {
+      TagResourceRequest tagResourceRequest = new TagResourceRequest()
+          .withResourceArn(table.getDescription().getTableArn())
+          .withTags(newVersionMarkerTag());
+      amazonDynamoDB.tagResource(tagResourceRequest);
+    } catch (AmazonDynamoDBException e) {
+      LOG.warn("Exception during tagging table: {}", e.getMessage());
+    }
+  }
+
+  protected static Item getVersionMarkerFromTags(Table table,
+      AmazonDynamoDB addb) {
+    List<Tag> tags = null;
+    try {
+      final TableDescription description = table.describe();
+      ListTagsOfResourceRequest listTagsOfResourceRequest =
+          new ListTagsOfResourceRequest()
+              .withResourceArn(description.getTableArn());
+      tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags();
+    } catch (ResourceNotFoundException e) {
+      LOG.error("Table: {} not found.", table.getTableName());
+      throw e;
+    } catch (AmazonDynamoDBException e) {
+      LOG.warn("Exception while getting tags from the dynamo table: {}",
+          e.getMessage());
+    }
+
+    if (tags == null) {
+      return null;
+    }
+
+    final Optional<Tag> first = tags.stream()
+        .filter(tag -> 
tag.getKey().equals(VERSION_MARKER_TAG_NAME)).findFirst();
+    if (first.isPresent()) {
+      final Tag vmTag = first.get();
+      return createVersionMarker(
+          vmTag.getKey(), Integer.parseInt(vmTag.getValue()), 0
+      );
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Create a table, wait for it to become active, then add the version
+   * marker.
+   * Creating an setting up the table isn't wrapped by any retry operations;
+   * the wait for a table to become available is RetryTranslated.
+   * The tags are added to the table during creation, not after creation.
+   * We can assume that tagging and creating the table is a single atomic
+   * operation.
+   *
+   * @param capacity capacity to provision. If null: create a per-request
+   * table.
+   * @throws IOException on any failure.
+   * @throws InterruptedIOException if the wait was interrupted
+   */
+  @Retries.OnceMixed
+  private void createTable(ProvisionedThroughput capacity) throws IOException {
+    try {
+      String mode;
+      CreateTableRequest request = new CreateTableRequest()
+          .withTableName(tableName)
+          .withKeySchema(keySchema())
+          .withAttributeDefinitions(attributeDefinitions())
+          .withTags(getTableTagsFromConfig());
+      if (capacity != null) {
+        mode = String.format("with provisioned read capacity %d and"
+                + " write capacity %s",
+            capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits());
+        request.withProvisionedThroughput(capacity);
+      } else {
+        mode = "with pay-per-request billing";
+        request.withBillingMode(BillingMode.PAY_PER_REQUEST);
+      }
+      LOG.info("Creating non-existent DynamoDB table {} in region {} {}",
+          tableName, region, mode);
+      table = dynamoDB.createTable(request);
+      LOG.debug("Awaiting table becoming active");
+    } catch (ResourceInUseException e) {
+      LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+              + "in region {}.  This may indicate that the table was "
+              + "created by another concurrent thread or process.",
+          tableName, region);
+    }
+    waitForTableActive(table);
+    putVersionMarkerItemToTable();
+  }
+
+  /**
+   *  Return tags from configuration and the version marker for adding to
+   *  dynamo table during creation.
+   */
+  @Retries.OnceRaw
+  public List<Tag> getTableTagsFromConfig() {
+    List<Tag> tags = new ArrayList<>();
+
+    // from configuration
+    Map<String, String> tagProperties =
+        conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG);
+    for (Map.Entry<String, String> tagMapEntry : tagProperties.entrySet()) {
+      Tag tag = new Tag().withKey(tagMapEntry.getKey())
+          .withValue(tagMapEntry.getValue());
+      tags.add(tag);
+    }
+    // add the version marker
+    tags.add(newVersionMarkerTag());
+    return tags;
+  }
+
+  /**
+   * Create a new version marker tag.
+   * @return a new version marker tag
+   */
+  private static Tag newVersionMarkerTag() {
+    return new 
Tag().withKey(VERSION_MARKER_TAG_NAME).withValue(valueOf(VERSION));
+  }
+
+  /**
+   * Verify that a table version is compatible with this S3Guard client.
+   *
+   * Checks for consistency between the version marker as the item and tag.
+   *
+   * <pre>
+   *   1. If the table lacks both version markers AND it's empty,
+   *      both markers will be added.
+   *      If the table is not empty the check throws IOException
+   *   2. If there's no version marker ITEM, the compatibility with the TAG
+   *      will be checked, and the version marker ITEM will be added if the
+   *      TAG version is compatible.
+   *      If the TAG version is not compatible, the check throws OException
+   *   3. If there's no version marker TAG, the compatibility with the ITEM
+   *      version marker will be checked, and the version marker ITEM will be
+   *      added if the ITEM version is compatible.
+   *      If the ITEM version is not compatible, the check throws IOException
+   *   4. If the TAG and ITEM versions are both present then both will be 
checked
+   *      for compatibility. If the ITEM or TAG version marker is not 
compatible,
+   *      the check throws IOException
+   * </pre>
+   *
+   * @throws IOException on any incompatibility
+   */
+  @VisibleForTesting
+  protected void verifyVersionCompatibility() throws IOException {
+    final Item versionMarkerItem = getVersionMarkerItem();
+    final Item versionMarkerFromTag =
+        getVersionMarkerFromTags(table, amazonDynamoDB);
+
+    LOG.debug("versionMarkerItem: {};  versionMarkerFromTag: {}",
+        versionMarkerItem, versionMarkerFromTag);
+
+    if (versionMarkerItem == null && versionMarkerFromTag == null) {
+      if (!isEmptyTable(tableName, amazonDynamoDB)) {
+        LOG.error("Table is not empty but missing the version maker. 
Failing.");
+        throw new IOException(E_NO_VERSION_MARKER_AND_NOT_EMPTY
+            + " Table: " + tableName);
+      }
+
+      LOG.info("Table {} contains no version marker item or tag. " +
+              "The table is empty, so the version marker will be added " +
+              "as TAG and ITEM.", tableName);
+
+      tagTableWithVersionMarker();
+      putVersionMarkerItemToTable();
+    }
+
+    if (versionMarkerItem == null && versionMarkerFromTag != null) {
+      final int tagVersionMarker =
+          extractVersionFromMarker(versionMarkerFromTag);
+      throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
+          E_INCOMPATIBLE_TAG_VERSION);
+
+      LOG.info("Table {} contains no version marker ITEM but contains " +
+              "compatible version marker TAG. Restoring the version marker " +
+              "item from tag.", tableName);
+
+      putVersionMarkerItemToTable();
+    }
+
+    if (versionMarkerItem != null && versionMarkerFromTag == null) {
+      final int itemVersionMarker =
+          extractVersionFromMarker(versionMarkerItem);
+      throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
+          E_INCOMPATIBLE_ITEM_VERSION);
+
+      LOG.info("Table {} contains no version marker TAG but contains " +
+          "compatible version marker ITEM. Restoring the version marker " +
+          "item from item.", tableName);
+
+      tagTableWithVersionMarker();
+    }
+
+    if (versionMarkerItem != null && versionMarkerFromTag != null) {
+      final int tagVersionMarker =
+          extractVersionFromMarker(versionMarkerFromTag);
+      final int itemVersionMarker =
+          extractVersionFromMarker(versionMarkerItem);
+
+      throwExceptionOnVersionMismatch(tagVersionMarker, tableName,
+          E_INCOMPATIBLE_TAG_VERSION);
+      throwExceptionOnVersionMismatch(itemVersionMarker, tableName,
+          E_INCOMPATIBLE_ITEM_VERSION);
+
+      LOG.debug("Table {} contains correct version marker TAG and ITEM.",
+          tableName);
+    }
+  }
+
+  private static boolean isEmptyTable(String tableName, AmazonDynamoDB aadb) {
+    final ScanRequest req = new ScanRequest().withTableName(
+        tableName).withLimit(1);
+    final ScanResult result = aadb.scan(req);
+    return result.getCount() == 0;
+  }
+
+  private static void throwExceptionOnVersionMismatch(int actual,
+      String tableName,
+      String exMsg) throws IOException {
+
+    if (VERSION != actual) {
+      throw new IOException(exMsg + " Table " + tableName
+          + " Expected version: " + VERSION + " actual tag version: " +
+          actual);
+    }
+  }
+
+  /**
+   * Add version marker to the dynamo table.
+   */
+  @Retries.OnceRaw
+  private void putVersionMarkerItemToTable() {
+    final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION,
+        System.currentTimeMillis());
+    putItem(marker);
+  }
+
+  /**
+   * Wait for table being active.
+   * @param t table to block on.
+   * @throws IOException IO problems
+   * @throws InterruptedIOException if the wait was interrupted
+   * @throws IllegalArgumentException if an exception was raised in the waiter
+   */
+  @Retries.RetryTranslated
+  private void waitForTableActive(Table t) throws IOException {
+    invoker.retry("Waiting for active state of table " + tableName,
+        null,
+        true,
+        () -> {
+          try {
+            t.waitForActive();
+          } catch (IllegalArgumentException ex) {
+            throw translateTableWaitFailure(tableName, ex);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for table {} in region {}"
+                    + " active",
+                tableName, region, e);
+            Thread.currentThread().interrupt();
+            throw (InterruptedIOException)
+                new InterruptedIOException("DynamoDB table '"
+                    + tableName + "' is not active yet in region " + region)
+                    .initCause(e);
+          }
+        });
+  }
+
+  /**
+   * Handle a table wait failure by extracting any inner cause and
+   * converting it, or, if unconvertable by wrapping
+   * the IllegalArgumentException in an IOE.
+   *
+   * @param name name of the table
+   * @param e exception
+   * @return an IOE to raise.
+   */
+  @VisibleForTesting
+  static IOException translateTableWaitFailure(
+      final String name, IllegalArgumentException e) {
+    final SdkBaseException ex = extractInnerException(e);
+    if (ex != null) {
+      if (ex instanceof WaiterTimedOutException) {
+        // a timeout waiting for state change: extract the
+        // message from the outer exception, but translate
+        // the inner one for the throttle policy.
+        return new AWSClientIOException(e.getMessage(), ex);
+      } else {
+        return translateException(e.getMessage(), name, ex);
+      }
+    } else {
+      return new IOException(e);
+    }
+  }
+
+  /**
+   * Take an {@code IllegalArgumentException} raised by a DDB operation
+   * and if it contains an inner SDK exception, unwrap it.
+   * @param ex exception.
+   * @return the inner AWS exception or null.
+   */
+  public static SdkBaseException extractInnerException(
+      IllegalArgumentException ex) {
+    if (ex.getCause() instanceof SdkBaseException) {
+      return (SdkBaseException) ex.getCause();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the version mark item in the existing DynamoDB table.
+   *
+   * As the version marker item may be created by another concurrent thread or
+   * process, we sleep and retry a limited number times if the lookup returns
+   * with a null value.
+   * DDB throttling is always retried.
+   */
+  @VisibleForTesting
+  @Retries.RetryTranslated
+  protected Item getVersionMarkerItem() throws IOException {
+    final PrimaryKey versionMarkerKey =
+        createVersionMarkerPrimaryKey(VERSION_MARKER_ITEM_NAME);
+    int retryCount = 0;
+    // look for a version marker, with usual throttling/failure retries.
+    Item versionMarker = queryVersionMarker(versionMarkerKey);
+    while (versionMarker == null) {
+      // The marker was null.
+      // Two possibilities
+      // 1. This isn't a S3Guard table.
+      // 2. This is a S3Guard table in construction; another thread/process
+      //    is about to write/actively writing the version marker.
+      // So that state #2 is handled, batchWriteRetryPolicy is used to manage
+      // retries.
+      // This will mean that if the cause is actually #1, failure will not
+      // be immediate. As this will ultimately result in a failure to
+      // init S3Guard and the S3A FS, this isn't going to be a performance
+      // bottleneck -simply a slightly slower failure report than would 
otherwise
+      // be seen.
+      // "if your settings are broken, performance is not your main issue"
+      try {
+        RetryPolicy.RetryAction action = 
batchWriteRetryPolicy.shouldRetry(null,
+            retryCount, 0, true);
+        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+          break;
+        } else {
+          LOG.warn("No version marker found in the DynamoDB table: {}. " +
+              "Sleeping {} ms before next retry", tableName, 
action.delayMillis);
+          Thread.sleep(action.delayMillis);
+        }
+      } catch (Exception e) {
+        throw new IOException("initTable: Unexpected exception " + e, e);
+      }
+      retryCount++;
+      versionMarker = queryVersionMarker(versionMarkerKey);
+    }
+    return versionMarker;
+  }
+
+  /**
+   * Issue the query to get the version marker, with throttling for overloaded
+   * DDB tables.
+   * @param versionMarkerKey key to look up
+   * @return the marker
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  private Item queryVersionMarker(final PrimaryKey versionMarkerKey)
+      throws IOException {
+    return readOp.retry("getVersionMarkerItem",
+        VERSION_MARKER_ITEM_NAME, true,
+        () -> table.getItem(versionMarkerKey));
+  }
+
+  /**
+   * PUT a single item to the table.
+   * @param item item to put
+   * @return the outcome.
+   */
+  @Retries.OnceRaw
+  private PutItemOutcome putItem(Item item) {
+    LOG.debug("Putting item {}", item);
+    return table.putItem(item);
+  }
+
+  /**
+   * Provision the table with given read and write capacity units.
+   * Call will fail if the table is busy, or the new values match the current
+   * ones.
+   * <p>
+   * Until the AWS SDK lets us switch a table to on-demand, an attempt to
+   * set the I/O capacity to zero will fail.
+   * @param readCapacity read units: must be greater than zero
+   * @param writeCapacity write units: must be greater than zero
+   * @throws IOException on a failure
+   */
+  @Retries.RetryTranslated
+  void provisionTable(Long readCapacity, Long writeCapacity)
+      throws IOException {
+
+    if (readCapacity == 0 || writeCapacity == 0) {
+      // table is pay on demand
+      throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY);
+    }
+    final ProvisionedThroughput toProvision = new ProvisionedThroughput()
+        .withReadCapacityUnits(readCapacity)
+        .withWriteCapacityUnits(writeCapacity);
+    invoker.retry("ProvisionTable", tableName, true,
+        () -> {
+          final ProvisionedThroughputDescription p =
+              table.updateTable(toProvision).getProvisionedThroughput();
+          LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+                  + "writeCapacityUnits={}",
+              tableName, region, p.getReadCapacityUnits(),
+              p.getWriteCapacityUnits());
+        });
+  }
+
+  @Retries.RetryTranslated
+  public void destroy() throws IOException {
+    if (table == null) {
+      LOG.info("In destroy(): no table to delete");
+      return;
+    }
+    LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
+    Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
+    try {
+      invoker.retry("delete", null, true,
+          () -> table.delete());
+      table.waitForDelete();
+    } catch (IllegalArgumentException ex) {
+      throw new TableDeleteTimeoutException(tableName,
+          "Timeout waiting for the table " + getTableArn()
+              + " to be deleted", ex);
+    } catch (FileNotFoundException rnfe) {
+      LOG.info("FileNotFoundException while deleting DynamoDB table {} in "
+              + "region {}.  This may indicate that the table does not exist, "
+              + "or has been deleted by another concurrent thread or process.",
+          tableName, region);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
+          tableName, ie);
+      throw new InterruptedIOException("Table " + tableName
+          + " in region " + region + " has not been deleted");
+    }
+  }
+
+  @Retries.RetryTranslated
+  @VisibleForTesting
+  void provisionTableBlocking(Long readCapacity, Long writeCapacity)
+      throws IOException {
+    provisionTable(readCapacity, writeCapacity);
+    waitForTableActive(table);
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public String getTableArn() {
+    return tableArn;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
index 348dfbf..be12088 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -247,7 +247,7 @@ public final class PathMetadataDynamoDBTranslation {
    * @return the creation time, or null
    * @throws IOException if the item is not a version marker
    */
-  static Long extractCreationTimeFromMarker(Item marker) throws IOException {
+  static Long extractCreationTimeFromMarker(Item marker) {
     if (marker.hasAttribute(TABLE_CREATED)) {
       return marker.getLong(TABLE_CREATED);
     } else {
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
index 5592faa..19ef90e 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
 import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.CHILD;
 import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT;
 import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION;
@@ -199,8 +199,8 @@ class S3GuardTableAccess {
     public DDBPathMetadata next() {
       Item item = it.next();
       Pair<String, String> key = primaryKey(item);
-      if (VERSION_MARKER.equals(key.getLeft()) &&
-          VERSION_MARKER.equals(key.getRight())) {
+      if (VERSION_MARKER_ITEM_NAME.equals(key.getLeft()) &&
+          VERSION_MARKER_ITEM_NAME.equals(key.getRight())) {
         // a version marker is found, return the special type
         return new VersionMarker(item);
       } else {
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index cb0fd13..571f223 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -951,9 +951,36 @@ logged.
 
 ### Versioning
 
-S3Guard tables are created with a version marker, an entry with the primary
-key and child entry of `../VERSION`; the use of a relative path guarantees
-that it will not be resolved.
+S3Guard tables are created with a version marker entry and table tag.
+The entry is created with the primary key and child entry of `../VERSION`; 
+the use of a relative path guarantees that it will not be resolved.
+Table tag key is named `s3guard_version`.
+
+When the table is initialized by S3Guard, the table will be tagged during the 
+creating and the version marker entry will be created in the table.
+If the table lacks the version marker entry or tag, S3Guard will try to create
+it according to the following rules:
+
+1. If the table lacks both version markers AND it's empty, both markers will 
be added. 
+If the table is not empty the check throws IOException
+1. If there's no version marker ITEM, the compatibility with the TAG
+will be checked, and the version marker ITEM will be added if the
+TAG version is compatible.
+If the TAG version is not compatible, the check throws OException
+1. If there's no version marker TAG, the compatibility with the ITEM
+version marker will be checked, and the version marker ITEM will be
+added if the ITEM version is compatible.
+If the ITEM version is not compatible, the check throws IOException
+1. If the TAG and ITEM versions are both present then both will be checked
+for compatibility. If the ITEM or TAG version marker is not compatible,
+the check throws IOException
+
+*Note*: If the user does not have sufficient rights to tag the table the 
+initialization of S3Guard will not fail, but there will be no version marker 
tag
+on the dynamo table and the following message will be logged on WARN level:
+```
+Exception during tagging table: {AmazonDynamoDBException exception message}
+```
 
 *Versioning policy*
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
index 78c9fea..e541683 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
@@ -41,6 +42,8 @@ import 
com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest;
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
 import com.amazonaws.services.dynamodbv2.model.Tag;
+import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
+import com.amazonaws.services.dynamodbv2.model.UntagResourceRequest;
 import com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
 
@@ -70,10 +73,15 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static java.lang.String.valueOf;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_ITEM_VERSION;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_TAG_VERSION;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_NO_VERSION_MARKER_AND_NOT_EMPTY;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.getVersionMarkerFromTags;
 import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
@@ -110,10 +118,11 @@ public class ITestDynamoDBMetadataStore extends 
MetadataStoreTestBase {
       LoggerFactory.getLogger(ITestDynamoDBMetadataStore.class);
   public static final PrimaryKey
       VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey(
-      DynamoDBMetadataStore.VERSION_MARKER);
+      DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME);
 
   private S3AFileSystem fileSystem;
   private S3AContract s3AContract;
+  private DynamoDBMetadataStoreTableManager tableHandler;
 
   private URI fsUri;
 
@@ -153,6 +162,7 @@ public class ITestDynamoDBMetadataStore extends 
MetadataStoreTestBase {
 
     try{
       super.setUp();
+      tableHandler = getDynamoMetadataStore().getTableHandler();
     } catch (FileNotFoundException e){
       LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be "
           + "deleted before trying again.");
@@ -613,32 +623,11 @@ public class ITestDynamoDBMetadataStore extends 
MetadataStoreTestBase {
     final String tableName = ddbms.getTable().getTableName();
     verifyTableInitialized(tableName, ddbms.getDynamoDB());
     // create existing table
-    ddbms.initTable();
+    tableHandler.initTable();
     verifyTableInitialized(tableName, ddbms.getDynamoDB());
   }
 
   /**
-   * Test the low level version check code.
-   */
-  @Test
-  public void testItemVersionCompatibility() throws Throwable {
-    verifyVersionCompatibility("table",
-        createVersionMarker(VERSION_MARKER, VERSION, 0));
-  }
-
-  /**
-   * Test that a version marker entry without the version number field
-   * is rejected as incompatible with a meaningful error message.
-   */
-  @Test
-  public void testItemLacksVersion() throws Throwable {
-    intercept(IOException.class, E_NOT_VERSION_MARKER,
-        () -> verifyVersionCompatibility("table",
-            new Item().withPrimaryKey(
-                createVersionMarkerPrimaryKey(VERSION_MARKER))));
-  }
-
-  /**
    * Test versioning handling.
    * <ol>
    *   <li>Create the table.</li>
@@ -663,43 +652,118 @@ public class ITestDynamoDBMetadataStore extends 
MetadataStoreTestBase {
     DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();
     try {
       ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
+      DynamoDBMetadataStoreTableManager localTableHandler =
+          ddbms.getTableHandler();
+
       Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB());
-      // check the tagging too
+      // check the tagging
       verifyStoreTags(createTagMap(), ddbms);
+      // check version compatibility
+      checkVerifyVersionMarkerCompatibility(localTableHandler, table);
 
-      Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY);
-      table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
-
-      // create existing table
-      intercept(IOException.class, E_NO_VERSION_MARKER,
-          () -> ddbms.initTable());
-
-      // now add a different version marker
-      Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0);
-      table.putItem(v200);
-
-      // create existing table
-      intercept(IOException.class, E_INCOMPATIBLE_VERSION,
-          () -> ddbms.initTable());
-
-      // create a marker with no version and expect failure
-      final Item invalidMarker = new Item().withPrimaryKey(
-          createVersionMarkerPrimaryKey(VERSION_MARKER))
-          .withLong(TABLE_CREATED, 0);
-      table.putItem(invalidMarker);
-
-      intercept(IOException.class, E_NOT_VERSION_MARKER,
-          () -> ddbms.initTable());
-
-      // reinstate the version marker
-      table.putItem(originalVersionMarker);
-      ddbms.initTable();
       conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries);
     } finally {
       destroy(ddbms);
     }
   }
 
+  private void checkVerifyVersionMarkerCompatibility(
+      DynamoDBMetadataStoreTableManager localTableHandler, Table table)
+      throws Exception {
+    final AmazonDynamoDB addb
+        = getDynamoMetadataStore().getAmazonDynamoDB();
+    Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY);
+
+    LOG.info("1/6: remove version marker and tags from table " +
+        "the table is empty, so it should be initialized after the call");
+    deleteVersionMarkerItem(table);
+    removeVersionMarkerTag(table, addb);
+    localTableHandler.initTable();
+
+    final int versionFromItem = extractVersionFromMarker(
+        localTableHandler.getVersionMarkerItem());
+    final int versionFromTag = extractVersionFromMarker(
+        getVersionMarkerFromTags(table, addb));
+    assertEquals("Table should be tagged with the right version.",
+        VERSION, versionFromTag);
+    assertEquals("Table should have the right version marker.",
+        VERSION, versionFromItem);
+
+    LOG.info("2/6: if the table is not empty and there's no version marker " +
+        "it should fail");
+    deleteVersionMarkerItem(table);
+    removeVersionMarkerTag(table, addb);
+    String testKey = "coffee";
+    Item wrongItem =
+        createVersionMarker(testKey, VERSION * 2, 0);
+    table.putItem(wrongItem);
+    intercept(IOException.class, E_NO_VERSION_MARKER_AND_NOT_EMPTY,
+        () -> localTableHandler.initTable());
+
+    LOG.info("3/6: table has only version marker item then it will be tagged");
+    table.putItem(originalVersionMarker);
+    localTableHandler.initTable();
+    final int versionFromTag2 = extractVersionFromMarker(
+        getVersionMarkerFromTags(table, addb));
+    assertEquals("Table should have the right version marker tag " +
+        "if there was a version item.", VERSION, versionFromTag2);
+
+    LOG.info("4/6: table has only version marker tag then the version marker " 
+
+        "item will be created.");
+    deleteVersionMarkerItem(table);
+    removeVersionMarkerTag(table, addb);
+    localTableHandler.tagTableWithVersionMarker();
+    localTableHandler.initTable();
+    final int versionFromItem2 = extractVersionFromMarker(
+        localTableHandler.getVersionMarkerItem());
+    assertEquals("Table should have the right version marker item " +
+        "if there was a version tag.", VERSION, versionFromItem2);
+
+    LOG.info("5/6: add a different marker tag to the table: init should fail");
+    deleteVersionMarkerItem(table);
+    removeVersionMarkerTag(table, addb);
+    Item v200 = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION * 2, 0);
+    table.putItem(v200);
+    intercept(IOException.class, E_INCOMPATIBLE_ITEM_VERSION,
+        () -> localTableHandler.initTable());
+
+    LOG.info("6/6: add a different marker item to the table: init should 
fail");
+    deleteVersionMarkerItem(table);
+    removeVersionMarkerTag(table, addb);
+    int wrongVersion = VERSION + 3;
+    tagTableWithCustomVersion(table, addb, wrongVersion);
+    intercept(IOException.class, E_INCOMPATIBLE_TAG_VERSION,
+        () -> localTableHandler.initTable());
+
+    // CLEANUP
+    table.putItem(originalVersionMarker);
+    localTableHandler.tagTableWithVersionMarker();
+    localTableHandler.initTable();
+  }
+
+  private void tagTableWithCustomVersion(Table table,
+      AmazonDynamoDB addb,
+      int wrongVersion) {
+    final Tag vmTag = new Tag().withKey(VERSION_MARKER_TAG_NAME)
+        .withValue(valueOf(wrongVersion));
+    TagResourceRequest tagResourceRequest = new TagResourceRequest()
+        .withResourceArn(table.getDescription().getTableArn())
+        .withTags(vmTag);
+    addb.tagResource(tagResourceRequest);
+  }
+
+  private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) {
+    addb.untagResource(new UntagResourceRequest()
+        .withResourceArn(table.describe().getTableArn())
+        .withTagKeys(VERSION_MARKER_TAG_NAME));
+  }
+
+  private void deleteVersionMarkerItem(Table table) {
+    table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
+    assertNull("Version marker should be null after deleting it " +
+            "from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY));
+  }
+
   /**
    * Test that initTable fails with IOException when table does not exist and
    * table auto-creation is disabled.
@@ -952,8 +1016,11 @@ public class ITestDynamoDBMetadataStore extends 
MetadataStoreTestBase {
     tags.forEach(t -> actual.put(t.getKey(), t.getValue()));
     Assertions.assertThat(actual)
         .describedAs("Tags from DDB table")
-        .containsExactlyEntriesOf(tagMap);
-    assertEquals(tagMap.size(), tags.size());
+        .containsAllEntriesOf(tagMap);
+
+    // The version marker is always there in the tags.
+    // We have a plus one in tags we expect.
+    assertEquals(tagMap.size() + 1, tags.size());
   }
 
   protected List<Tag> listTagsOfStore(final DynamoDBMetadataStore store) {
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index 53df60f..a0614d5 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -91,6 +91,7 @@ public class ITestDynamoDBMetadataStoreScale
   private static final long MAXIMUM_WRITE_CAPACITY = 15;
 
   private DynamoDBMetadataStore ddbms;
+  private DynamoDBMetadataStoreTableManager tableHandler;
 
   private DynamoDB ddb;
 
@@ -160,6 +161,7 @@ public class ITestDynamoDBMetadataStoreScale
     super.setup();
     ddbms = (DynamoDBMetadataStore) createMetadataStore();
     tableName = ddbms.getTableName();
+    tableHandler = ddbms.getTableHandler();
     assertNotNull("table has no name", tableName);
     ddb = ddbms.getDynamoDB();
     table = ddb.getTable(tableName);
@@ -325,7 +327,7 @@ public class ITestDynamoDBMetadataStoreScale
     execute("get",
         OPERATIONS_PER_THREAD * 2,
         expectThrottling(),
-        () -> ddbms.getVersionMarkerItem()
+        () -> tableHandler.getVersionMarkerItem()
     );
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
index 205eb65..358ab83 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
@@ -160,8 +160,13 @@ public class ITestS3GuardToolDynamoDB extends 
AbstractS3GuardToolTestBase {
       List<Tag> tags = 
ddbms.getAmazonDynamoDB().listTagsOfResource(listTagsOfResourceRequest).getTags();
 
       // assert
-      assertEquals(tagMap.size(), tags.size());
+      // table version is always there as a plus one tag.
+      assertEquals(tagMap.size() + 1, tags.size());
       for (Tag tag : tags) {
+        // skip the version marker tag
+        if (tag.getKey().equals(VERSION_MARKER_TAG_NAME)) {
+          continue;
+        }
         Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue());
       }
       // be sure to clean up - delete table
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java
index 578aed0..602a072 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.test.HadoopTestBase;
 import org.apache.hadoop.fs.Path;
 
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.translateTableWaitFailure;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java
index 70bf901..c882094 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java
@@ -51,7 +51,7 @@ import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
 
 import static 
org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
-import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER;
+import static 
org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME;
 import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION;
 import static org.mockito.Mockito.never;
 
@@ -272,14 +272,14 @@ public class TestPathMetadataDynamoDBTranslation extends 
Assert {
 
   @Test
   public void testVersionRoundTrip() throws Throwable {
-    final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0);
+    final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 
0);
     assertEquals("Extracted version from " + marker,
         VERSION, extractVersionFromMarker(marker));
   }
 
   @Test
   public void testVersionMarkerNotStatusIllegalPath() throws Throwable {
-    final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0);
+    final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 
0);
     assertNull("Path metadata fromfrom " + marker,
         itemToPathMetadata(marker, "alice"));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to