http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
new file mode 100644
index 0000000..26b1901
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.TableState;
+
+/**
+ * This is a helper class used to manage table states.
+ * States are persisted in tableinfo and cached internally.
+ */
+@InterfaceAudience.Private
+public class TableStateManager {
+  private static final Log LOG = LogFactory.getLog(TableStateManager.class);
+  private final TableDescriptors descriptors;
+
+  private final Map<TableName, TableState.State> tableStates = 
Maps.newConcurrentMap();
+
+  public TableStateManager(MasterServices master) {
+    this.descriptors = master.getTableDescriptors();
+  }
+
+  public void start() throws IOException {
+    Map<String, TableDescriptor> all = descriptors.getAllDescriptors();
+    for (TableDescriptor table : all.values()) {
+      TableName tableName = table.getHTableDescriptor().getTableName();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding table state: " + tableName
+            + ": " + table.getTableState());
+      }
+      tableStates.put(tableName, table.getTableState());
+    }
+  }
+
+  /**
+   * Set the table state to the provided state.
+   * Caller should lock table on write.
+   * @param tableName table to change state for
+   * @param newState new state
+   * @throws IOException
+   */
+  public void setTableState(TableName tableName, TableState.State newState) 
throws IOException {
+    synchronized (tableStates) {
+      TableDescriptor descriptor = readDescriptor(tableName);
+      if (descriptor == null) {
+        throw new TableNotFoundException(tableName);
+      }
+      if (descriptor.getTableState() != newState) {
+        writeDescriptor(
+            new TableDescriptor(descriptor.getHTableDescriptor(), newState));
+      }
+    }
+  }
+
+  /**
+   * Set the table state to the provided state, but only if the table is in one of the specified states.
+   * Caller should lock table on write.
+   * @param tableName table to change state for
+   * @param newState new state
+   * @param states states to check against
+   * @throws IOException
+   */
+  public boolean setTableStateIfInStates(TableName tableName,
+                                         TableState.State newState,
+                                         TableState.State... states)
+          throws IOException {
+    synchronized (tableStates) {
+      TableDescriptor descriptor = readDescriptor(tableName);
+      if (descriptor == null) {
+        throw new TableNotFoundException(tableName);
+      }
+      if (TableState.isInStates(descriptor.getTableState(), states)) {
+        writeDescriptor(
+            new TableDescriptor(descriptor.getHTableDescriptor(), newState));
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+
+  /**
+   * Set the table state to the provided state, but only if the table is in none of the specified states.
+   * Caller should lock table on write.
+   * @param tableName table to change state for
+   * @param newState new state
+   * @param states states to check against
+   * @throws IOException
+   */
+  public boolean setTableStateIfNotInStates(TableName tableName,
+                                            TableState.State newState,
+                                            TableState.State... states)
+          throws IOException {
+    synchronized (tableStates) {
+      TableDescriptor descriptor = readDescriptor(tableName);
+      if (descriptor == null) {
+        throw new TableNotFoundException(tableName);
+      }
+      if (!TableState.isInStates(descriptor.getTableState(), states)) {
+        writeDescriptor(
+            new TableDescriptor(descriptor.getHTableDescriptor(), newState));
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  public boolean isTableState(TableName tableName, TableState.State... states) 
{
+    TableState.State tableState = null;
+    try {
+      tableState = getTableState(tableName);
+    } catch (IOException e) {
+      LOG.error("Unable to get table state, probably table not exists");
+      return false;
+    }
+    return tableState != null && TableState.isInStates(tableState, states);
+  }
+
+  public void setDeletedTable(TableName tableName) throws IOException {
+    TableState.State remove = tableStates.remove(tableName);
+    if (remove == null) {
+      LOG.warn("Moving table " + tableName + " state to deleted but was " +
+              "already deleted");
+    }
+  }
+
+  public boolean isTablePresent(TableName tableName) throws IOException {
+    return getTableState(tableName) != null;
+  }
+
+  /**
+   * Return all tables in given states.
+   *
+   * @param states filter by states
+   * @return tables in given states
+   * @throws IOException
+   */
+  public Set<TableName> getTablesInStates(TableState.State... states) throws 
IOException {
+    Set<TableName> rv = Sets.newHashSet();
+    for (Map.Entry<TableName, TableState.State> entry : 
tableStates.entrySet()) {
+      if (TableState.isInStates(entry.getValue(), states))
+        rv.add(entry.getKey());
+    }
+    return rv;
+  }
+
+  public TableState.State getTableState(TableName tableName) throws 
IOException {
+    TableState.State tableState = tableStates.get(tableName);
+    if (tableState == null) {
+      TableDescriptor descriptor = readDescriptor(tableName);
+      if (descriptor != null)
+        tableState = descriptor.getTableState();
+    }
+    return tableState;
+  }
+
+  /**
+   * Write descriptor in place, update cache of states.
+   * Write lock should be held by caller.
+   *
+   * @param descriptor what to write
+   */
+  private void writeDescriptor(TableDescriptor descriptor) throws IOException {
+    TableName tableName = descriptor.getHTableDescriptor().getTableName();
+    TableState.State state = descriptor.getTableState();
+    descriptors.add(descriptor);
+    LOG.debug("Table " + tableName + " written descriptor for state " + state);
+    tableStates.put(tableName, state);
+    LOG.debug("Table " + tableName + " updated state to " + state);
+  }
+
+  /**
+   * Read current descriptor for table, update cache of states.
+   *
+   * @param tableName table whose descriptor to read
+   * @return descriptor
+   * @throws IOException
+   */
+  private TableDescriptor readDescriptor(TableName tableName) throws 
IOException {
+    TableDescriptor descriptor = descriptors.getDescriptor(tableName);
+    if (descriptor == null)
+      tableStates.remove(tableName);
+    else
+      tableStates.put(tableName, descriptor.getTableState());
+    return descriptor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
index 3a86128..e584008 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
@@ -31,14 +31,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.ipc.RequestContext;
@@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -121,8 +122,6 @@ public class CreateTableHandler extends EventHandler {
       if 
(MetaTableAccessor.tableExists(this.server.getShortCircuitConnection(), 
tableName)) {
         throw new TableExistsException(tableName);
       }
-
-      checkAndSetEnablingTable(assignmentManager, tableName);
       success = true;
     } finally {
       if (!success) {
@@ -132,47 +131,6 @@ public class CreateTableHandler extends EventHandler {
     return this;
   }
 
-  static void checkAndSetEnablingTable(final AssignmentManager 
assignmentManager,
-      final TableName tableName) throws IOException {
-    // If we have multiple client threads trying to create the table at the
-    // same time, given the async nature of the operation, the table
-    // could be in a state where hbase:meta table hasn't been updated yet in
-    // the process() function.
-    // Use enabling state to tell if there is already a request for the same
-    // table in progress. This will introduce a new zookeeper call. Given
-    // createTable isn't a frequent operation, that should be ok.
-    // TODO: now that we have table locks, re-evaluate above -- table locks 
are not enough.
-    // We could have cleared the hbase.rootdir and not zk.  How can we detect 
this case?
-    // Having to clean zk AND hdfs is awkward.
-    try {
-      if 
(!assignmentManager.getTableStateManager().setTableStateIfNotInStates(tableName,
-        ZooKeeperProtos.Table.State.ENABLING,
-        ZooKeeperProtos.Table.State.ENABLING,
-        ZooKeeperProtos.Table.State.ENABLED)) {
-        throw new TableExistsException(tableName);
-      }
-    } catch (CoordinatedStateException e) {
-      throw new IOException("Unable to ensure that the table will be" +
-        " enabling because of a ZooKeeper issue", e);
-    }
-  }
-
-  static void removeEnablingTable(final AssignmentManager assignmentManager,
-      final TableName tableName) {
-    // Try deleting the enabling node in case of error
-    // If this does not happen then if the client tries to create the table
-    // again with the same Active master
-    // It will block the creation saying TableAlreadyExists.
-    try {
-      
assignmentManager.getTableStateManager().checkAndRemoveTableState(tableName,
-        ZooKeeperProtos.Table.State.ENABLING, false);
-    } catch (CoordinatedStateException e) {
-      // Keeper exception should not happen here
-      LOG.error("Got a keeper exception while removing the ENABLING table 
znode "
-          + tableName, e);
-    }
-  }
-
   @Override
   public String toString() {
     String name = "UnknownServerName";
@@ -218,9 +176,6 @@ public class CreateTableHandler extends EventHandler {
     releaseTableLock();
     LOG.info("Table, " + this.hTableDescriptor.getTableName() + ", creation " +
         (exception == null ? "successful" : "failed. " + exception));
-    if (exception != null) {
-      removeEnablingTable(this.assignmentManager, 
this.hTableDescriptor.getTableName());
-    }
   }
 
   /**
@@ -243,9 +198,12 @@ public class CreateTableHandler extends EventHandler {
     FileSystem fs = fileSystemManager.getFileSystem();
 
     // 1. Create Table Descriptor
+    // Use a copy of the descriptor; the table is first created in the ENABLING state.
+    TableDescriptor underConstruction = new TableDescriptor(
+        this.hTableDescriptor, TableState.State.ENABLING);
     Path tempTableDir = FSUtils.getTableDir(tempdir, tableName);
     new FSTableDescriptors(this.conf).createTableDescriptorForTableDirectory(
-      tempTableDir, this.hTableDescriptor, false);
+      tempTableDir, underConstruction, false);
     Path tableDir = FSUtils.getTableDir(fileSystemManager.getRootDir(), 
tableName);
 
     // 2. Create Regions
@@ -271,20 +229,15 @@ public class CreateTableHandler extends EventHandler {
       ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
     }
 
-    // 8. Set table enabled flag up in zk.
-    try {
-      assignmentManager.getTableStateManager().setTableState(tableName,
-        ZooKeeperProtos.Table.State.ENABLED);
-    } catch (CoordinatedStateException e) {
-      throw new IOException("Unable to ensure that " + tableName + " will be" +
-        " enabled because of a ZooKeeper issue", e);
-    }
+    // 6. Enable table
+    assignmentManager.getTableStateManager().setTableState(tableName,
+            TableState.State.ENABLED);
   }
 
   /**
    * Create any replicas for the regions (the default replicas that was
    * already created is passed to the method)
-   * @param hTableDescriptor
+   * @param hTableDescriptor descriptor to use
    * @param regions default replicas
    * @return the combined list of default and non-default replicas
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 730da73..58be728 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -58,7 +59,7 @@ public class DeleteTableHandler extends TableEventHandler {
   @Override
   protected void prepareWithTableLock() throws IOException {
     // The next call fails if no such table.
-    hTableDescriptor = getTableDescriptor();
+    hTableDescriptor = getTableDescriptor().getHTableDescriptor();
   }
 
   protected void waitRegionInTransition(final List<HRegionInfo> regions)
@@ -102,62 +103,66 @@ public class DeleteTableHandler extends TableEventHandler 
{
     // 1. Wait because of region in transition
     waitRegionInTransition(regions);
 
-    try {
       // 2. Remove table from hbase:meta and HDFS
-      removeTableData(regions);
-    } finally {
-      // 3. Update table descriptor cache
-      LOG.debug("Removing '" + tableName + "' descriptor.");
-      this.masterServices.getTableDescriptors().remove(tableName);
-
-      AssignmentManager am = this.masterServices.getAssignmentManager();
-
-      // 4. Clean up regions of the table in RegionStates.
-      LOG.debug("Removing '" + tableName + "' from region states.");
-      am.getRegionStates().tableDeleted(tableName);
-
-      // 5. If entry for this table in zk, and up in AssignmentManager, remove 
it.
-      LOG.debug("Marking '" + tableName + "' as deleted.");
-      am.getTableStateManager().setDeletedTable(tableName);
-    }
+    removeTableData(regions);
 
     if (cpHost != null) {
       cpHost.postDeleteTableHandler(this.tableName);
     }
   }
 
+  private void cleanupTableState() throws IOException {
+    // 3. Update table descriptor cache
+    LOG.debug("Removing '" + tableName + "' descriptor.");
+    this.masterServices.getTableDescriptors().remove(tableName);
+
+    AssignmentManager am = this.masterServices.getAssignmentManager();
+
+    // 4. Clean up regions of the table in RegionStates.
+    LOG.debug("Removing '" + tableName + "' from region states.");
+    am.getRegionStates().tableDeleted(tableName);
+
+    // 5. If there is an entry for this table in the table states, remove it.
+    LOG.debug("Marking '" + tableName + "' as deleted.");
+    am.getTableStateManager().setDeletedTable(tableName);
+  }
+
   /**
    * Removes the table from hbase:meta and archives the HDFS files.
    */
   protected void removeTableData(final List<HRegionInfo> regions)
       throws IOException, CoordinatedStateException {
-    // 1. Remove regions from META
-    LOG.debug("Deleting regions from META");
-    MetaTableAccessor.deleteRegions(this.server.getShortCircuitConnection(), 
regions);
-
-    // -----------------------------------------------------------------------
-    // NOTE: At this point we still have data on disk, but nothing in 
hbase:meta
-    //       if the rename below fails, hbck will report an inconsistency.
-    // -----------------------------------------------------------------------
-
-    // 2. Move the table in /hbase/.tmp
-    MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
-    Path tempTableDir = mfs.moveTableToTemp(tableName);
-
-    // 3. Archive regions from FS (temp directory)
-    FileSystem fs = mfs.getFileSystem();
-    for (HRegionInfo hri: regions) {
-      LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from 
FS");
-      HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
-          tempTableDir, HRegion.getRegionDir(tempTableDir, 
hri.getEncodedName()));
-    }
+    try {
+      // 1. Remove regions from META
+      LOG.debug("Deleting regions from META");
+      MetaTableAccessor.deleteRegions(this.server.getShortCircuitConnection(), 
regions);
+
+      // 
-----------------------------------------------------------------------
+      // NOTE: At this point we still have data on disk, but nothing in 
hbase:meta
+      //       if the rename below fails, hbck will report an inconsistency.
+      // 
-----------------------------------------------------------------------
+
+      // 2. Move the table in /hbase/.tmp
+      MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
+      Path tempTableDir = mfs.moveTableToTemp(tableName);
+
+      // 3. Archive regions from FS (temp directory)
+      FileSystem fs = mfs.getFileSystem();
+      for (HRegionInfo hri : regions) {
+        LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from 
FS");
+        HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
+            tempTableDir, HRegion.getRegionDir(tempTableDir, 
hri.getEncodedName()));
+      }
 
-    // 4. Delete table directory from FS (temp directory)
-    if (!fs.delete(tempTableDir, true)) {
-      LOG.error("Couldn't delete " + tempTableDir);
-    }
+      // 4. Delete table directory from FS (temp directory)
+      if (!fs.delete(tempTableDir, true)) {
+        LOG.error("Couldn't delete " + tempTableDir);
+      }
 
-    LOG.debug("Table '" + tableName + "' archived!");
+      LOG.debug("Table '" + tableName + "' archived!");
+    } finally {
+      cleanupTableState();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
index fb7aec8..07843a7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -39,11 +39,10 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.BulkAssigner;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.htrace.Trace;
 
 /**
@@ -91,16 +90,11 @@ public class DisableTableHandler extends EventHandler {
       // DISABLED or ENABLED.
       //TODO: reevaluate this since we have table locks now
       if (!skipTableStateCheck) {
-        try {
-          if 
(!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
-            this.tableName, ZooKeeperProtos.Table.State.DISABLING,
-            ZooKeeperProtos.Table.State.ENABLED)) {
-            LOG.info("Table " + tableName + " isn't enabled; skipping 
disable");
-            throw new TableNotEnabledException(this.tableName);
-          }
-        } catch (CoordinatedStateException e) {
-          throw new IOException("Unable to ensure that the table will be" +
-            " disabling because of a coordination engine issue", e);
+        if 
(!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
+          this.tableName, TableState.State.DISABLING,
+          TableState.State.ENABLED)) {
+          LOG.info("Table " + tableName + " isn't enabled; skipping disable");
+          throw new TableNotEnabledException(this.tableName);
         }
       }
       success = true;
@@ -138,8 +132,6 @@ public class DisableTableHandler extends EventHandler {
       }
     } catch (IOException e) {
       LOG.error("Error trying to disable table " + this.tableName, e);
-    } catch (CoordinatedStateException e) {
-      LOG.error("Error trying to disable table " + this.tableName, e);
     } finally {
       releaseTableLock();
     }
@@ -155,10 +147,10 @@ public class DisableTableHandler extends EventHandler {
     }
   }
 
-  private void handleDisableTable() throws IOException, 
CoordinatedStateException {
+  private void handleDisableTable() throws IOException {
     // Set table disabling flag up in zk.
     this.assignmentManager.getTableStateManager().setTableState(this.tableName,
-      ZooKeeperProtos.Table.State.DISABLING);
+      TableState.State.DISABLING);
     boolean done = false;
     while (true) {
       // Get list of online regions that are of this table.  Regions that are
@@ -187,7 +179,7 @@ public class DisableTableHandler extends EventHandler {
     }
     // Flip the table to disabled if success.
     if (done) 
this.assignmentManager.getTableStateManager().setTableState(this.tableName,
-      ZooKeeperProtos.Table.State.DISABLED);
+      TableState.State.DISABLED);
     LOG.info("Disabled table, " + this.tableName + ", is done=" + done);
   }
 
@@ -207,7 +199,7 @@ public class DisableTableHandler extends EventHandler {
       RegionStates regionStates = assignmentManager.getRegionStates();
       for (HRegionInfo region: regions) {
         if (regionStates.isRegionInTransition(region)
-            && !regionStates.isRegionInState(region, State.FAILED_CLOSE)) {
+            && !regionStates.isRegionInState(region, 
RegionState.State.FAILED_CLOSE)) {
           continue;
         }
         final HRegionInfo hri = region;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
index b8edc0b..5f31389 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -95,16 +94,8 @@ public class EnableTableHandler extends EventHandler {
         // retainAssignment is true only during recovery.  In normal case it 
is false
         if (!this.skipTableStateCheck) {
           throw new TableNotFoundException(tableName);
-        } 
-        try {
-          
this.assignmentManager.getTableStateManager().checkAndRemoveTableState(tableName,
-            ZooKeeperProtos.Table.State.ENABLING, true);
-          throw new TableNotFoundException(tableName);
-        } catch (CoordinatedStateException e) {
-          // TODO : Use HBCK to clear such nodes
-          LOG.warn("Failed to delete the ENABLING node for the table " + 
tableName
-              + ".  The table will remain unusable. Run HBCK to manually fix 
the problem.");
         }
+        
this.assignmentManager.getTableStateManager().setDeletedTable(tableName);
       }
 
       // There could be multiple client requests trying to disable or enable
@@ -112,16 +103,11 @@ public class EnableTableHandler extends EventHandler {
       // After that, no other requests can be accepted until the table reaches
       // DISABLED or ENABLED.
       if (!skipTableStateCheck) {
-        try {
-          if 
(!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
-              this.tableName, ZooKeeperProtos.Table.State.ENABLING,
-              ZooKeeperProtos.Table.State.DISABLED)) {
-            LOG.info("Table " + tableName + " isn't disabled; skipping 
enable");
-            throw new TableNotDisabledException(this.tableName);
-          }
-        } catch (CoordinatedStateException e) {
-          throw new IOException("Unable to ensure that the table will be" +
-            " enabling because of a coordination engine issue", e);
+        if 
(!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
+            this.tableName, TableState.State.ENABLING,
+            TableState.State.DISABLED)) {
+          LOG.info("Table " + tableName + " isn't disabled; skipping enable");
+          throw new TableNotDisabledException(this.tableName);
         }
       }
       success = true;
@@ -156,11 +142,7 @@ public class EnableTableHandler extends EventHandler {
       if (cpHost != null) {
         cpHost.postEnableTableHandler(this.tableName);
       }
-    } catch (IOException e) {
-      LOG.error("Error trying to enable the table " + this.tableName, e);
-    } catch (CoordinatedStateException e) {
-      LOG.error("Error trying to enable the table " + this.tableName, e);
-    } catch (InterruptedException e) {
+    } catch (IOException | InterruptedException e) {
       LOG.error("Error trying to enable the table " + this.tableName, e);
     } finally {
       releaseTableLock();
@@ -177,14 +159,13 @@ public class EnableTableHandler extends EventHandler {
     }
   }
 
-  private void handleEnableTable() throws IOException, 
CoordinatedStateException,
+  private void handleEnableTable() throws IOException,
       InterruptedException {
     // I could check table is disabling and if so, not enable but require
     // that user first finish disabling but that might be obnoxious.
 
-    // Set table enabling flag up in zk.
     this.assignmentManager.getTableStateManager().setTableState(this.tableName,
-      ZooKeeperProtos.Table.State.ENABLING);
+      TableState.State.ENABLING);
     boolean done = false;
     ServerManager serverManager = ((HMaster)this.server).getServerManager();
     // Get the regions of this table. We're done when all listed
@@ -236,7 +217,7 @@ public class EnableTableHandler extends EventHandler {
     if (done) {
       // Flip the table to enabled.
       this.assignmentManager.getTableStateManager().setTableState(
-        this.tableName, ZooKeeperProtos.Table.State.ENABLED);
+        this.tableName, TableState.State.ENABLED);
       LOG.info("Table '" + this.tableName
       + "' was successfully enabled. Status: done=" + done);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
index 591a1d8..d7c40bf 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -65,8 +67,9 @@ public class ModifyTableHandler extends TableEventHandler {
     // Check operation is possible on the table in its current state
     // Also checks whether the table exists
     if (masterServices.getAssignmentManager().getTableStateManager()
-        .isTableState(this.htd.getTableName(), 
ZooKeeperProtos.Table.State.ENABLED)
-        && this.htd.getRegionReplication() != 
getTableDescriptor().getRegionReplication()) {
+        .isTableState(this.htd.getTableName(), TableState.State.ENABLED)
+        && this.htd.getRegionReplication() != getTableDescriptor()
+        .getHTableDescriptor().getRegionReplication()) {
       throw new IOException("REGION_REPLICATION change is not supported for 
enabled tables");
     }
   }
@@ -79,11 +82,14 @@ public class ModifyTableHandler extends TableEventHandler {
       cpHost.preModifyTableHandler(this.tableName, this.htd);
     }
     // Update descriptor
-    HTableDescriptor oldHtd = getTableDescriptor();
-    this.masterServices.getTableDescriptors().add(this.htd);
-    deleteFamilyFromFS(hris, oldHtd.getFamiliesKeys());
-    removeReplicaColumnsIfNeeded(this.htd.getRegionReplication(), 
oldHtd.getRegionReplication(),
-        htd.getTableName());
+    HTableDescriptor oldDescriptor =
+        this.masterServices.getTableDescriptors().get(this.tableName);
+    this.masterServices.getTableDescriptors().add(htd);
+    deleteFamilyFromFS(hris, oldDescriptor.getFamiliesKeys());
+    removeReplicaColumnsIfNeeded(
+        this.htd.getRegionReplication(),
+        oldDescriptor.getRegionReplication(),
+        this.htd.getTableName());
     if (cpHost != null) {
       cpHost.postModifyTableHandler(this.tableName, this.htd);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index cae6142..6540491 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.AssignmentManager;
@@ -39,10 +40,8 @@ import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 
 /**
@@ -231,23 +230,23 @@ public class ServerShutdownHandler extends EventHandler {
                   continue;
                 }
                 LOG.info("Reassigning region with rs = " + rit);
-                regionStates.updateRegionState(hri, State.OFFLINE);
+                regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
               } else if (regionStates.isRegionInState(
-                  hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
-                regionStates.updateRegionState(hri, State.OFFLINE);
+                  hri, RegionState.State.SPLITTING_NEW, 
RegionState.State.MERGING_NEW)) {
+                regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
               }
               toAssignRegions.add(hri);
             } else if (rit != null) {
               if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
                   && am.getTableStateManager().isTableState(hri.getTable(),
-                  ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING) ||
+                  TableState.State.DISABLED, TableState.State.DISABLING) ||
                   am.getReplicasToClose().contains(hri)) {
                 // If the table was partially disabled and the RS went down, we should clear the RIT
                 // and remove the node for the region.
                 // The rit that we use may be stale in case the table was in DISABLING state
                 // but though we did assign we will not be clearing the znode in CLOSING state.
                 // Doing this will have no harm. See HBASE-5927
-                regionStates.updateRegionState(hri, State.OFFLINE);
+                regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
                 am.offlineDisabledRegion(hri);
               } else {
                 LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in 
transition "
@@ -323,7 +322,7 @@ public class ServerShutdownHandler extends EventHandler {
     }
     // If table is not disabled but the region is offlined,
     boolean disabled = 
assignmentManager.getTableStateManager().isTableState(hri.getTable(),
-      ZooKeeperProtos.Table.State.DISABLED);
+      TableState.State.DISABLED);
     if (disabled){
       LOG.info("The table " + hri.getTable()
           + " was disabled.  Hence not proceeding.");
@@ -336,7 +335,7 @@ public class ServerShutdownHandler extends EventHandler {
       return false;
     }
     boolean disabling = 
assignmentManager.getTableStateManager().isTableState(hri.getTable(),
-      ZooKeeperProtos.Table.State.DISABLING);
+      TableState.State.DISABLING);
     if (disabling) {
       LOG.info("The table " + hri.getTable()
           + " is disabled.  Hence not assigning region" + 
hri.getEncodedName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
index cd8fe9e..ee32a32 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -50,8 +51,8 @@ public class TableAddFamilyHandler extends TableEventHandler {
   @Override
   protected void prepareWithTableLock() throws IOException {
     super.prepareWithTableLock();
-    HTableDescriptor htd = getTableDescriptor();
-    if (htd.hasFamily(familyDesc.getName())) {
+    TableDescriptor htd = getTableDescriptor();
+    if (htd.getHTableDescriptor().hasFamily(familyDesc.getName())) {
       throw new InvalidFamilyOperationException("Family '" +
         familyDesc.getNameAsString() + "' already exists so cannot be added");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
index 330b9d8..b166be0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
@@ -50,7 +50,7 @@ public class TableDeleteFamilyHandler extends 
TableEventHandler {
   @Override
   protected void prepareWithTableLock() throws IOException {
     super.prepareWithTableLock();
-    HTableDescriptor htd = getTableDescriptor();
+    HTableDescriptor htd = getTableDescriptor().getHTableDescriptor();
     this.familyName = hasColumnFamily(htd, familyName);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
index 4f1c39d..8993840 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -40,12 +41,12 @@ import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.BulkReOpen;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.collect.Lists;
@@ -130,7 +131,7 @@ public abstract class TableEventHandler extends 
EventHandler {
       handleTableOperation(hris);
       if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
           getAssignmentManager().getTableStateManager().isTableState(
-          tableName, ZooKeeperProtos.Table.State.ENABLED)) {
+          tableName, TableState.State.ENABLED)) {
         if (reOpenAllRegions(hris)) {
           LOG.info("Completed table operation " + eventType + " on table " +
               tableName);
@@ -230,10 +231,10 @@ public abstract class TableEventHandler extends 
EventHandler {
    * @throws FileNotFoundException
    * @throws IOException
    */
-  public HTableDescriptor getTableDescriptor()
+  public TableDescriptor getTableDescriptor()
   throws FileNotFoundException, IOException {
-    HTableDescriptor htd =
-      this.masterServices.getTableDescriptors().get(tableName);
+    TableDescriptor htd =
+      this.masterServices.getTableDescriptors().getDescriptor(tableName);
     if (htd == null) {
       throw new IOException("HTableDescriptor missing for " + tableName);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
index d07d0aa..75ec79c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -49,7 +50,7 @@ public class TableModifyFamilyHandler extends 
TableEventHandler {
   @Override
   protected void prepareWithTableLock() throws IOException {
     super.prepareWithTableLock();
-    HTableDescriptor htd = getTableDescriptor();
+    HTableDescriptor htd = getTableDescriptor().getHTableDescriptor();
     hasColumnFamily(htd, familyDesc.getName());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java
index 086d1d5..6703a42 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TruncateTableHandler.java
@@ -28,15 +28,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@@ -93,54 +95,44 @@ public class TruncateTableHandler extends 
DeleteTableHandler {
 
     AssignmentManager assignmentManager = 
this.masterServices.getAssignmentManager();
 
-    // 1. Set table znode
-    CreateTableHandler.checkAndSetEnablingTable(assignmentManager, tableName);
-    try {
-      // 1. Create Table Descriptor
-      new FSTableDescriptors(server.getConfiguration())
-        .createTableDescriptorForTableDirectory(tempdir, 
this.hTableDescriptor, false);
-      Path tempTableDir = FSUtils.getTableDir(tempdir, this.tableName);
-      Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), this.tableName);
-
-      HRegionInfo[] newRegions;
-      if (this.preserveSplits) {
-        newRegions = regions.toArray(new HRegionInfo[regions.size()]);
-        LOG.info("Truncate will preserve " + newRegions.length + " regions");
-      } else {
-        newRegions = new HRegionInfo[1];
-        newRegions[0] = new HRegionInfo(this.tableName, null, null);
-        LOG.info("Truncate will not preserve the regions");
-      }
-
-      // 2. Create Regions
-      List<HRegionInfo> regionInfos = ModifyRegionUtils.createRegions(
-        masterServices.getConfiguration(), tempdir,
-        this.hTableDescriptor, newRegions, null);
-
-      // 3. Move Table temp directory to the hbase root location
-      if (!fs.rename(tempTableDir, tableDir)) {
-        throw new IOException("Unable to move table from temp=" + tempTableDir 
+
-          " to hbase root=" + tableDir);
-      }
-
-      // 4. Add regions to META
-      
MetaTableAccessor.addRegionsToMeta(masterServices.getShortCircuitConnection(),
-        regionInfos);
-
-      // 5. Trigger immediate assignment of the regions in round-robin fashion
-      ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
-
-      // 6. Set table enabled flag up in zk.
-      try {
-        assignmentManager.getTableStateManager().setTableState(tableName,
-          ZooKeeperProtos.Table.State.ENABLED);
-      } catch (CoordinatedStateException e) {
-        throw new IOException("Unable to ensure that " + tableName + " will 
be" +
-          " enabled because of a ZooKeeper issue", e);
-      }
-    } catch (IOException e) {
-      CreateTableHandler.removeEnablingTable(assignmentManager, tableName);
-      throw e;
+    // 1. Create Table Descriptor
+    TableDescriptor underConstruction = new TableDescriptor(
+        this.hTableDescriptor, TableState.State.ENABLING);
+    Path tempTableDir = FSUtils.getTableDir(tempdir, this.tableName);
+    new FSTableDescriptors(server.getConfiguration())
+      .createTableDescriptorForTableDirectory(tempTableDir, underConstruction, 
false);
+    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), this.tableName);
+
+    HRegionInfo[] newRegions;
+    if (this.preserveSplits) {
+      newRegions = regions.toArray(new HRegionInfo[regions.size()]);
+      LOG.info("Truncate will preserve " + newRegions.length + " regions");
+    } else {
+      newRegions = new HRegionInfo[1];
+      newRegions[0] = new HRegionInfo(this.tableName, null, null);
+      LOG.info("Truncate will not preserve the regions");
     }
+
+    // 2. Create Regions
+    List<HRegionInfo> regionInfos = ModifyRegionUtils.createRegions(
+      masterServices.getConfiguration(), tempdir,
+      this.hTableDescriptor, newRegions, null);
+
+    // 3. Move Table temp directory to the hbase root location
+    if (!fs.rename(tempTableDir, tableDir)) {
+      throw new IOException("Unable to move table from temp=" + tempTableDir +
+        " to hbase root=" + tableDir);
+    }
+
+    // 4. Add regions to META
+    
MetaTableAccessor.addRegionsToMeta(masterServices.getShortCircuitConnection(),
+      regionInfos);
+
+    // 5. Trigger immediate assignment of the regions in round-robin fashion
+    ModifyRegionUtils.assignRegions(assignmentManager, regionInfos);
+
+    // 6. Set table state to ENABLED.
+    assignmentManager.getTableStateManager().setTableState(tableName,
+      TableState.State.ENABLED);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index bfa5004..e5847aa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RequestContext;
@@ -566,14 +567,14 @@ public class SnapshotManager extends 
MasterProcedureManager implements Stoppable
     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
     AssignmentManager assignmentMgr = master.getAssignmentManager();
     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
-        ZooKeeperProtos.Table.State.ENABLED)) {
+        TableState.State.ENABLED)) {
       LOG.debug("Table enabled, starting distributed snapshot.");
       snapshotEnabledTable(snapshot);
       LOG.debug("Started snapshot: " + 
ClientSnapshotDescriptionUtils.toString(snapshot));
     }
     // For disabled table, snapshot is created by the master
     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
-        ZooKeeperProtos.Table.State.DISABLED)) {
+        TableState.State.DISABLED)) {
       LOG.debug("Table is disabled, running snapshot entirely on master.");
       snapshotDisabledTable(snapshot);
       LOG.debug("Started snapshot: " + 
ClientSnapshotDescriptionUtils.toString(snapshot));
@@ -705,8 +706,8 @@ public class SnapshotManager extends MasterProcedureManager 
implements Stoppable
 
     // Execute the restore/clone operation
     if (MetaTableAccessor.tableExists(master.getShortCircuitConnection(), 
tableName)) {
-      if (master.getAssignmentManager().getTableStateManager().isTableState(
-          TableName.valueOf(snapshot.getTable()), 
ZooKeeperProtos.Table.State.ENABLED)) {
+      if (master.getTableStateManager().isTableState(
+          TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
         throw new UnsupportedOperationException("Table '" +
             TableName.valueOf(snapshot.getTable()) + "' must be disabled in 
order to " +
             "perform a restore operation" +

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
index b11d74c..1649c4e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java
@@ -39,12 +39,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
@@ -374,7 +376,7 @@ public class NamespaceUpgrade implements Tool {
       HTableDescriptor newDesc = new HTableDescriptor(oldDesc);
       newDesc.setName(newTableName);
       new FSTableDescriptors(this.conf).createTableDescriptorForTableDirectory(
-        newTablePath, newDesc, true);
+        newTablePath, new TableDescriptor(newDesc, TableState.State.ENABLED), 
true);
     }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index 4417bd9..7405272 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -112,13 +113,14 @@ public class CompactionTool extends Configured implements 
Tool {
       if (isFamilyDir(fs, path)) {
         Path regionDir = path.getParent();
         Path tableDir = regionDir.getParent();
-        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
+        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
         HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, 
regionDir);
-        compactStoreFiles(tableDir, htd, hri, path.getName(), compactOnce, 
major);
+        compactStoreFiles(tableDir, htd.getHTableDescriptor(), hri,
+            path.getName(), compactOnce, major);
       } else if (isRegionDir(fs, path)) {
         Path tableDir = path.getParent();
-        HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
-        compactRegion(tableDir, htd, path, compactOnce, major);
+        TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
+        compactRegion(tableDir, htd.getHTableDescriptor(), path, compactOnce, 
major);
       } else if (isTableDir(fs, path)) {
         compactTable(path, compactOnce, major);
       } else {
@@ -129,9 +131,9 @@ public class CompactionTool extends Configured implements 
Tool {
 
     private void compactTable(final Path tableDir, final boolean compactOnce, 
final boolean major)
         throws IOException {
-      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
+      TableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, 
tableDir);
       for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
-        compactRegion(tableDir, htd, regionDir, compactOnce, major);
+        compactRegion(tableDir, htd.getHTableDescriptor(), regionDir, 
compactOnce, major);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 1766d08..cf5b126 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -46,6 +46,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -56,7 +59,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoordinatedStateException;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -66,7 +68,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -92,7 +94,6 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
@@ -112,10 +113,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
-
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region for
@@ -286,12 +283,13 @@ public class HLogSplitter {
         return true;
       }
       if(csm != null) {
-        try {
-          TableStateManager tsm = csm.getTableStateManager();
-          disablingOrDisabledTables = tsm.getTablesInStates(
-            ZooKeeperProtos.Table.State.DISABLED, 
ZooKeeperProtos.Table.State.DISABLING);
-        } catch (CoordinatedStateException e) {
-          throw new IOException("Can't get disabling/disabled tables", e);
+        HConnection scc = csm.getServer().getShortCircuitConnection();
+        TableName[] tables = scc.listTableNames();
+        for (TableName table : tables) {
+          if (scc.getTableState(table)
+              .inStates(TableState.State.DISABLED, 
TableState.State.DISABLING)) {
+            disablingOrDisabledTables.add(table);
+          }
         }
       }
       int numOpenedFilesBeforeReporting = 
conf.getInt("hbase.splitlog.report.openedfiles", 3);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 0038423..7a03427 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -89,6 +89,7 @@ public class WALCellCodec implements Codec {
    * Fully prepares the codec for use.
    * @param conf {@link Configuration} to read for the user-specified codec. 
If none is specified,
    *          uses a {@link WALCellCodec}.
+   * @param cellCodecClsName name of codec
    * @param compression compression the codec should use
    * @return a {@link WALCellCodec} ready for use.
    * @throws UnsupportedOperationException if the codec cannot be instantiated

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index 47c6ebf..d6d4f71 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import 
org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
@@ -259,7 +261,8 @@ public class SnapshotManifest {
   private void load() throws IOException {
     switch (getSnapshotFormat(desc)) {
       case SnapshotManifestV1.DESCRIPTOR_VERSION: {
-        this.htd = FSTableDescriptors.getTableDescriptorFromFs(fs, workingDir);
+        this.htd = FSTableDescriptors.getTableDescriptorFromFs(fs, workingDir)
+            .getHTableDescriptor();
         ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
         try {
           this.regionManifests =
@@ -353,7 +356,8 @@ public class SnapshotManifest {
       LOG.info("Using old Snapshot Format");
       // write a copy of descriptor to the snapshot directory
       new FSTableDescriptors(fs, rootDir)
-        .createTableDescriptorForTableDirectory(workingDir, htd, false);
+        .createTableDescriptorForTableDirectory(workingDir, new 
TableDescriptor(
+            htd, TableState.State.ENABLED), false);
     } else {
       LOG.debug("Convert to Single Snapshot Manifest");
       convertToV2SingleManifest();

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 09749d0..bc8fc7f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
@@ -38,7 +39,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -92,11 +95,11 @@ public class FSTableDescriptors implements TableDescriptors 
{
    * Data structure to hold modification time and table descriptor.
    */
   private static class TableDescriptorAndModtime {
-    private final HTableDescriptor htd;
+    private final TableDescriptor td;
     private final long modtime;
 
-    TableDescriptorAndModtime(final long modtime, final HTableDescriptor htd) {
-      this.htd = htd;
+    TableDescriptorAndModtime(final long modtime, final TableDescriptor td) {
+      this.td = td;
       this.modtime = modtime;
     }
 
@@ -104,8 +107,16 @@ public class FSTableDescriptors implements 
TableDescriptors {
       return this.modtime;
     }
 
-    HTableDescriptor getTableDescriptor() {
-      return this.htd;
+    TableDescriptor getTableDescriptor() {
+      return this.td;
+    }
+
+    HTableDescriptor getHTableDescriptor() {
+      return this.td.getHTableDescriptor();
+    }
+
+    TableState.State getTableState() {
+      return this.td.getTableState();
     }
   }
 
@@ -141,12 +152,13 @@ public class FSTableDescriptors implements 
TableDescriptors {
    * to see if a newer file has been created since the cached one was read.
    */
   @Override
-  public HTableDescriptor get(final TableName tablename)
+  @Nullable
+  public TableDescriptor getDescriptor(final TableName tablename)
   throws IOException {
     invocations++;
     if (HTableDescriptor.META_TABLEDESC.getTableName().equals(tablename)) {
       cachehits++;
-      return HTableDescriptor.META_TABLEDESC;
+      return new TableDescriptor(HTableDescriptor.META_TABLEDESC, 
TableState.State.ENABLED);
     }
     // hbase:meta is already handled. If some one tries to get the descriptor 
for
     // .logs, .oldlogs or .corrupt throw an exception.
@@ -183,31 +195,62 @@ public class FSTableDescriptors implements 
TableDescriptors {
   }
 
   /**
+   * Get the current table descriptor for the given table, or null if none 
exists.
+   *
+   * Uses a local cache of the descriptor but still checks the filesystem on 
each call
+   * to see if a newer file has been created since the cached one was read.
+   */
+  @Override
+  public HTableDescriptor get(TableName tableName) throws IOException {
+    if (HTableDescriptor.META_TABLEDESC.getTableName().equals(tableName)) {
+      cachehits++;
+      return HTableDescriptor.META_TABLEDESC;
+    }
+    TableDescriptor descriptor = getDescriptor(tableName);
+    return descriptor == null ? null : descriptor.getHTableDescriptor();
+  }
+
+  /**
    * Returns a map from table name to table descriptor for all tables.
    */
   @Override
-  public Map<String, HTableDescriptor> getAll()
+  public Map<String, TableDescriptor> getAllDescriptors()
   throws IOException {
-    Map<String, HTableDescriptor> htds = new TreeMap<String, 
HTableDescriptor>();
+    Map<String, TableDescriptor> tds = new TreeMap<String, TableDescriptor>();
     List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
     for (Path d: tableDirs) {
-      HTableDescriptor htd = null;
+      TableDescriptor htd = null;
       try {
-        htd = get(FSUtils.getTableName(d));
+        htd = getDescriptor(FSUtils.getTableName(d));
       } catch (FileNotFoundException fnfe) {
         // inability of retrieving one HTD shouldn't stop getting the remaining
         LOG.warn("Trouble retrieving htd", fnfe);
       }
       if (htd == null) continue;
-      htds.put(htd.getTableName().getNameAsString(), htd);
+      tds.put(htd.getHTableDescriptor().getTableName().getNameAsString(), htd);
     }
-    return htds;
+    return tds;
   }
 
-  /* (non-Javadoc)
-   * @see 
org.apache.hadoop.hbase.TableDescriptors#getTableDescriptors(org.apache.hadoop.fs.FileSystem,
 org.apache.hadoop.fs.Path)
+  /**
+   * Returns a map from table name to HTableDescriptor for all tables.
    */
   @Override
+  public Map<String, HTableDescriptor> getAll() throws IOException {
+    Map<String, HTableDescriptor> htds = new TreeMap<String, 
HTableDescriptor>();
+    Map<String, TableDescriptor> allDescriptors = getAllDescriptors();
+    for (Map.Entry<String, TableDescriptor> entry : allDescriptors
+        .entrySet()) {
+      htds.put(entry.getKey(), entry.getValue().getHTableDescriptor());
+    }
+    return htds;
+  }
+
+  /**
+   * Find descriptors by namespace.
+   * @see #get(org.apache.hadoop.hbase.TableName)
+   */
+  @Override
   public Map<String, HTableDescriptor> getByNamespace(String name)
   throws IOException {
     Map<String, HTableDescriptor> htds = new TreeMap<String, 
HTableDescriptor>();
@@ -232,20 +275,46 @@ public class FSTableDescriptors implements 
TableDescriptors {
    * and updates the local cache with it.
    */
   @Override
-  public void add(HTableDescriptor htd) throws IOException {
+  public void add(TableDescriptor htd) throws IOException {
     if (fsreadonly) {
       throw new NotImplementedException("Cannot add a table descriptor - in 
read only mode");
     }
-    if (TableName.META_TABLE_NAME.equals(htd.getTableName())) {
+    TableName tableName = htd.getHTableDescriptor().getTableName();
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
       throw new NotImplementedException();
     }
-    if 
(HConstants.HBASE_NON_USER_TABLE_DIRS.contains(htd.getTableName().getNameAsString()))
 {
+    if 
(HConstants.HBASE_NON_USER_TABLE_DIRS.contains(tableName.getNameAsString())) {
       throw new NotImplementedException(
-        "Cannot add a table descriptor for a reserved subdirectory name: " + 
htd.getNameAsString());
+        "Cannot add a table descriptor for a reserved subdirectory name: "
+            + htd.getHTableDescriptor().getNameAsString());
     }
     updateTableDescriptor(htd);
-    long modtime = getTableInfoModtime(htd.getTableName());
-    this.cache.put(htd.getTableName(), new TableDescriptorAndModtime(modtime, 
htd));
+  }
+
+  /**
+   * Adds (or updates) the table descriptor to the FileSystem
+   * and updates the local cache with it.
+   */
+  @Override
+  public void add(HTableDescriptor htd) throws IOException {
+    if (fsreadonly) {
+      throw new NotImplementedException("Cannot add a table descriptor - in 
read only mode");
+    }
+    TableName tableName = htd.getTableName();
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
+      throw new NotImplementedException();
+    }
+    if 
(HConstants.HBASE_NON_USER_TABLE_DIRS.contains(tableName.getNameAsString())) {
+      throw new NotImplementedException(
+          "Cannot add a table descriptor for a reserved subdirectory name: "
+              + htd.getNameAsString());
+    }
+    TableDescriptor descriptor = getDescriptor(htd.getTableName());
+    if (descriptor == null)
+      descriptor = new TableDescriptor(htd);
+    else
+      descriptor.setHTableDescriptor(htd);
+    updateTableDescriptor(descriptor);
   }
 
   /**
@@ -266,7 +335,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
       }
     }
     TableDescriptorAndModtime tdm = this.cache.remove(tablename);
-    return tdm == null ? null : tdm.getTableDescriptor();
+    return tdm == null ? null : tdm.getHTableDescriptor();
   }
 
   /**
@@ -463,7 +532,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
    * if it exists, bypassing the local cache.
    * Returns null if it's not found.
    */
-  public static HTableDescriptor getTableDescriptorFromFs(FileSystem fs,
+  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs,
       Path hbaseRootDir, TableName tableName) throws IOException {
     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
     return getTableDescriptorFromFs(fs, tableDir);
@@ -474,7 +543,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
    * directly from the file system if it exists.
    * @throws TableInfoMissingException if there is no descriptor
    */
-  public static HTableDescriptor getTableDescriptorFromFs(FileSystem fs, Path 
tableDir)
+  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path 
tableDir)
   throws IOException {
     FileStatus status = getTableInfoPath(fs, tableDir, false);
     if (status == null) {
@@ -509,11 +578,11 @@ public class FSTableDescriptors implements 
TableDescriptors {
     if (status == null) {
       return null;
     }
-    HTableDescriptor htd = readTableDescriptor(fs, status, !fsreadonly);
-    return new TableDescriptorAndModtime(status.getModificationTime(), htd);
+    TableDescriptor td = readTableDescriptor(fs, status, !fsreadonly);
+    return new TableDescriptorAndModtime(status.getModificationTime(), td);
   }
 
-  private static HTableDescriptor readTableDescriptor(FileSystem fs, 
FileStatus status,
+  private static TableDescriptor readTableDescriptor(FileSystem fs, FileStatus 
status,
       boolean rewritePb) throws IOException {
     int len = Ints.checkedCast(status.getLen());
     byte [] content = new byte[len];
@@ -523,9 +592,9 @@ public class FSTableDescriptors implements TableDescriptors 
{
     } finally {
       fsDataInputStream.close();
     }
-    HTableDescriptor htd = null;
+    TableDescriptor td = null;
     try {
-      htd = HTableDescriptor.parseFrom(content);
+      td = TableDescriptor.parseFrom(content);
     } catch (DeserializationException e) {
       throw new IOException("content=" + Bytes.toShort(content), e);
     }
@@ -533,25 +602,28 @@ public class FSTableDescriptors implements 
TableDescriptors {
       // Convert the file over to be pb before leaving here.
       Path tableInfoDir = status.getPath().getParent();
       Path tableDir = tableInfoDir.getParent();
-      writeTableDescriptor(fs, htd, tableDir, status);
+      writeTableDescriptor(fs, td, tableDir, status);
     }
-    return htd;
+    return td;
   }
- 
+
   /**
    * Update table descriptor on the file system
    * @throws IOException Thrown if failed update.
    * @throws NotImplementedException if in read only mode
    */
-  @VisibleForTesting Path updateTableDescriptor(HTableDescriptor htd)
+  @VisibleForTesting Path updateTableDescriptor(TableDescriptor td)
   throws IOException {
     if (fsreadonly) {
       throw new NotImplementedException("Cannot update a table descriptor - in 
read only mode");
     }
-    Path tableDir = getTableDir(htd.getTableName());
-    Path p = writeTableDescriptor(fs, htd, tableDir, 
getTableInfoPath(tableDir));
+    TableName tableName = td.getHTableDescriptor().getTableName();
+    Path tableDir = getTableDir(tableName);
+    Path p = writeTableDescriptor(fs, td, tableDir, 
getTableInfoPath(tableDir));
     if (p == null) throw new IOException("Failed update");
     LOG.info("Updated tableinfo=" + p);
+    long modtime = getTableInfoModtime(tableName);
+    this.cache.put(tableName, new TableDescriptorAndModtime(modtime, td));
     return p;
   }
 
@@ -601,7 +673,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
    * @return Descriptor file or null if we failed write.
    */
   private static Path writeTableDescriptor(final FileSystem fs, 
-    final HTableDescriptor htd, final Path tableDir,
+    final TableDescriptor htd, final Path tableDir,
     final FileStatus currentDescriptorFile)
   throws IOException {  
     // Get temporary dir into which we'll first write a file to avoid 
half-written file phenomenon.
@@ -632,7 +704,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
       }
       tableInfoDirPath = new Path(tableInfoDir, filename);
       try {
-        writeHTD(fs, tempPath, htd);
+        writeTD(fs, tempPath, htd);
         fs.mkdirs(tableInfoDirPath.getParent());
         if (!fs.rename(tempPath, tableInfoDirPath)) {
           throw new IOException("Failed rename of " + tempPath + " to " + 
tableInfoDirPath);
@@ -656,7 +728,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
     return tableInfoDirPath;
   }
   
-  private static void writeHTD(final FileSystem fs, final Path p, final 
HTableDescriptor htd)
+  private static void writeTD(final FileSystem fs, final Path p, final 
TableDescriptor htd)
   throws IOException {
     FSDataOutputStream out = fs.create(p, false);
     try {
@@ -673,23 +745,41 @@ public class FSTableDescriptors implements 
TableDescriptors {
    * Used by tests.
    * @return True if we successfully created file.
    */
-  public boolean createTableDescriptor(HTableDescriptor htd) throws 
IOException {
+  public boolean createTableDescriptor(TableDescriptor htd) throws IOException 
{
     return createTableDescriptor(htd, false);
   }
 
   /**
+   * Create a new table descriptor in HDFS from the given HTableDescriptor.
+   * Happens when we are creating a table. Used by tests.
+   * @return True if we successfully created the file.
+   */
+  public boolean createTableDescriptor(HTableDescriptor htd) throws 
IOException {
+    return createTableDescriptor(new TableDescriptor(htd), false);
+  }
+
+  /**
    * Create new HTableDescriptor in HDFS. Happens when we are creating table. 
If
    * forceCreation is true then even if previous table descriptor is present it
    * will be overwritten
    * 
    * @return True if we successfully created file.
    */
-  public boolean createTableDescriptor(HTableDescriptor htd, boolean 
forceCreation)
+  public boolean createTableDescriptor(TableDescriptor htd, boolean 
forceCreation)
   throws IOException {
-    Path tableDir = getTableDir(htd.getTableName());
+    Path tableDir = getTableDir(htd.getHTableDescriptor().getTableName());
     return createTableDescriptorForTableDirectory(tableDir, htd, 
forceCreation);
   }
-  
+
+  /**
+   * Create a table descriptor for the given HTableDescriptor. The default
+   * TableDescriptor state will be used (typically ENABLED).
+   */
+  public boolean createTableDescriptor(HTableDescriptor htd, boolean 
forceCreation)
+      throws IOException {
+    return createTableDescriptor(new TableDescriptor(htd), forceCreation);
+  }
+
   /**
    * Create a new HTableDescriptor in HDFS in the specified table directory. 
Happens when we create
    * a new table or snapshot a table.
@@ -702,7 +792,7 @@ public class FSTableDescriptors implements TableDescriptors 
{
    * @throws IOException if a filesystem error occurs
    */
   public boolean createTableDescriptorForTableDirectory(Path tableDir,
-      HTableDescriptor htd, boolean forceCreation) throws IOException {
+      TableDescriptor htd, boolean forceCreation) throws IOException {
     if (fsreadonly) {
       throw new NotImplementedException("Cannot create a table descriptor - in 
read only mode");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 017153a..d5cb439 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
@@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
@@ -107,7 +108,6 @@ import 
org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.io.IOUtils;
@@ -953,9 +953,9 @@ public class HBaseFsck extends Configured {
         modTInfo = new TableInfo(tableName);
         tablesInfo.put(tableName, modTInfo);
         try {
-          HTableDescriptor htd =
+          TableDescriptor htd =
               FSTableDescriptors.getTableDescriptorFromFs(fs, hbaseRoot, 
tableName);
-          modTInfo.htds.add(htd);
+          modTInfo.htds.add(htd.getHTableDescriptor());
         } catch (IOException ioe) {
           if (!orphanTableDirs.containsKey(tableName)) {
             LOG.warn("Unable to read .tableinfo from " + hbaseRoot, ioe);
@@ -1009,7 +1009,7 @@ public class HBaseFsck extends Configured {
     for (String columnfamimly : columns) {
       htd.addFamily(new HColumnDescriptor(columnfamimly));
     }
-    fstd.createTableDescriptor(htd, true);
+    fstd.createTableDescriptor(new TableDescriptor(htd, 
TableState.State.ENABLED), true);
     return true;
   }
 
@@ -1057,7 +1057,7 @@ public class HBaseFsck extends Configured {
           if (tableName.equals(htds[j].getTableName())) {
             HTableDescriptor htd = htds[j];
             LOG.info("fixing orphan table: " + tableName + " from cache");
-            fstd.createTableDescriptor(htd, true);
+            fstd.createTableDescriptor(new TableDescriptor(htd, 
TableState.State.ENABLED), true);
             j++;
             iter.remove();
           }
@@ -1382,22 +1382,16 @@ public class HBaseFsck extends Configured {
    * @throws IOException
    */
   private void loadDisabledTables()
-  throws ZooKeeperConnectionException, IOException {
+  throws IOException {
     HConnectionManager.execute(new HConnectable<Void>(getConf()) {
       @Override
       public Void connect(HConnection connection) throws IOException {
-        ZooKeeperWatcher zkw = createZooKeeperWatcher();
-        try {
-          for (TableName tableName :
-              ZKTableStateClientSideReader.getDisabledOrDisablingTables(zkw)) {
-            disabledTables.add(tableName);
+        TableName[] tables = connection.listTableNames();
+        for (TableName table : tables) {
+          if (connection.getTableState(table)
+              .inStates(TableState.State.DISABLED, 
TableState.State.DISABLING)) {
+            disabledTables.add(table);
           }
-        } catch (KeeperException ke) {
-          throw new IOException(ke);
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException();
-        } finally {
-          zkw.close();
         }
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cc5d190/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
index 98eb7e2..e910be5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
@@ -156,7 +156,8 @@ class HMerge {
 
       this.rootDir = FSUtils.getRootDir(conf);
       Path tabledir = FSUtils.getTableDir(this.rootDir, tableName);
-      this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, 
tabledir);
+      this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, tabledir)
+          .getHTableDescriptor();
       String logname = "merge_" + System.currentTimeMillis() + 
HConstants.HREGION_LOGDIR_NAME;
 
       this.hlog = HLogFactory.createHLog(fs, tabledir, logname, conf);

Reply via email to