http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb13ffd8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bfa332c..37cdd21 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.query;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
+import static org.apache.hadoop.hbase.HColumnDescriptor.REPLICATION_SCOPE;
+import static org.apache.hadoop.hbase.HColumnDescriptor.KEEP_DELETED_CELLS;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
@@ -62,6 +64,7 @@ import static 
org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
 import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
 import static org.apache.phoenix.util.UpgradeUtil.moveChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
+import static org.apache.phoenix.util.UpgradeUtil.syncTableAndIndexProperties;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -101,11 +104,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableExistsException;
@@ -775,62 +780,94 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             PTableType tableType, Map<String, Object> tableProps, 
List<Pair<byte[], Map<String, Object>>> families,
             byte[][] splits, boolean isNamespaceMapped) throws SQLException {
         String defaultFamilyName = 
(String)tableProps.remove(PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME);
-        HTableDescriptor tableDescriptor = (existingDesc != null) ? new 
HTableDescriptor(existingDesc)
-        : new HTableDescriptor(physicalTableName);
+        HTableDescriptor newTableDescriptor = (existingDesc != null) ? new 
HTableDescriptor(existingDesc)
+        : new HTableDescriptor(TableName.valueOf(physicalTableName));
+
+        HColumnDescriptor dataTableColDescForIndexTablePropSyncing = null;
+        if (tableType == PTableType.INDEX || 
MetaDataUtil.isViewIndex(Bytes.toString(physicalTableName))) {
+            byte[] defaultFamilyBytes =
+                    defaultFamilyName == null ? 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName);
+
+            final HTableDescriptor baseTableDesc;
+            if (MetaDataUtil.isViewIndex(Bytes.toString(physicalTableName))) {
+                // Handles indexes created on views for single-tenant tables 
and
+                // global indexes created on views of multi-tenant tables
+                baseTableDesc = 
this.getTableDescriptor(Bytes.toBytes(MetaDataUtil.getViewIndexUserTableName(Bytes.toString(physicalTableName))));
+            } else if (existingDesc == null) {
+                // Global/local index creation on top of a physical base table
+                baseTableDesc = 
this.getTableDescriptor(SchemaUtil.getPhysicalTableName(
+                        Bytes.toBytes((String) 
tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME)), isNamespaceMapped)
+                        .getName());
+            } else {
+                // In case this a local index created on a view of a 
multi-tenant table, the
+                // DATA_TABLE_NAME points to the name of the view instead of 
the physical base table
+                baseTableDesc = existingDesc;
+            }
+            dataTableColDescForIndexTablePropSyncing = 
baseTableDesc.getFamily(defaultFamilyBytes);
+            // It's possible that the table has specific column families and 
none of them are declared
+            // to be the DEFAULT_COLUMN_FAMILY, so we choose the first column 
family for syncing properties
+            if (dataTableColDescForIndexTablePropSyncing == null) {
+                dataTableColDescForIndexTablePropSyncing = 
baseTableDesc.getColumnFamilies()[0];
+            }
+        }
         // By default, do not automatically rebuild/catch up an index on a 
write failure
+        // Add table-specific properties to the table descriptor
         for (Entry<String,Object> entry : tableProps.entrySet()) {
             String key = entry.getKey();
             if (!TableProperty.isPhoenixTableProperty(key)) {
                 Object value = entry.getValue();
-                tableDescriptor.setValue(key, value == null ? null : 
value.toString());
+                newTableDescriptor.setValue(key, value == null ? null : 
value.toString());
             }
         }
-        if (families.isEmpty()) {
-            if (tableType != PTableType.VIEW) {
-                byte[] defaultFamilyByes = defaultFamilyName == null ? 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName);
-                // Add dummy column family so we have key values for tables 
that
-                HColumnDescriptor columnDescriptor = 
generateColumnFamilyDescriptor(new 
Pair<byte[],Map<String,Object>>(defaultFamilyByes,Collections.<String,Object>emptyMap()),
 tableType);
-                tableDescriptor.addFamily(columnDescriptor);
-            }
-        } else {
-            for (Pair<byte[],Map<String,Object>> family : families) {
-                // If family is only in phoenix description, add it. 
otherwise, modify its property accordingly.
-                byte[] familyByte = family.getFirst();
-                if (tableDescriptor.getFamily(familyByte) == null) {
-                    if (tableType == PTableType.VIEW) {
-                        String fullTableName = 
Bytes.toString(physicalTableName);
-                        throw new ReadOnlyTableException(
-                                "The HBase column families for a read-only 
table must already exist",
-                                
SchemaUtil.getSchemaNameFromFullName(fullTableName),
-                                
SchemaUtil.getTableNameFromFullName(fullTableName),
-                                Bytes.toString(familyByte));
-                    }
-                    HColumnDescriptor columnDescriptor = 
generateColumnFamilyDescriptor(family, tableType);
-                    tableDescriptor.addFamily(columnDescriptor);
-                } else {
-                    if (tableType != PTableType.VIEW) {
-                        HColumnDescriptor columnDescriptor = 
tableDescriptor.getFamily(familyByte);
-                        if (columnDescriptor == null) {
-                            throw new IllegalArgumentException("Unable to find 
column descriptor with family name " + Bytes.toString(family.getFirst()));
-                        }
-                        modifyColumnFamilyDescriptor(columnDescriptor, 
family.getSecond());
+
+        Map<String, Object> syncedProps = 
MetaDataUtil.getSyncedProps(dataTableColDescForIndexTablePropSyncing);
+        // Add column family-specific properties to the table descriptor
+        for (Pair<byte[],Map<String,Object>> family : families) {
+            // If family is only in phoenix description, add it. otherwise, 
modify its property accordingly.
+            byte[] familyByte = family.getFirst();
+            if (newTableDescriptor.getFamily(familyByte) == null) {
+                if (tableType == PTableType.VIEW) {
+                    String fullTableName = Bytes.toString(physicalTableName);
+                    throw new ReadOnlyTableException(
+                            "The HBase column families for a read-only table 
must already exist",
+                            
SchemaUtil.getSchemaNameFromFullName(fullTableName),
+                            SchemaUtil.getTableNameFromFullName(fullTableName),
+                            Bytes.toString(familyByte));
+                }
+
+                HColumnDescriptor columnDescriptor = 
generateColumnFamilyDescriptor(family, tableType);
+                // Keep certain index column family properties in sync with 
the base table
+                if ((tableType == PTableType.INDEX || 
MetaDataUtil.isViewIndex(Bytes.toString(physicalTableName))) &&
+                        (syncedProps != null && !syncedProps.isEmpty())) {
+                    HColumnDescriptor newColFamDesc = new 
HColumnDescriptor(columnDescriptor);
+                    modifyColumnFamilyDescriptor(newColFamDesc, syncedProps);
+                    columnDescriptor = new HColumnDescriptor(newColFamDesc);
+                }
+                newTableDescriptor.addFamily(columnDescriptor);
+            } else {
+                if (tableType != PTableType.VIEW) {
+                    HColumnDescriptor columnDescriptor = 
newTableDescriptor.getFamily(familyByte);
+                    if (columnDescriptor == null) {
+                        throw new IllegalArgumentException("Unable to find 
column descriptor with family name " + Bytes.toString(family.getFirst()));
                     }
+                    modifyColumnFamilyDescriptor(columnDescriptor, 
family.getSecond());
+                    newTableDescriptor.modifyFamily(columnDescriptor);
                 }
             }
         }
-        addCoprocessors(physicalTableName, tableDescriptor, tableType, 
tableProps);
+        addCoprocessors(physicalTableName, newTableDescriptor, tableType, 
tableProps);
 
         // PHOENIX-3072: Set index priority if this is a system table or index 
table
         if (tableType == PTableType.SYSTEM) {
-            tableDescriptor.setValue(QueryConstants.PRIORITY,
+            newTableDescriptor.setValue(QueryConstants.PRIORITY,
                     
String.valueOf(PhoenixRpcSchedulerFactory.getMetadataPriority(config)));
         } else if (tableType == PTableType.INDEX // Global, mutable index
-                && !isLocalIndexTable(tableDescriptor.getFamiliesKeys())
+                && !isLocalIndexTable(newTableDescriptor.getFamiliesKeys())
                 && 
!Boolean.TRUE.equals(tableProps.get(PhoenixDatabaseMetaData.IMMUTABLE_ROWS))) {
-            tableDescriptor.setValue(QueryConstants.PRIORITY,
+            newTableDescriptor.setValue(QueryConstants.PRIORITY,
                     
String.valueOf(PhoenixRpcSchedulerFactory.getIndexPriority(config)));
         }
-        return tableDescriptor;
+        return newTableDescriptor;
     }
 
     private boolean isLocalIndexTable(Collection<byte[]> families) {
@@ -1851,25 +1888,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     @Override
     public MetaDataMutationResult addColumn(final List<Mutation> 
tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> 
stmtProperties, Set<String> colFamiliesForPColumnsToBeAdded, List<PColumn> 
columns) throws SQLException {
         List<Pair<byte[], Map<String, Object>>> families = new 
ArrayList<>(stmtProperties.size());
-        Map<String, Object> tableProps = new HashMap<String, Object>();
+        Map<String, Object> tableProps = new HashMap<>();
         Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
-        Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
         boolean nonTxToTx = false;
-        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = 
separateAndValidateProperties(table, stmtProperties, 
colFamiliesForPColumnsToBeAdded, tableProps);
-        HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
-        HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
+
+        Map<HTableDescriptor, HTableDescriptor> oldToNewTableDescriptors =
+                separateAndValidateProperties(table, stmtProperties, 
colFamiliesForPColumnsToBeAdded, tableProps);
+        Set<HTableDescriptor> origTableDescriptors = new 
HashSet<>(oldToNewTableDescriptors.keySet());
+
+        HTableDescriptor baseTableOrigDesc = 
this.getTableDescriptor(table.getPhysicalName().getBytes());
+        HTableDescriptor tableDescriptor = 
oldToNewTableDescriptors.get(baseTableOrigDesc);
+
         if (tableDescriptor != null) {
             tableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
-            origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
-            tableDescriptors.add(tableDescriptor);
-            origTableDescriptors.add(origTableDescriptor);
             nonTxToTx = 
Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
             /*
              * If the table was transitioned from non transactional to 
transactional, we need
              * to also transition the index tables.
              */
             if (nonTxToTx) {
-                updateDescriptorForTx(table, tableProps, tableDescriptor, 
Boolean.TRUE.toString(), tableDescriptors, origTableDescriptors);
+                updateDescriptorForTx(table, tableProps, tableDescriptor, 
Boolean.TRUE.toString(),
+                        tableDescriptors, origTableDescriptors, 
oldToNewTableDescriptors);
+                tableDescriptors.add(tableDescriptor);
+            } else {
+                tableDescriptors = new 
HashSet<>(oldToNewTableDescriptors.values());
             }
         }
 
@@ -1963,67 +2005,101 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
         return result;
     }
+
     private void updateDescriptorForTx(PTable table, Map<String, Object> 
tableProps, HTableDescriptor tableDescriptor,
-            String txValue, Set<HTableDescriptor> descriptorsToUpdate, 
Set<HTableDescriptor> origDescriptors) throws SQLException {
+            String txValue, Set<HTableDescriptor> descriptorsToUpdate, 
Set<HTableDescriptor> origDescriptors,
+            Map<HTableDescriptor, HTableDescriptor> oldToNewTableDescriptors) 
throws SQLException {
         byte[] physicalTableName = table.getPhysicalName().getBytes();
         try (HBaseAdmin admin = getAdmin()) {
             setTransactional(tableDescriptor, table.getType(), txValue, 
tableProps);
             Map<String, Object> indexTableProps;
             if (txValue == null) {
-                indexTableProps = Collections.<String,Object>emptyMap();
+                indexTableProps = Collections.emptyMap();
             } else {
                 indexTableProps = Maps.newHashMapWithExpectedSize(1);
                 
indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.valueOf(txValue));
                 
indexTableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, 
tableProps.get(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
             }
             for (PTable index : table.getIndexes()) {
-                HTableDescriptor indexDescriptor = 
admin.getTableDescriptor(index.getPhysicalName().getBytes());
-                origDescriptors.add(indexDescriptor);
-                indexDescriptor = new HTableDescriptor(indexDescriptor);
-                descriptorsToUpdate.add(indexDescriptor);
+                HTableDescriptor origIndexDesc = admin.getTableDescriptor(
+                        TableName.valueOf(index.getPhysicalName().getBytes()));
+                HTableDescriptor intermedIndexDesc = origIndexDesc;
+                // If we already wished to make modifications to this index 
table descriptor previously, we use the updated
+                // table descriptor to carry out further modifications
+                // See {@link 
ConnectionQueryServicesImpl#separateAndValidateProperties(PTable, Map, Set, 
Map)}
+                if (origDescriptors.contains(origIndexDesc)) {
+                    intermedIndexDesc = 
oldToNewTableDescriptors.get(origIndexDesc);
+                    // Remove any previous modification for this table 
descriptor because we will add
+                    // the combined modification done in this method as well
+                    descriptorsToUpdate.remove(intermedIndexDesc);
+                } else {
+                    origDescriptors.add(origIndexDesc);
+                }
+                HTableDescriptor newIndexDescriptor = new 
HTableDescriptor(intermedIndexDesc);
                 if (index.getColumnFamilies().isEmpty()) {
                     byte[] dataFamilyName = 
SchemaUtil.getEmptyColumnFamily(table);
                     byte[] indexFamilyName = 
SchemaUtil.getEmptyColumnFamily(index);
-                    HColumnDescriptor indexColDescriptor = 
indexDescriptor.getFamily(indexFamilyName);
+                    HColumnDescriptor indexColDescriptor = 
newIndexDescriptor.getFamily(indexFamilyName);
                     HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(dataFamilyName);
                     
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                    
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
+                    
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL,
+                            
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
+                    newIndexDescriptor.removeFamily(indexFamilyName);
+                    newIndexDescriptor.addFamily(indexColDescriptor);
                 } else {
                     for (PColumnFamily family : index.getColumnFamilies()) {
                         byte[] familyName = family.getName().getBytes();
-                        
indexDescriptor.getFamily(familyName).setMaxVersions(tableDescriptor.getFamily(familyName).getMaxVersions());
-                        HColumnDescriptor indexColDescriptor = 
indexDescriptor.getFamily(familyName);
+                        HColumnDescriptor indexColDescriptor = 
newIndexDescriptor.getFamily(familyName);
                         HColumnDescriptor tableColDescriptor = 
tableDescriptor.getFamily(familyName);
                         
indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                        
indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, 
tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
+                        
indexColDescriptor.setValue(Bytes.toBytes(PhoenixTransactionContext.PROPERTY_TTL),
+                                
tableColDescriptor.getValue(Bytes.toBytes(PhoenixTransactionContext.PROPERTY_TTL)));
+                        newIndexDescriptor.removeFamily(familyName);
+                        newIndexDescriptor.addFamily(indexColDescriptor);
                     }
                 }
-                setTransactional(indexDescriptor, index.getType(), txValue, 
indexTableProps);
+                setTransactional(newIndexDescriptor, index.getType(), txValue, 
indexTableProps);
+                descriptorsToUpdate.add(newIndexDescriptor);
             }
             try {
-                HTableDescriptor indexDescriptor = 
admin.getTableDescriptor(MetaDataUtil.getViewIndexPhysicalName(physicalTableName));
-                origDescriptors.add(indexDescriptor);
-                indexDescriptor = new HTableDescriptor(indexDescriptor);
-                descriptorsToUpdate.add(indexDescriptor);
-                setSharedIndexMaxVersion(table, tableDescriptor, 
indexDescriptor);
-                setTransactional(indexDescriptor, PTableType.INDEX, txValue, 
indexTableProps);
+                HTableDescriptor origIndexDesc = admin.getTableDescriptor(
+                        
TableName.valueOf(MetaDataUtil.getViewIndexPhysicalName(physicalTableName)));
+                HTableDescriptor intermedIndexDesc = origIndexDesc;
+                if (origDescriptors.contains(origIndexDesc)) {
+                    intermedIndexDesc = 
oldToNewTableDescriptors.get(origIndexDesc);
+                    descriptorsToUpdate.remove(intermedIndexDesc);
+                } else {
+                    origDescriptors.add(origIndexDesc);
+                }
+                HTableDescriptor newIndexDescriptor = new 
HTableDescriptor(intermedIndexDesc);
+                setSharedIndexMaxVersion(table, tableDescriptor, 
newIndexDescriptor);
+                setTransactional(newIndexDescriptor, PTableType.INDEX, 
txValue, indexTableProps);
+                descriptorsToUpdate.add(newIndexDescriptor);
             } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
                 // Ignore, as we may never have created a view index table
             }
             try {
-                HTableDescriptor indexDescriptor = 
admin.getTableDescriptor(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName));
-                origDescriptors.add(indexDescriptor);
-                indexDescriptor = new HTableDescriptor(indexDescriptor);
-                descriptorsToUpdate.add(indexDescriptor);
-                setSharedIndexMaxVersion(table, tableDescriptor, 
indexDescriptor);
-                setTransactional(indexDescriptor, PTableType.INDEX, txValue, 
indexTableProps);
+                HTableDescriptor origIndexDesc = admin.getTableDescriptor(
+                        
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(physicalTableName)));
+                HTableDescriptor intermedIndexDesc = origIndexDesc;
+                if (origDescriptors.contains(origIndexDesc)) {
+                    intermedIndexDesc = 
oldToNewTableDescriptors.get(origIndexDesc);
+                    descriptorsToUpdate.remove(intermedIndexDesc);
+                } else {
+                    origDescriptors.add(origIndexDesc);
+                }
+                HTableDescriptor newIndexDescriptor = new 
HTableDescriptor(intermedIndexDesc);
+                setSharedIndexMaxVersion(table, tableDescriptor, 
newIndexDescriptor);
+                setTransactional(newIndexDescriptor, PTableType.INDEX, 
txValue, indexTableProps);
+                descriptorsToUpdate.add(newIndexDescriptor);
             } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
-                // Ignore, as we may never have created a view index table
+                // Ignore, as we may never have created a local index
             }
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
     }
+
     private void setSharedIndexMaxVersion(PTable table, HTableDescriptor 
tableDescriptor,
             HTableDescriptor indexDescriptor) {
         if (table.getColumnFamilies().isEmpty()) {
@@ -2074,8 +2150,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, 
tableType, tableProps);
     }
 
-    private Pair<HTableDescriptor,HTableDescriptor> 
separateAndValidateProperties(PTable table, Map<String, List<Pair<String, 
Object>>> properties,
-      Set<String> colFamiliesForPColumnsToBeAdded, Map<String, Object> 
tableProps) throws SQLException {
+    private Map<HTableDescriptor, HTableDescriptor> 
separateAndValidateProperties(PTable table,
+            Map<String, List<Pair<String, Object>>> properties, Set<String> 
colFamiliesForPColumnsToBeAdded,
+             Map<String, Object> tableProps) throws SQLException {
         Map<String, Map<String, Object>> stmtFamiliesPropsMap = new 
HashMap<>(properties.size());
         Map<String,Object> commonFamilyProps = new HashMap<>();
         boolean addingColumns = colFamiliesForPColumnsToBeAdded != null && 
!colFamiliesForPColumnsToBeAdded.isEmpty();
@@ -2085,11 +2162,13 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         boolean willBeTransactional = false;
         boolean isOrWillBeTransactional = isTransactional;
         Integer newTTL = null;
+        Integer newReplicationScope = null;
+        KeepDeletedCells newKeepDeletedCells = null;
         TransactionFactory.Provider txProvider = null;
         for (String family : properties.keySet()) {
             List<Pair<String, Object>> propsList = properties.get(family);
             if (propsList != null && propsList.size() > 0) {
-                Map<String, Object> colFamilyPropsMap = new HashMap<String, 
Object>(propsList.size());
+                Map<String, Object> colFamilyPropsMap = new 
HashMap<>(propsList.size());
                 for (Pair<String, Object> prop : propsList) {
                     String propName = prop.getFirst();
                     Object propValue = prop.getSecond();
@@ -2118,10 +2197,15 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             TableProperty tableProp = 
TableProperty.valueOf(propName);
                             tableProp.validate(true, 
!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
                             if (propName.equals(TTL)) {
-                                newTTL = ((Number)prop.getSecond()).intValue();
+                                if (table.getType() == PTableType.INDEX) {
+                                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
+                                            .setMessage("Property: " + 
propName).build()
+                                            .buildException();
+                                }
+                                newTTL = ((Number)propValue).intValue();
                                 // Even though TTL is really a HColumnProperty 
we treat it specially.
                                 // We enforce that all column families have 
the same TTL.
-                                commonFamilyProps.put(propName, 
prop.getSecond());
+                                commonFamilyProps.put(propName, propValue);
                             } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && 
Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional 
= true;
                                 
tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
@@ -2133,8 +2217,28 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             }
                         } else {
                             if (MetaDataUtil.isHColumnProperty(propName)) {
+                                if (table.getType() == PTableType.INDEX && 
MetaDataUtil.propertyNotAllowedToBeOutOfSync(propName)) {
+                                    // We disallow index tables from 
overriding TTL, KEEP_DELETED_CELLS and REPLICATION_SCOPE,
+                                    // in order to avoid situations where 
indexes are not in sync with their data table
+                                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
+                                            .setMessage("Property: " + 
propName).build()
+                                            .buildException();
+                                }
                                 if 
(family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
+                                    if (propName.equals(KEEP_DELETED_CELLS)) {
+                                        newKeepDeletedCells =
+                                                
Boolean.valueOf(propValue.toString()) ? KeepDeletedCells.TRUE : 
KeepDeletedCells.FALSE;
+                                    }
+                                    if (propName.equals(REPLICATION_SCOPE)) {
+                                        newReplicationScope = 
((Number)propValue).intValue();
+                                    }
                                     commonFamilyProps.put(propName, propValue);
+                                } else if 
(MetaDataUtil.propertyNotAllowedToBeOutOfSync(propName)) {
+                                    // Don't allow specifying column families 
for TTL, KEEP_DELETED_CELLS and REPLICATION_SCOPE.
+                                    // These properties can only be applied 
for all column families of a table and can't be column family specific.
+                                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY)
+                                            .setMessage("Property: " + 
propName).build()
+                                            .buildException();
                                 } else {
                                     colFamilyPropsMap.put(propName, propValue);
                                 }
@@ -2175,7 +2279,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             if (!addingColumns) {
                 // Add the common family props to all existing column families
                 for (String existingColFamily : existingColumnFamilies) {
-                    Map<String, Object> m = new HashMap<String, 
Object>(commonFamilyProps.size());
+                    Map<String, Object> m = new 
HashMap<>(commonFamilyProps.size());
                     m.putAll(commonFamilyProps);
                     allFamiliesProps.put(existingColFamily, m);
                 }
@@ -2184,7 +2288,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 for (String colFamily : colFamiliesForPColumnsToBeAdded) {
                     if (colFamily != null) {
                         // only set properties for key value columns
-                        Map<String, Object> m = new HashMap<String, 
Object>(commonFamilyProps.size());
+                        Map<String, Object> m = new 
HashMap<>(commonFamilyProps.size());
                         m.putAll(commonFamilyProps);
                         allFamiliesProps.put(colFamily, m);
                     } else if (isAddingPkColOnly) {
@@ -2249,30 +2353,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
         HTableDescriptor newTableDescriptor = null;
         HTableDescriptor origTableDescriptor = null;
+        // Store all old to new table descriptor mappings for the table as 
well as its global indexes
+        Map<HTableDescriptor, HTableDescriptor> 
tableAndIndexDescriptorMappings = Collections.emptyMap();
         if (!allFamiliesProps.isEmpty() || !tableProps.isEmpty()) {
-            byte[] tableNameBytes = 
Bytes.toBytes(table.getPhysicalName().getString());
-            HTableDescriptor existingTableDescriptor = origTableDescriptor = 
getTableDescriptor(tableNameBytes);
+            tableAndIndexDescriptorMappings = 
Maps.newHashMapWithExpectedSize(3 + table.getIndexes().size());
+            HTableDescriptor existingTableDescriptor = origTableDescriptor = 
this.getTableDescriptor(table.getPhysicalName().getBytes());
             newTableDescriptor = new HTableDescriptor(existingTableDescriptor);
             if (!tableProps.isEmpty()) {
-                // add all the table properties to the existing table 
descriptor
+                // add all the table properties to the new table descriptor
                 for (Entry<String, Object> entry : tableProps.entrySet()) {
                     newTableDescriptor.setValue(entry.getKey(), 
entry.getValue() != null ? entry.getValue().toString() : null);
                 }
             }
             if (addingColumns) {
-                // Make sure that all the CFs of the table have the same TTL 
as the empty CF.
-                setTTLForNewCFs(allFamiliesProps, table, newTableDescriptor, 
newTTL);
-            }
-            // Set TTL on all table column families, even if they're not 
referenced here
-            if (newTTL != null) {
-                for (PColumnFamily family : table.getColumnFamilies()) {
-                    if 
(!allFamiliesProps.containsKey(family.getName().getString())) {
-                        Map<String,Object> familyProps = 
Maps.newHashMapWithExpectedSize(1);
-                        familyProps.put(TTL, newTTL);
-                        allFamiliesProps.put(family.getName().getString(), 
familyProps);
-                    }
-                }
+                // Make sure that TTL, KEEP_DELETED_CELLS and 
REPLICATION_SCOPE for the new column family to be added stays in sync
+                // with the table's existing column families. Note that we use 
the new values for these properties in case we are
+                // altering their values. We also propagate these altered 
values to existing column families and indexes on the table below
+                setSyncedPropsForNewColumnFamilies(allFamiliesProps, table, 
newTableDescriptor, newTTL, newKeepDeletedCells, newReplicationScope);
+            }
+            if (newTTL != null || newKeepDeletedCells != null || 
newReplicationScope != null) {
+                // Set properties to be kept in sync on all table column 
families of this table, even if they are not referenced here
+                
setSyncedPropsForUnreferencedColumnFamilies(this.getTableDescriptor(table.getPhysicalName().getBytes()),
+                        allFamiliesProps, newTTL, newKeepDeletedCells, 
newReplicationScope);
             }
+
             Integer defaultTxMaxVersions = null;
             if (isOrWillBeTransactional) {
                 // Calculate default for max versions
@@ -2300,19 +2404,18 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         }
                     }
                 }
-            }
-            // Set Tephra's TTL property based on HBase property if we're
-            // transitioning to become transactional or setting TTL on
-            // an already transactional table.
-            if (isOrWillBeTransactional) {
+                // Set Tephra's TTL property based on HBase property if we're
+                // transitioning to become transactional or setting TTL on
+                // an already transactional table.
                 int ttl = getTTL(table, newTableDescriptor, newTTL);
                 if (ttl != HColumnDescriptor.DEFAULT_TTL) {
                     for (Map.Entry<String, Map<String, Object>> entry : 
allFamiliesProps.entrySet()) {
                         Map<String, Object> props = entry.getValue();
                         if (props == null) {
-                            props = new HashMap<String, Object>();
+                            allFamiliesProps.put(entry.getKey(), new 
HashMap<String, Object>());
+                            props = allFamiliesProps.get(entry.getKey());
                         } else {
-                            props = new HashMap<String, Object>(props);
+                            props = new HashMap<>(props);
                         }
                         props.put(PhoenixTransactionContext.PROPERTY_TTL, ttl);
                         // Remove HBase TTL if we're not transitioning an 
existing table to become transactional
@@ -2339,13 +2442,25 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     newTableDescriptor.addFamily(colDescriptor);
                 } else {
                     modifyColumnFamilyDescriptor(colDescriptor, familyProps);
+                    newTableDescriptor.removeFamily(cf);
+                    newTableDescriptor.addFamily(colDescriptor);
                 }
                 if (isOrWillBeTransactional) {
                     checkTransactionalVersionsValue(colDescriptor);
                 }
             }
         }
-        return new Pair<>(origTableDescriptor, newTableDescriptor);
+        if (origTableDescriptor != null && newTableDescriptor != null) {
+            // Add the table descriptor mapping for the base table
+            tableAndIndexDescriptorMappings.put(origTableDescriptor, 
newTableDescriptor);
+        }
+
+        Map<String, Object> applyPropsToAllIndexColFams = 
getNewSyncedPropsMap(newTTL, newKeepDeletedCells, newReplicationScope);
+        // Copy properties that need to be synced from the default column 
family of the base table to
+        // the column families of each of its indexes (including indexes on 
this base table's views)
+        // and store those table descriptor mappings as well
+        setSyncedPropertiesForTableIndexes(table, 
tableAndIndexDescriptorMappings, applyPropsToAllIndexColFams);
+        return tableAndIndexDescriptorMappings;
     }
 
     private void checkTransactionalVersionsValue(HColumnDescriptor 
colDescriptor) throws SQLException {
@@ -2379,16 +2494,139 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return ttl;
     }
 
-    private static void setTTLForNewCFs(Map<String, Map<String, Object>> 
familyProps, PTable table,
-            HTableDescriptor tableDesc, Integer newTTL) throws SQLException {
-        if (!familyProps.isEmpty()) {
+    private static KeepDeletedCells getKeepDeletedCells(PTable table, 
HTableDescriptor tableDesc,
+            KeepDeletedCells newKeepDeletedCells) throws SQLException {
+        // If we're setting KEEP_DELETED_CELLS now, then use that value. 
Otherwise, use the empty column family value
+        return (newKeepDeletedCells != null) ?
+                newKeepDeletedCells :
+                
tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getKeepDeletedCellsAsEnum();
+    }
+
+    private static int getReplicationScope(PTable table, HTableDescriptor 
tableDesc,
+            Integer newReplicationScope) throws SQLException {
+        // If we're setting replication scope now, then use that value. 
Otherwise, use the empty column family value
+        return (newReplicationScope != null) ?
+                newReplicationScope :
+                
tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table)).getScope();
+    }
+
+    /**
+     * Keep the TTL, KEEP_DELETED_CELLS and REPLICATION_SCOPE properties of 
new column families
+     * in sync with the existing column families. Note that we use the new 
values for these properties in case they
+     * are passed from our alter table command, if not, we use the default 
column family's value for each property
+     * See {@link MetaDataUtil#SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES}
+     * @param allFamiliesProps Map of all column family properties
+     * @param table original table
+     * @param tableDesc new table descriptor
+     * @param newTTL new value of TTL
+     * @param newKeepDeletedCells new value of KEEP_DELETED_CELLS
+     * @param newReplicationScope new value of REPLICATION_SCOPE
+     * @throws SQLException
+     */
+    private void setSyncedPropsForNewColumnFamilies(Map<String, Map<String, 
Object>> allFamiliesProps, PTable table,
+            HTableDescriptor tableDesc, Integer newTTL, KeepDeletedCells 
newKeepDeletedCells,
+            Integer newReplicationScope) throws SQLException {
+        if (!allFamiliesProps.isEmpty()) {
             int ttl = getTTL(table, tableDesc, newTTL);
-            for (Map.Entry<String, Map<String, Object>> entry : 
familyProps.entrySet()) {
+            int replicationScope = getReplicationScope(table, tableDesc, 
newReplicationScope);
+            KeepDeletedCells keepDeletedCells = getKeepDeletedCells(table, 
tableDesc, newKeepDeletedCells);
+            for (Map.Entry<String, Map<String, Object>> entry : 
allFamiliesProps.entrySet()) {
                 Map<String, Object> props = entry.getValue();
                 if (props == null) {
-                    props = new HashMap<String, Object>();
+                    allFamiliesProps.put(entry.getKey(), new HashMap<String, 
Object>());
+                    props = allFamiliesProps.get(entry.getKey());
                 }
                 props.put(TTL, ttl);
+                props.put(KEEP_DELETED_CELLS, keepDeletedCells);
+                props.put(REPLICATION_SCOPE, replicationScope);
+            }
+        }
+    }
+
+    private void setPropIfNotNull(Map<String, Object> propMap, String 
propName, Object propVal) {
+        if (propName!= null && propVal != null) {
+            propMap.put(propName, propVal);
+        }
+    }
+
+    private Map<String, Object> getNewSyncedPropsMap(Integer newTTL, 
KeepDeletedCells newKeepDeletedCells, Integer newReplicationScope) {
+        Map<String,Object> newSyncedProps = Maps.newHashMapWithExpectedSize(3);
+        setPropIfNotNull(newSyncedProps, TTL, newTTL);
+        setPropIfNotNull(newSyncedProps,KEEP_DELETED_CELLS, 
newKeepDeletedCells);
+        setPropIfNotNull(newSyncedProps, REPLICATION_SCOPE, 
newReplicationScope);
+        return newSyncedProps;
+    }
+
+    /**
+     * Set the new values for properties that are to be kept in sync amongst 
those column families of the table which are
+     * not referenced in the context of our alter table command, including the 
local index column family if it exists
+     * See {@link MetaDataUtil#SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES}
+     * @param tableDesc original table descriptor
+     * @param allFamiliesProps Map of all column family properties
+     * @param newTTL new value of TTL
+     * @param newKeepDeletedCells new value of KEEP_DELETED_CELLS
+     * @param newReplicationScope new value of REPLICATION_SCOPE
+     * @return
+     */
+    private void setSyncedPropsForUnreferencedColumnFamilies(HTableDescriptor 
tableDesc,
+            Map<String, Map<String, Object>> allFamiliesProps,
+            Integer newTTL, KeepDeletedCells newKeepDeletedCells, Integer 
newReplicationScope) {
+        for (HColumnDescriptor family: tableDesc.getColumnFamilies()) {
+            if (!allFamiliesProps.containsKey(family.getNameAsString())) {
+                allFamiliesProps.put(family.getNameAsString(),
+                        getNewSyncedPropsMap(newTTL, newKeepDeletedCells, 
newReplicationScope));
+            }
+        }
+    }
+
+    /**
+     * Set properties to be kept in sync for global indexes of a table, as 
well as
+     * the physical table corresponding to indexes created on views of a table
+     * See {@link MetaDataUtil#SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES} and
+     * @param table base table
+     * @param tableAndIndexDescriptorMappings old to new table descriptor 
mappings
+     * @param applyPropsToAllIndexesDefaultCF new properties to apply to all 
index column families
+     * @throws SQLException
+     */
+    private void setSyncedPropertiesForTableIndexes(PTable table,
+            Map<HTableDescriptor, HTableDescriptor> 
tableAndIndexDescriptorMappings,
+            Map<String, Object> applyPropsToAllIndexesDefaultCF) throws 
SQLException {
+        if (applyPropsToAllIndexesDefaultCF == null || 
applyPropsToAllIndexesDefaultCF.isEmpty()) {
+            return;
+        }
+
+        for (PTable indexTable: table.getIndexes()) {
+            if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
+                // local index tables are already handled when we sync all 
column families of a base table
+                continue;
+            }
+            HTableDescriptor origIndexDescriptor = 
this.getTableDescriptor(indexTable.getPhysicalName().getBytes());
+            HTableDescriptor newIndexDescriptor = new 
HTableDescriptor(origIndexDescriptor);
+
+            byte[] defaultIndexColFam = 
SchemaUtil.getEmptyColumnFamily(indexTable);
+            HColumnDescriptor indexDefaultColDescriptor =
+                    new 
HColumnDescriptor(origIndexDescriptor.getFamily(defaultIndexColFam));
+            modifyColumnFamilyDescriptor(indexDefaultColDescriptor, 
applyPropsToAllIndexesDefaultCF);
+            newIndexDescriptor.removeFamily(defaultIndexColFam);
+            newIndexDescriptor.addFamily(indexDefaultColDescriptor);
+            tableAndIndexDescriptorMappings.put(origIndexDescriptor, 
newIndexDescriptor);
+        }
+        // Also keep properties for the physical view index table in sync
+        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(table.getPhysicalName().getString());
+        if (!Strings.isNullOrEmpty(viewIndexName)) {
+            try {
+                HTableDescriptor origViewIndexTableDescriptor = 
this.getTableDescriptor(Bytes.toBytes(viewIndexName));
+                HTableDescriptor newViewIndexDescriptor =
+                        new HTableDescriptor(origViewIndexTableDescriptor);
+                for (HColumnDescriptor cfd: 
origViewIndexTableDescriptor.getColumnFamilies()) {
+                    HColumnDescriptor newCfd = new HColumnDescriptor(cfd);
+                    modifyColumnFamilyDescriptor(newCfd, 
applyPropsToAllIndexesDefaultCF);
+                    newViewIndexDescriptor.removeFamily(cfd.getName());
+                    newViewIndexDescriptor.addFamily(newCfd);
+                }
+                
tableAndIndexDescriptorMappings.put(origViewIndexTableDescriptor, 
newViewIndexDescriptor);
+            } catch (TableNotFoundException ignore) {
+                // Ignore since this means that a view index table does not 
exist for this table
             }
         }
     }
@@ -3160,6 +3398,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     // We will not reach here if we fail to acquire the lock, 
since it throws UpgradeInProgressException
                 }
                 metaConnection = 
upgradeSystemCatalogIfRequired(metaConnection, currentServerSideTableTimeStamp);
+                // Synchronize necessary properties amongst all column 
families of a base table and its indexes
+                // See PHOENIX-3955
+                if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
+                    syncTableAndIndexProperties(metaConnection, getAdmin());
+                }
             }
 
             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(
@@ -3923,22 +4166,20 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     @Override
     public MetaDataMutationResult updateIndexState(final List<Mutation> 
tableMetaData, String parentTableName, Map<String, List<Pair<String,Object>>> 
stmtProperties,  PTable table) throws SQLException {
-        if(stmtProperties==null) return 
updateIndexState(tableMetaData,parentTableName);
-
-        Map<String, Object> tableProps = new HashMap<String, Object>();
-        Pair<HTableDescriptor,HTableDescriptor> tableDescriptorPair = 
separateAndValidateProperties(table, stmtProperties, new HashSet<String>(), 
tableProps);
-        HTableDescriptor tableDescriptor = tableDescriptorPair.getSecond();
-        HTableDescriptor origTableDescriptor = tableDescriptorPair.getFirst();
-        Set<HTableDescriptor> tableDescriptors = Collections.emptySet();
-        Set<HTableDescriptor> origTableDescriptors = Collections.emptySet();
-        if (tableDescriptor != null) {
-            tableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
-            origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
-            tableDescriptors.add(tableDescriptor);
-            origTableDescriptors.add(origTableDescriptor);
-        }
-        sendHBaseMetaData(tableDescriptors, true);
-        return updateIndexState(tableMetaData,parentTableName);
+        if(stmtProperties == null) {
+            return updateIndexState(tableMetaData,parentTableName);
+        }
+        Map<HTableDescriptor, HTableDescriptor> oldToNewTableDescriptors =
+                separateAndValidateProperties(table, stmtProperties, new 
HashSet<String>(), new HashMap<String, Object>());
+        HTableDescriptor origTableDescriptor = 
this.getTableDescriptor(table.getPhysicalName().getBytes());
+        HTableDescriptor newTableDescriptor = 
oldToNewTableDescriptors.remove(origTableDescriptor);
+        Set<HTableDescriptor> modifiedTableDescriptors = 
Collections.emptySet();
+        if (newTableDescriptor != null) {
+            modifiedTableDescriptors = Sets.newHashSetWithExpectedSize(3 + 
table.getIndexes().size());
+            modifiedTableDescriptors.add(newTableDescriptor);
+        }
+        sendHBaseMetaData(modifiedTableDescriptors, true);
+        return updateIndexState(tableMetaData, parentTableName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb13ffd8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index f77ed75..66e8baa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1065,7 +1065,7 @@ public class MetaDataClient {
         TableName tableName = statement.getTableName();
         Map<String,Object> tableProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size());
         Map<String,Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
-        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps);
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, statement.getTableType());
 
         boolean isAppendOnlySchema = false;
         long updateCacheFrequency = 
connection.getQueryServices().getProps().getLong(
@@ -1143,14 +1143,27 @@ public class MetaDataClient {
         return connection.getQueryServices().updateData(plan);
     }
 
-    private void populatePropertyMaps(ListMultimap<String,Pair<String,Object>> 
props, Map<String, Object> tableProps,
-            Map<String, Object> commonFamilyProps) {
+    /**
+     * Populate properties for the table and common properties for all column 
families of the table
+     * @param statementProps Properties specified in SQL statement
+     * @param tableProps Properties for an HTableDescriptor
+     * @param commonFamilyProps Properties common to all column families
+     * @param tableType Used to distinguish between index creation vs. base 
table creation paths
+     * @throws SQLException
+     */
+    private void populatePropertyMaps(ListMultimap<String,Pair<String,Object>> 
statementProps, Map<String, Object> tableProps,
+            Map<String, Object> commonFamilyProps, PTableType tableType) 
throws SQLException {
         // Somewhat hacky way of determining if property is for 
HColumnDescriptor or HTableDescriptor
         HColumnDescriptor defaultDescriptor = new 
HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
-        if (!props.isEmpty()) {
-            Collection<Pair<String,Object>> propsList = 
props.get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
+        if (!statementProps.isEmpty()) {
+            Collection<Pair<String,Object>> propsList = 
statementProps.get(QueryConstants.ALL_FAMILY_PROPERTIES_KEY);
             for (Pair<String,Object> prop : propsList) {
-                if (defaultDescriptor.getValue(prop.getFirst()) == null) {
+                if (tableType == PTableType.INDEX && 
MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
+                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
+                            .setMessage("Property: " + prop.getFirst()).build()
+                            .buildException();
+                }
+                if (defaultDescriptor.getValue(Bytes.toBytes(prop.getFirst())) 
== null) {
                     tableProps.put(prop.getFirst(), prop.getSecond());
                 } else {
                     commonFamilyProps.put(prop.getFirst(), prop.getSecond());
@@ -1502,7 +1515,7 @@ public class MetaDataClient {
 
         Map<String,Object> tableProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size());
         Map<String,Object> commonFamilyProps = 
Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
-        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps);
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.INDEX);
         List<Pair<ParseNode, SortOrder>> indexParseNodeAndSortOrderList = 
ik.getParseNodeAndSortOrderList();
         List<ColumnName> includedColumns = statement.getIncludeColumns();
         TableRef tableRef = null;
@@ -1897,6 +1910,57 @@ public class MetaDataClient {
         connection.getQueryServices().deleteMutexCell(tenantId, schemaName, 
tableName, columnName, null);
     }
 
+    /**
+     *
+     * Populate the properties for each column family referenced in the create 
table statement
+     * @param familyNames column families referenced in the create table 
statement
+     * @param commonFamilyProps properties common to all column families
+     * @param statement create table statement
+     * @param defaultFamilyName the default column family name
+     * @param isLocalIndex true if in the create local index path
+     * @param familyPropList list containing pairs of column families and 
their corresponding properties
+     * @throws SQLException
+     */
+    private void populateFamilyPropsList(Map<String, PName> familyNames, 
Map<String,Object> commonFamilyProps,
+            CreateTableStatement statement, String defaultFamilyName, boolean 
isLocalIndex,
+            final List<Pair<byte[],Map<String,Object>>> familyPropList) throws 
SQLException {
+        for (PName familyName : familyNames.values()) {
+            String fam = familyName.getString();
+            Collection<Pair<String, Object>> propsForCF =
+                    
statement.getProps().get(IndexUtil.getActualColumnFamilyName(fam));
+            // No specific properties for this column family, so add the 
common family properties
+            if (propsForCF.isEmpty()) {
+                familyPropList.add(new 
Pair<>(familyName.getBytes(),commonFamilyProps));
+            } else {
+                Map<String,Object> combinedFamilyProps = 
Maps.newHashMapWithExpectedSize(propsForCF.size() + commonFamilyProps.size());
+                combinedFamilyProps.putAll(commonFamilyProps);
+                for (Pair<String,Object> prop : propsForCF) {
+                    // Don't allow specifying column families for TTL, 
KEEP_DELETED_CELLS and REPLICATION_SCOPE.
+                    // These properties can only be applied for all column 
families of a table and can't be column family specific.
+                    // See PHOENIX-3955
+                    if (!fam.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY) 
&& MetaDataUtil.propertyNotAllowedToBeOutOfSync(prop.getFirst())) {
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY)
+                                .setMessage("Property: " + prop.getFirst())
+                                .build()
+                                .buildException();
+                    }
+                    combinedFamilyProps.put(prop.getFirst(), prop.getSecond());
+                }
+                familyPropList.add(new 
Pair<>(familyName.getBytes(),combinedFamilyProps));
+            }
+        }
+
+        if (familyNames.isEmpty()) {
+            // If there are no family names, use the default column family 
name. This also takes care of the case when
+            // the table ddl has only PK cols present (which means familyNames 
is empty).
+            byte[] cf =
+                    defaultFamilyName == null ? (!isLocalIndex? 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES
+                            : 
QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES)
+                            : Bytes.toBytes(defaultFamilyName);
+            familyPropList.add(new Pair<>(cf, commonFamilyProps));
+        }
+    }
+
     private PTable createTableInternal(CreateTableStatement statement, 
byte[][] splits,
             final PTable parent, String viewStatement, ViewType viewType, 
PDataType viewIndexType,
             final byte[][] viewColumnConstants, final BitSet 
isViewColumnReferenced, boolean allocateIndexId,
@@ -1937,7 +2001,6 @@ public class MetaDataClient {
             ImmutableStorageScheme immutableStorageScheme = 
ONE_CELL_PER_COLUMN;
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactionProvider != null, transactionProvider);
-                storeNulls = parent.getStoreNulls();
                 isImmutableRows = parent.isImmutableRows();
                 isAppendOnlySchema = parent.isAppendOnlySchema();
 
@@ -2552,7 +2615,6 @@ public class MetaDataClient {
                     .build().buildException();
             }
 
-            List<Pair<byte[],Map<String,Object>>> familyPropList = 
Lists.newArrayListWithExpectedSize(familyNames.size());
             if (!statement.getProps().isEmpty()) {
                 for (String familyName : statement.getProps().keySet()) {
                     if 
(!familyName.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
@@ -2567,36 +2629,8 @@ public class MetaDataClient {
             }
             throwIfInsufficientColumns(schemaName, tableName, pkColumns, 
saltBucketNum!=null, multiTenant);
 
-            for (PName familyName : familyNames.values()) {
-                String fam = familyName.getString();
-                Collection<Pair<String, Object>> props =
-                        
statement.getProps().get(IndexUtil.getActualColumnFamilyName(fam));
-                if (props.isEmpty()) {
-                    familyPropList.add(new 
Pair<byte[],Map<String,Object>>(familyName.getBytes(),commonFamilyProps));
-                } else {
-                    Map<String,Object> combinedFamilyProps = 
Maps.newHashMapWithExpectedSize(props.size() + commonFamilyProps.size());
-                    combinedFamilyProps.putAll(commonFamilyProps);
-                    for (Pair<String,Object> prop : props) {
-                        // Don't allow specifying column families for TTL. TTL 
can only apply for the all the column families of the table
-                        // i.e. it can't be column family specific.
-                        if 
(!familyName.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY) && 
prop.getFirst().equals(TTL)) {
-                            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL).build().buildException();
-                        }
-                        combinedFamilyProps.put(prop.getFirst(), 
prop.getSecond());
-                    }
-                    familyPropList.add(new 
Pair<byte[],Map<String,Object>>(familyName.getBytes(),combinedFamilyProps));
-                }
-            }
-
-            if (familyNames.isEmpty()) {
-                //if there are no family names, use the default column family 
name. This also takes care of the case when
-                //the table ddl has only PK cols present (which means 
familyNames is empty).
-                byte[] cf =
-                        defaultFamilyName == null ? (!isLocalIndex? 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES
-                                : 
QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES)
-                                : Bytes.toBytes(defaultFamilyName);
-                familyPropList.add(new Pair<byte[],Map<String,Object>>(cf, 
commonFamilyProps));
-            }
+            List<Pair<byte[],Map<String,Object>>> familyPropList = 
Lists.newArrayListWithExpectedSize(familyNames.size());
+            populateFamilyPropsList(familyNames, commonFamilyProps, statement, 
defaultFamilyName, isLocalIndex, familyPropList);
 
             // Bootstrapping for our SYSTEM.TABLE that creates itself before 
it exists
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb13ffd8/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 3d2e84e..ab667c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.schema;
 
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ALTER_PROPERTY;
-import static 
org.apache.phoenix.exception.SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.DEFAULT_COLUMN_FAMILY_ONLY_ON_CREATE_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.SALT_ONLY_ON_CREATE_TABLE;
@@ -74,7 +74,7 @@ public enum TableProperty {
         }
     },
 
-    TTL(HColumnDescriptor.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_TTL, true, 
CANNOT_ALTER_PROPERTY, false, false) {
+    TTL(HColumnDescriptor.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, 
CANNOT_ALTER_PROPERTY, false, false) {
         @Override
         public Object getPTableValue(PTable table) {
             return null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb13ffd8/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index aaa444f..ec73296 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -25,8 +25,10 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +83,7 @@ import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
@@ -96,8 +99,22 @@ public class MetaDataUtil {
     public static final byte[] VIEW_INDEX_SEQUENCE_PREFIX_BYTES = 
Bytes.toBytes(VIEW_INDEX_SEQUENCE_PREFIX);
     private static final String VIEW_INDEX_ID_COLUMN_NAME = "_INDEX_ID";
     public static final String PARENT_TABLE_KEY = "PARENT_TABLE";
-    public static final byte[] PARENT_TABLE_KEY_BYTES = 
Bytes.toBytes("PARENT_TABLE");
-    
+    public static final String IS_VIEW_INDEX_TABLE_PROP_NAME = 
"IS_VIEW_INDEX_TABLE";
+    public static final byte[] IS_VIEW_INDEX_TABLE_PROP_BYTES = 
Bytes.toBytes(IS_VIEW_INDEX_TABLE_PROP_NAME);
+
+    public static final String IS_LOCAL_INDEX_TABLE_PROP_NAME = 
"IS_LOCAL_INDEX_TABLE";
+    public static final byte[] IS_LOCAL_INDEX_TABLE_PROP_BYTES = 
Bytes.toBytes(IS_LOCAL_INDEX_TABLE_PROP_NAME);
+
+    public static final String DATA_TABLE_NAME_PROP_NAME = "DATA_TABLE_NAME";
+
+    public static final byte[] DATA_TABLE_NAME_PROP_BYTES = 
Bytes.toBytes(DATA_TABLE_NAME_PROP_NAME);
+
+    // See PHOENIX-3955
+    public static final List<String> SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES = 
ImmutableList.of(
+            HColumnDescriptor.TTL,
+            HColumnDescriptor.KEEP_DELETED_CELLS,
+            HColumnDescriptor.REPLICATION_SCOPE);
+
     public static boolean areClientAndServerCompatible(long 
serverHBaseAndPhoenixVersion) {
         // As of 3.0, we allow a client and server to differ for the minor 
version.
         // Care has to be taken to upgrade the server before the client, as 
otherwise
@@ -708,17 +725,20 @@ public class MetaDataUtil {
         return true;
     }
 
-    public static final String IS_VIEW_INDEX_TABLE_PROP_NAME = 
"IS_VIEW_INDEX_TABLE";
-    public static final byte[] IS_VIEW_INDEX_TABLE_PROP_BYTES = 
Bytes.toBytes(IS_VIEW_INDEX_TABLE_PROP_NAME);
-
-    public static final String IS_LOCAL_INDEX_TABLE_PROP_NAME = 
"IS_LOCAL_INDEX_TABLE";
-    public static final byte[] IS_LOCAL_INDEX_TABLE_PROP_BYTES = 
Bytes.toBytes(IS_LOCAL_INDEX_TABLE_PROP_NAME);
-
-    public static final String DATA_TABLE_NAME_PROP_NAME = "DATA_TABLE_NAME";
-
-    public static final byte[] DATA_TABLE_NAME_PROP_BYTES = 
Bytes.toBytes(DATA_TABLE_NAME_PROP_NAME);
-
+    public static boolean propertyNotAllowedToBeOutOfSync(String colFamProp) {
+        return SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES.contains(colFamProp);
+    }
 
+    public static Map<String, Object> getSyncedProps(HColumnDescriptor 
defaultCFDesc) {
+        Map<String, Object> syncedProps = new HashMap<>();
+        if (defaultCFDesc != null) {
+            for (String propToKeepInSync: 
SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES) {
+                syncedProps.put(propToKeepInSync, Bytes.toString(
+                        
defaultCFDesc.getValue(Bytes.toBytes(propToKeepInSync))));
+            }
+        }
+        return syncedProps;
+    }
 
     public static Scan newTableRowsScan(byte[] key, long startTimeStamp, long 
stopTimeStamp){
         return newTableRowsScan(key, null, startTimeStamp, stopTimeStamp);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb13ffd8/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 9d5583b..ede14b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -73,21 +73,14 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -101,6 +94,7 @@ import org.apache.phoenix.coprocessor.TableViewFinderResult;
 import org.apache.phoenix.coprocessor.ViewFinder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -1218,7 +1212,135 @@ public class UpgradeUtil {
             queryConn.getQueryServices().clearCache();
         }
     }
-    
+
+    /**
+     * Synchronize column family properties using the default cf properties 
for a given table
+     * @param tableDesc table descriptor of table to modify
+     * @param defaultColFam default column family used as the baseline for 
property synchronization
+     * @param syncedProps Map of properties to be kept in sync as read from 
the default column family descriptor
+     * @return modified table descriptor
+     */
+    private static HTableDescriptor syncColFamProperties(HTableDescriptor 
tableDesc, HColumnDescriptor defaultColFam,
+            Map<String, Object> syncedProps) {
+        HTableDescriptor newTableDesc = new HTableDescriptor(tableDesc);
+        // Ensure that all column families have necessary properties in sync 
(including local index cf if present)
+        for (HColumnDescriptor currentColFam: tableDesc.getColumnFamilies()) {
+            if (!currentColFam.equals(defaultColFam)) {
+                HColumnDescriptor newColFamDesc = new 
HColumnDescriptor(currentColFam);
+                for (String prop: 
MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_PROPERTIES) {
+                    String existingPropVal = 
Bytes.toString(currentColFam.getValue(Bytes.toBytes(prop)));
+                    String expectedPropVal = syncedProps.get(prop).toString();
+                    if (existingPropVal == null || 
!existingPropVal.toLowerCase().equals(expectedPropVal.toLowerCase())) {
+                        // Need to synchronize this property for the current 
column family descriptor
+                        newColFamDesc.setValue(prop, expectedPropVal);
+                    }
+                }
+                if (!newColFamDesc.equals(currentColFam)) {
+                    newTableDesc.modifyFamily(newColFamDesc);
+                }
+            }
+        }
+        return newTableDesc;
+    }
+
+    /**
+     * Add the table descriptor to the set of table descriptors to keep in 
sync, if it has been changed
+     * @param origTableDesc original table descriptor of the table in question
+     * @param defaultColFam column family to be used for synchronizing 
properties
+     * @param syncedProps Map of properties to be kept in sync as read from 
the default column family descriptor
+     * @param tableDescsToSync set of modified table descriptors
+     * @throws SQLException
+     */
+    private static void addTableDescIfPropsChanged(HTableDescriptor 
origTableDesc, HColumnDescriptor defaultColFam,
+            Map<String, Object> syncedProps, Set<HTableDescriptor> 
tableDescsToSync) throws SQLException {
+        HTableDescriptor newTableDesc = syncColFamProperties(origTableDesc, 
defaultColFam, syncedProps);
+        if (!origTableDesc.equals(newTableDesc)) {
+            tableDescsToSync.add(newTableDesc);
+        }
+    }
+
+    /**
+     * Synchronize certain properties across column families of global index 
tables for a given base table
+     * @param cqs CQS object to get table descriptor from PTable
+     * @param baseTable base table
+     * @param defaultColFam column family to be used for synchronizing 
properties
+     * @param syncedProps Map of properties to be kept in sync as read from 
the default column family descriptor
+     * @param tableDescsToSync set of modified table descriptors
+     */
+    private static void syncGlobalIndexesForTable(ConnectionQueryServices cqs, 
PTable baseTable, HColumnDescriptor defaultColFam,
+            Map<String, Object> syncedProps, Set<HTableDescriptor> 
tableDescsToSync) throws SQLException {
+        for (PTable indexTable: baseTable.getIndexes()) {
+            // We already handle local index property synchronization when 
considering all column families of the base table
+            if (indexTable.getIndexType() == IndexType.GLOBAL) {
+                
addTableDescIfPropsChanged(cqs.getTableDescriptor(indexTable.getPhysicalName().getBytes()),
+                        defaultColFam, syncedProps, tableDescsToSync);
+            }
+        }
+    }
+
+    /**
+     * Synchronize certain properties across column families of view index 
tables for a given base table
+     * @param cqs CQS object to get table descriptor from PTable
+     * @param baseTable base table
+     * @param defaultColFam column family to be used for synchronizing 
properties
+     * @param syncedProps Map of properties to be kept in sync as read from 
the default column family descriptor
+     * @param tableDescsToSync set of modified table descriptors
+     */
+    private static void syncViewIndexTable(ConnectionQueryServices cqs, PTable 
baseTable, HColumnDescriptor defaultColFam,
+            Map<String, Object> syncedProps, Set<HTableDescriptor> 
tableDescsToSync) throws SQLException {
+        String viewIndexName = 
MetaDataUtil.getViewIndexPhysicalName(baseTable.getPhysicalName().getString());
+        if (!Strings.isNullOrEmpty(viewIndexName)) {
+            try {
+                
addTableDescIfPropsChanged(cqs.getTableDescriptor(Bytes.toBytes(viewIndexName)),
+                        defaultColFam, syncedProps, tableDescsToSync);
+            } catch (TableNotFoundException ignore) {
+                // Ignore since this means that a view index table does not 
exist for this table
+            }
+        }
+    }
+
+    /**
+     * Make sure that all tables have necessary column family properties in 
sync
+     * with each other and also in sync with all the table's indexes
+     * See PHOENIX-3955
+     * @param conn Phoenix connection
+     * @param admin HBase admin used for getting existing tables and their 
descriptors
+     * @throws SQLException
+     * @throws IOException
+     */
+    public static void syncTableAndIndexProperties(PhoenixConnection conn, 
Admin admin)
+    throws SQLException, IOException {
+        Set<HTableDescriptor> tableDescriptorsToSynchronize = new HashSet<>();
+        for (HTableDescriptor origTableDesc : admin.listTables()) {
+            if 
(MetaDataUtil.isViewIndex(origTableDesc.getTableName().getNameWithNamespaceInclAsString()))
 {
+                // Ignore physical view index tables since we handle them for 
each base table already
+                continue;
+            }
+            PTable table = null;
+            String tableName = origTableDesc.getTableName().getNameAsString();
+            try {
+                table = PhoenixRuntime.getTable(conn, tableName);
+            } catch (TableNotFoundException e) {
+                // Ignore tables not mapped to a Phoenix Table
+                logger.warn("Error getting PTable for HBase table: " + 
tableName);
+                continue;
+            }
+            if (table.getType() == PTableType.INDEX) {
+                // Ignore global index tables since we handle them for each 
base table already
+                continue;
+            }
+            HColumnDescriptor defaultColFam = 
origTableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(table));
+            Map<String, Object> syncedProps = 
MetaDataUtil.getSyncedProps(defaultColFam);
+
+            addTableDescIfPropsChanged(origTableDesc, defaultColFam, 
syncedProps, tableDescriptorsToSynchronize);
+            syncGlobalIndexesForTable(conn.getQueryServices(), table, 
defaultColFam, syncedProps, tableDescriptorsToSynchronize);
+            syncViewIndexTable(conn.getQueryServices(), table, defaultColFam, 
syncedProps, tableDescriptorsToSynchronize);
+        }
+        for (HTableDescriptor t: tableDescriptorsToSynchronize) {
+            admin.modifyTable(t.getTableName(), t);
+        }
+    }
+
     private static void upsertBaseColumnCountInHeaderRow(PhoenixConnection 
metaConnection,
             String tenantId, String schemaName, String viewOrTableName, int 
baseColumnCount)
             throws SQLException {

Reply via email to