This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3c8e8705 Allow replica group assignment support in tier configs 
(#10255)
7c3c8e8705 is described below

commit 7c3c8e87052edca26648a646850b2b17ec0aa690
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Mar 9 10:55:35 2023 +0530

    Allow replica group assignment support in tier configs (#10255)
    
    * Allow replica group assignment support in tier configs
    
    * Add reassignment logic
    
    * Add tier to assignInstances
    
    * Make changes to APIs
    
    * Consolidate tier assignemnt configs inside InstanceAssignmentConfigsMap
    
    * Add removal logic for tier partitions
    
    * Lint fix
    
    * Review comments
    
    * Review comments
    
    * Add tests
    
    * Fix java8 quickstart
    
    * Fix test
    
    ---------
    
    Co-authored-by: Saurabh Dubey <[email protected]>
---
 .../assignment/InstanceAssignmentConfigUtils.java  |  20 +--
 .../common/assignment/InstancePartitionsUtils.java |  16 ++
 .../pinot/common/metadata/ZKMetadataProvider.java  |  16 ++
 .../common/utils/config/TableConfigUtils.java      |  13 +-
 .../common/utils/config/TableConfigSerDeTest.java  |  10 +-
 .../PinotInstanceAssignmentRestletResource.java    | 188 +++++++++++++++------
 .../api/resources/PinotTableRestletResource.java   |   2 +-
 .../helix/core/PinotHelixResourceManager.java      |  32 +++-
 .../instance/InstanceAssignmentDriver.java         |  32 +++-
 .../helix/core/rebalance/RebalanceResult.java      |   8 +
 .../helix/core/rebalance/TableRebalancer.java      | 111 ++++++++----
 ...anceAssignmentRestletResourceStatelessTest.java | 141 ++++++++++------
 .../instance/InstanceAssignmentTest.java           |  64 +++----
 .../TableRebalancerClusterStatelessTest.java       | 127 +++++++++++++-
 .../segment/local/utils/TableConfigUtils.java      |   3 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |   8 +-
 .../apache/pinot/spi/config/table/TableConfig.java |   8 +-
 .../utils/builder/ControllerRequestURLBuilder.java |   2 +-
 .../spi/utils/builder/TableConfigBuilder.java      |   4 +-
 19 files changed, 594 insertions(+), 211 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 6a0ae1188e..b571918c0c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -44,11 +44,10 @@ public class InstanceAssignmentConfigUtils {
    * backward-compatibility) COMPLETED server tag is overridden to be 
different from the CONSUMING server tag.
    */
   public static boolean shouldRelocateCompletedSegments(TableConfig 
tableConfig) {
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     return (instanceAssignmentConfigMap != null
-        && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED) 
!= null) || TagNameUtils
-        .isRelocateCompletedSegments(tableConfig.getTenantConfig());
+        && 
instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) != 
null)
+        || 
TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
   }
 
   /**
@@ -60,21 +59,20 @@ public class InstanceAssignmentConfigUtils {
       return true;
     }
     TableType tableType = tableConfig.getTableType();
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     switch (instancePartitionsType) {
       // Allow OFFLINE instance assignment if the offline table has it 
configured or (for backward-compatibility) is
       // using replica-group segment assignment
       case OFFLINE:
         return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap 
!= null
-            && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE) 
!= null)
+            && 
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) != 
null)
             || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
             
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
       // Allow CONSUMING/COMPLETED instance assignment if the real-time table 
has it configured
       case CONSUMING:
       case COMPLETED:
         return tableType == TableType.REALTIME && (instanceAssignmentConfigMap 
!= null
-            && instanceAssignmentConfigMap.get(instancePartitionsType) != 
null);
+            && 
instanceAssignmentConfigMap.get(instancePartitionsType.toString()) != null);
       default:
         throw new IllegalStateException();
     }
@@ -89,10 +87,10 @@ public class InstanceAssignmentConfigUtils {
         "Instance assignment is not allowed for the given table config");
 
     // Use the instance assignment config from the table config if it exists
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     if (instanceAssignmentConfigMap != null) {
-      InstanceAssignmentConfig instanceAssignmentConfig = 
instanceAssignmentConfigMap.get(instancePartitionsType);
+      InstanceAssignmentConfig instanceAssignmentConfig =
+          instanceAssignmentConfigMap.get(instancePartitionsType.toString());
       if (instanceAssignmentConfig != null) {
         return instanceAssignmentConfig;
       }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index a15554f3d3..759d387af4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -46,6 +46,7 @@ public class InstancePartitionsUtils {
   }
 
   public static final char TYPE_SUFFIX_SEPARATOR = '_';
+  public static final String TIER_SUFFIX = "__TIER__";
 
   /**
    * Returns the name of the instance partitions for the given table name 
(with or without type suffix) and instance
@@ -93,6 +94,11 @@ public class InstancePartitionsUtils {
     return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
   }
 
+  public static String getInstancePartitionsNameForTier(String tableName, 
String tierName) {
+    return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX + 
tierName;
+  }
+
+
   /**
    * Gets the instance partitions with the given name, and returns a re-named 
copy of the same.
    * This method is useful when we use a table with instancePartitionsMap 
since in that case
@@ -177,4 +183,14 @@ public class InstancePartitionsUtils {
       throw new ZkException("Failed to remove instance partitions: " + 
instancePartitionsName);
     }
   }
+
+  public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord> 
propertyStore,
+      String tableNameWithType) {
+    List<InstancePartitions> instancePartitions = 
ZKMetadataProvider.getAllInstancePartitions(propertyStore);
+    instancePartitions.stream().filter(instancePartition -> 
instancePartition.getInstancePartitionsName()
+            
.startsWith(TableNameBuilder.extractRawTableName(tableNameWithType) + 
TIER_SUFFIX))
+        .forEach(instancePartition -> {
+          removeInstancePartitions(propertyStore, 
instancePartition.getInstancePartitionsName());
+        });
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 623a856454..b14ba30391 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -26,9 +26,11 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
+import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.SchemaUtils;
@@ -261,6 +263,20 @@ public class ZKMetadataProvider {
     }
   }
 
+  @Nullable
+  public static List<InstancePartitions> 
getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+    List<ZNRecord> znRecordss =
+        propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, 
null, AccessOption.PERSISTENT);
+
+    try {
+      return 
Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting instance partitions", e);
+      return null;
+    }
+  }
+
   @Nullable
   public static List<UserConfig> 
getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
     List<ZNRecord> znRecordss =
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 8abb0ea964..9af13f44bc 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -113,11 +113,11 @@ public class TableConfigUtils {
       queryConfig = JsonUtils.stringToObject(queryConfigString, 
QueryConfig.class);
     }
 
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap = null;
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
     String instanceAssignmentConfigMapString = 
simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY);
     if (instanceAssignmentConfigMapString != null) {
       instanceAssignmentConfigMap = 
JsonUtils.stringToObject(instanceAssignmentConfigMapString,
-          new TypeReference<Map<InstancePartitionsType, 
InstanceAssignmentConfig>>() {
+          new TypeReference<Map<String, InstanceAssignmentConfig>>() {
           });
     }
 
@@ -181,9 +181,9 @@ public class TableConfigUtils {
     }
 
     return new TableConfig(tableName, tableType, validationConfig, 
tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap,
-        fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig, 
ingestionConfig, tierConfigList, isDimTable,
-        tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap);
+        quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+        dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, 
isDimTable, tunerConfigList,
+        instancePartitionsMap, segmentAssignmentConfigMap);
   }
 
   public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -216,8 +216,7 @@ public class TableConfigUtils {
     if (queryConfig != null) {
       simpleFields.put(TableConfig.QUERY_CONFIG_KEY, 
queryConfig.toJsonString());
     }
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     if (instanceAssignmentConfigMap != null) {
       simpleFields
           .put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, 
JsonUtils.objectToString(instanceAssignmentConfigMap));
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index aad62db8f7..30dbfe80b4 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
               new InstanceConstraintConfig(Arrays.asList("constraint1", 
"constraint2")),
               new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, 
false));
       TableConfig tableConfig = 
tableConfigBuilder.setInstanceAssignmentConfigMap(
-          Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instanceAssignmentConfig)).build();
+          Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), 
instanceAssignmentConfig)).build();
 
       checkInstanceAssignmentConfig(tableConfig);
 
@@ -488,12 +488,12 @@ public class TableConfigSerDeTest {
   }
 
   private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
-    Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
-        tableConfig.getInstanceAssignmentConfigMap();
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     assertNotNull(instanceAssignmentConfigMap);
     assertEquals(instanceAssignmentConfigMap.size(), 1);
-    
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
-    InstanceAssignmentConfig instanceAssignmentConfig = 
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
+    
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    InstanceAssignmentConfig instanceAssignmentConfig =
+        
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
 
     InstanceTagPoolConfig tagPoolConfig = 
instanceAssignmentConfig.getTagPoolConfig();
     assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index dfdaefafaa..aee7df4e8a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +46,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -57,6 +59,7 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -80,39 +83,57 @@ public class PinotInstanceAssignmentRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/{tableName}/instancePartitions")
   @ApiOperation(value = "Get the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+  public Map<String, InstancePartitions> getInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable
-          InstancePartitionsType instancePartitionsType) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") 
@QueryParam("type") @Nullable String type) {
+    Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
 
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType != TableType.REALTIME) {
-      if (instancePartitionsType == InstancePartitionsType.OFFLINE || 
instancePartitionsType == null) {
-        InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.OFFLINE.toString().equals(type) || type == 
null) {
+        InstancePartitions offlineInstancePartitions =
+            
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 
InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
         if (offlineInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.OFFLINE, 
offlineInstancePartitions);
+          instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(), 
offlineInstancePartitions);
         }
       }
     }
     if (tableType != TableType.OFFLINE) {
-      if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
-        InstancePartitions consumingInstancePartitions = 
InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.CONSUMING.toString().equals(type) || type == 
null) {
+        InstancePartitions consumingInstancePartitions =
+            
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
         if (consumingInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions);
+          
instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(), 
consumingInstancePartitions);
         }
       }
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
-        InstancePartitions completedInstancePartitions = 
InstancePartitionsUtils
-            .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+      if (InstancePartitionsType.COMPLETED.toString().equals(type) || type == 
null) {
+        InstancePartitions completedInstancePartitions =
+            
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
                 
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
         if (completedInstancePartitions != null) {
-          instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
+          
instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(), 
completedInstancePartitions);
+        }
+      }
+    }
+
+    List<TableConfig> tableConfigs = 
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && 
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if (type == null || type.equals(tierConfig.getName())) {
+            InstancePartitions instancePartitions =
+                
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+                    
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                        tierConfig.getName()));
+            if (instancePartitions != null) {
+              instancePartitionsMap.put(tierConfig.getName(), 
instancePartitions);
+            }
+          }
         }
       }
     }
@@ -130,22 +151,20 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/assignInstances")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Assign server instances to a table")
-  public Map<InstancePartitionsType, InstancePartitions> assignInstances(
+  public Map<String, InstancePartitions> assignInstances(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable
-          InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") 
@QueryParam("type") @Nullable String type,
       @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") 
@QueryParam("dryRun") boolean dryRun) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
new TreeMap<>();
+    Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     List<InstanceConfig> instanceConfigs = 
_resourceManager.getAllHelixInstanceConfigs();
 
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType != TableType.REALTIME && (instancePartitionsType == 
InstancePartitionsType.OFFLINE
-        || instancePartitionsType == null)) {
+    if (tableType != TableType.REALTIME && 
(InstancePartitionsType.OFFLINE.toString().equals(type) || type == null)) {
       TableConfig offlineTableConfig = 
_resourceManager.getOfflineTableConfig(tableName);
       if (offlineTableConfig != null) {
         try {
-          if (InstanceAssignmentConfigUtils
-              .allowInstanceAssignment(offlineTableConfig, 
InstancePartitionsType.OFFLINE)) {
+          if 
(InstanceAssignmentConfigUtils.allowInstanceAssignment(offlineTableConfig,
+              InstancePartitionsType.OFFLINE)) {
             assignInstancesForInstancePartitionsType(instancePartitionsMap, 
offlineTableConfig, instanceConfigs,
                 InstancePartitionsType.OFFLINE);
           }
@@ -158,20 +177,20 @@ public class PinotInstanceAssignmentRestletResource {
         }
       }
     }
-    if (tableType != TableType.OFFLINE && instancePartitionsType != 
InstancePartitionsType.OFFLINE) {
+    if (tableType != TableType.OFFLINE && 
!InstancePartitionsType.OFFLINE.toString().equals(type)) {
       TableConfig realtimeTableConfig = 
_resourceManager.getRealtimeTableConfig(tableName);
       if (realtimeTableConfig != null) {
         try {
-          if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
-            if (InstanceAssignmentConfigUtils
-                .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.CONSUMING)) {
+          if (InstancePartitionsType.CONSUMING.toString().equals(type) || type 
== null) {
+            if 
(InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+                InstancePartitionsType.CONSUMING)) {
               assignInstancesForInstancePartitionsType(instancePartitionsMap, 
realtimeTableConfig, instanceConfigs,
                   InstancePartitionsType.CONSUMING);
             }
           }
-          if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
-            if (InstanceAssignmentConfigUtils
-                .allowInstanceAssignment(realtimeTableConfig, 
InstancePartitionsType.COMPLETED)) {
+          if (InstancePartitionsType.COMPLETED.toString().equals(type) || type 
== null) {
+            if 
(InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+                InstancePartitionsType.COMPLETED)) {
               assignInstancesForInstancePartitionsType(instancePartitionsMap, 
realtimeTableConfig, instanceConfigs,
                   InstancePartitionsType.COMPLETED);
             }
@@ -186,6 +205,16 @@ public class PinotInstanceAssignmentRestletResource {
       }
     }
 
+    TableConfig realtimeTableConfig = 
_resourceManager.getRealtimeTableConfig(tableName);
+    if (realtimeTableConfig != null) {
+      assignInstancesForTier(instancePartitionsMap, realtimeTableConfig, 
instanceConfigs, type);
+    }
+
+    TableConfig offlineTableConfig = 
_resourceManager.getOfflineTableConfig(tableName);
+    if (offlineTableConfig != null) {
+      assignInstancesForTier(instancePartitionsMap, offlineTableConfig, 
instanceConfigs, type);
+    }
+
     if (instancePartitionsMap.isEmpty()) {
       throw new ControllerApplicationException(LOGGER, "Failed to find the 
instance assignment config",
           Response.Status.NOT_FOUND);
@@ -207,22 +236,43 @@ public class PinotInstanceAssignmentRestletResource {
    * @param instanceConfigs list of instance configs
    * @param instancePartitionsType type of instancePartitions
    */
-  private void assignInstancesForInstancePartitionsType(
-      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
TableConfig tableConfig,
-      List<InstanceConfig> instanceConfigs, InstancePartitionsType 
instancePartitionsType) {
+  private void assignInstancesForInstancePartitionsType(Map<String, 
InstancePartitions> instancePartitionsMap,
+      TableConfig tableConfig, List<InstanceConfig> instanceConfigs, 
InstancePartitionsType instancePartitionsType) {
     String tableNameWithType = tableConfig.getTableName();
     if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType)) {
       String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      instancePartitionsMap.put(instancePartitionsType, 
InstancePartitionsUtils.fetchInstancePartitionsWithRename(
-          _resourceManager.getPropertyStore(), 
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
-          instancePartitionsType.getInstancePartitionsName(rawTableName)));
+      instancePartitionsMap.put(instancePartitionsType.toString(),
+          
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+              
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+              instancePartitionsType.getInstancePartitionsName(rawTableName)));
       return;
     }
     InstancePartitions existingInstancePartitions =
         
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
             
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, 
instancePartitionsType.toString()));
-    instancePartitionsMap.put(instancePartitionsType, new 
InstanceAssignmentDriver(tableConfig)
-        .assignInstances(instancePartitionsType, instanceConfigs, 
existingInstancePartitions));
+    instancePartitionsMap.put(instancePartitionsType.toString(),
+        new 
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, 
instanceConfigs,
+            existingInstancePartitions));
+  }
+
+  private void assignInstancesForTier(Map<String, InstancePartitions> 
instancePartitionsMap, TableConfig tableConfig,
+      List<InstanceConfig> instanceConfigs, String tierName) {
+    if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+        && tableConfig.getInstanceAssignmentConfigMap() != null) {
+      for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+        if ((tierConfig.getName().equals(tierName) || tierName == null)
+            && 
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null) 
{
+          InstancePartitions existingInstancePartitions = 
InstancePartitionsUtils.fetchInstancePartitions(
+              _resourceManager.getHelixZkManager().getHelixPropertyStore(),
+              
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                  tierConfig.getName()));
+
+          instancePartitionsMap.put(tierConfig.getName(),
+              new 
InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(), 
instanceConfigs,
+                  existingInstancePartitions, 
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())));
+        }
+      }
+    }
   }
 
   private void persistInstancePartitionsHelper(InstancePartitions 
instancePartitions) {
@@ -240,7 +290,7 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/instancePartitions")
   @Authenticate(AccessType.UPDATE)
   @ApiOperation(value = "Create/update the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions(
+  public Map<String, InstancePartitions> setInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName, String instancePartitionsStr) {
     InstancePartitions instancePartitions;
     try {
@@ -256,17 +306,32 @@ public class PinotInstanceAssignmentRestletResource {
     if (tableType != TableType.REALTIME) {
       if 
(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instancePartitions);
+        return 
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), 
instancePartitions);
       }
     }
     if (tableType != TableType.OFFLINE) {
       if 
(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.CONSUMING, 
instancePartitions);
+        return 
Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), 
instancePartitions);
       }
       if 
(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
 {
         persistInstancePartitionsHelper(instancePartitions);
-        return Collections.singletonMap(InstancePartitionsType.COMPLETED, 
instancePartitions);
+        return 
Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(), 
instancePartitions);
+      }
+    }
+
+    List<TableConfig> tableConfigs = 
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && 
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if 
(InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
 tierConfig.getName())
+              .equals(instancePartitionsName)) {
+            persistInstancePartitionsHelper(instancePartitions);
+            return Collections.singletonMap(tierConfig.getName(), 
instancePartitions);
+          }
+        }
       }
     }
 
@@ -281,22 +346,39 @@ public class PinotInstanceAssignmentRestletResource {
   @ApiOperation(value = "Remove the instance partitions")
   public SuccessResponse removeInstancePartitions(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable
-          InstancePartitionsType instancePartitionsType) {
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") 
@QueryParam("type") @Nullable
+          String instancePartitionsType) {
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType != TableType.REALTIME && (instancePartitionsType == 
InstancePartitionsType.OFFLINE
+    if (tableType != TableType.REALTIME && 
(InstancePartitionsType.OFFLINE.toString().equals(instancePartitionsType)
         || instancePartitionsType == null)) {
       
removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
     }
     if (tableType != TableType.OFFLINE) {
-      if (instancePartitionsType == InstancePartitionsType.CONSUMING || 
instancePartitionsType == null) {
+      if 
(InstancePartitionsType.CONSUMING.toString().equals(instancePartitionsType)
+          || instancePartitionsType == null) {
         
removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
       }
-      if (instancePartitionsType == InstancePartitionsType.COMPLETED || 
instancePartitionsType == null) {
+      if 
(InstancePartitionsType.COMPLETED.toString().equals(instancePartitionsType)
+          || instancePartitionsType == null) {
         
removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
       }
     }
+
+    List<TableConfig> tableConfigs = 
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+        _resourceManager.getOfflineTableConfig(tableName));
+
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig != null && 
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+          if (instancePartitionsType == null || 
instancePartitionsType.equals(tierConfig.getName())) {
+            removeInstancePartitionsHelper(
+                
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+                    tierConfig.getName()));
+          }
+        }
+      }
+    }
     return new SuccessResponse("Instance partitions removed");
   }
 
@@ -315,16 +397,16 @@ public class PinotInstanceAssignmentRestletResource {
   @Path("/tables/{tableName}/replaceInstance")
   @Authenticate(AccessType.CREATE)
   @ApiOperation(value = "Replace an instance in the instance partitions")
-  public Map<InstancePartitionsType, InstancePartitions> replaceInstance(
+  public Map<String, InstancePartitions> replaceInstance(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName,
-      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") 
@Nullable
-          InstancePartitionsType instancePartitionsType,
+      @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name") 
@QueryParam("type") @Nullable
+          String type,
       @ApiParam(value = "Old instance to be replaced", required = true) 
@QueryParam("oldInstanceId")
           String oldInstanceId,
       @ApiParam(value = "New instance to replace with", required = true) 
@QueryParam("newInstanceId")
           String newInstanceId) {
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
-        getInstancePartitions(tableName, instancePartitionsType);
+    Map<String, InstancePartitions> instancePartitionsMap =
+        getInstancePartitions(tableName, type);
     Iterator<InstancePartitions> iterator = 
instancePartitionsMap.values().iterator();
     while (iterator.hasNext()) {
       InstancePartitions instancePartitions = iterator.next();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index cfa1cffe2d..e43b244893 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -683,7 +683,7 @@ public class PinotTableRestletResource {
           });
           return new RebalanceResult(RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", 
dryRunResult.getInstanceAssignment(),
-              dryRunResult.getSegmentAssignment());
+              dryRunResult.getTierInstanceAssignment(), 
dryRunResult.getSegmentAssignment());
         } else {
           // If dry-run failed or is no-op, return the dry-run result
           return dryRunResult;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 91dc57956f..48418f2bda 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -147,6 +147,7 @@ import org.apache.pinot.spi.config.table.TableStats;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
@@ -1734,10 +1735,10 @@ public class PinotHelixResourceManager {
       }
     }
 
+    InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
+    List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
     if (!instancePartitionsTypesToAssign.isEmpty()) {
       LOGGER.info("Assigning {} instances to table: {}", 
instancePartitionsTypesToAssign, tableNameWithType);
-      InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
-      List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : 
instancePartitionsTypesToAssign) {
         boolean hasPreConfiguredInstancePartitions =
             TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, 
instancePartitionsType);
@@ -1757,6 +1758,26 @@ public class PinotHelixResourceManager {
         }
       }
     }
+
+    // Process and persist tier config instancePartitions
+    if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+        && tableConfig.getInstanceAssignmentConfigMap() != null) {
+      for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+        if 
(tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName()))
 {
+          if (override || 
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+              
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, 
tierConfig.getName()))
+              == null) {
+            LOGGER.info("Calculating instance partitions for tier: {}, table : 
{}", tierConfig.getName(),
+                tableNameWithType);
+            InstancePartitions instancePartitions =
+                instanceAssignmentDriver.assignInstances(tierConfig.getName(), 
instanceConfigs, null,
+                    
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+            LOGGER.info("Persisting instance partitions: {}", 
instancePartitions);
+            InstancePartitionsUtils.persistInstancePartitions(_propertyStore, 
instancePartitions);
+          }
+        }
+      }
+    }
   }
 
   public void updateUserConfig(UserConfig userConfig)
@@ -1909,6 +1930,10 @@ public class PinotHelixResourceManager {
     InstancePartitionsUtils.removeInstancePartitions(_propertyStore, 
offlineTableName);
     LOGGER.info("Deleting table {}: Removed instance partitions", 
offlineTableName);
 
+    // Remove tier instance partitions
+    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, 
offlineTableName);
+    LOGGER.info("Deleting table {}: Removed tier instance partitions", 
offlineTableName);
+
     // Remove segment lineage
     SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, 
offlineTableName);
     LOGGER.info("Deleting table {}: Removed segment lineage", 
offlineTableName);
@@ -1968,6 +1993,9 @@ public class PinotHelixResourceManager {
         
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
     LOGGER.info("Deleting table {}: Removed instance partitions", 
realtimeTableName);
 
+    InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore, 
rawTableName);
+    LOGGER.info("Deleting table {}: Removed tier instance partitions", 
realtimeTableName);
+
     // Remove segment lineage
     SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore, 
realtimeTableName);
     LOGGER.info("Deleting table {}: Removed segment lineage", 
realtimeTableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index ba73f7bd6d..7a5c901029 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
@@ -55,15 +56,31 @@ public class InstanceAssignmentDriver {
   public InstancePartitions assignInstances(InstancePartitionsType 
instancePartitionsType,
       List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions 
existingInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table: {}", 
instancePartitionsType, tableNameWithType);
-
     InstanceAssignmentConfig assignmentConfig =
         
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, 
instancePartitionsType);
+    return getInstancePartitions(
+        
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+        assignmentConfig, instanceConfigs, existingInstancePartitions);
+  }
+
+  public InstancePartitions assignInstances(String tierName, 
List<InstanceConfig> instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions, 
InstanceAssignmentConfig instanceAssignmentConfig) {
+    return getInstancePartitions(
+        
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
 tierName),
+        instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+  }
+
+  private InstancePartitions getInstancePartitions(String 
instancePartitionsName,
+      InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> 
instanceConfigs,
+      @Nullable InstancePartitions existingInstancePartitions) {
+    String tableNameWithType = _tableConfig.getTableName();
+    LOGGER.info("Starting {} instance assignment for table {}", 
instancePartitionsName, tableNameWithType);
+
     InstanceTagPoolSelector tagPoolSelector =
-        new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), 
tableNameWithType);
+        new 
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), 
tableNameWithType);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = 
tagPoolSelector.selectInstances(instanceConfigs);
 
-    InstanceConstraintConfig constraintConfig = 
assignmentConfig.getConstraintConfig();
+    InstanceConstraintConfig constraintConfig = 
instanceAssignmentConfig.getConstraintConfig();
     List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
     if (constraintConfig == null) {
       LOGGER.info("No instance constraint is configured, using default 
hash-based-rotate instance constraint");
@@ -75,10 +92,9 @@ public class InstanceAssignmentDriver {
     }
 
     InstancePartitionSelector instancePartitionSelector =
-        
InstancePartitionSelectorFactory.getInstance(assignmentConfig.getPartitionSelector(),
-            assignmentConfig.getReplicaGroupPartitionConfig(), 
tableNameWithType, existingInstancePartitions);
-    InstancePartitions instancePartitions = new InstancePartitions(
-        
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+        
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
+            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), 
tableNameWithType, existingInstancePartitions);
+    InstancePartitions instancePartitions = new 
InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
     return instancePartitions;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index a2d6d630d0..4234abc5d5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 public class RebalanceResult {
   private final Status _status;
   private final Map<InstancePartitionsType, InstancePartitions> 
_instanceAssignment;
+  private final Map<String, InstancePartitions> _tierInstanceAssignment;
   private final Map<String, Map<String, String>> _segmentAssignment;
   private final String _description;
 
@@ -38,10 +39,12 @@ public class RebalanceResult {
   public RebalanceResult(@JsonProperty(value = "status", required = true) 
Status status,
       @JsonProperty(value = "description", required = true) String description,
       @JsonProperty("instanceAssignment") @Nullable 
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
+      @JsonProperty("tierInstanceAssignment") @Nullable Map<String, 
InstancePartitions> tierInstanceAssignment,
       @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, 
String>> segmentAssignment) {
     _status = status;
     _description = description;
     _instanceAssignment = instanceAssignment;
+    _tierInstanceAssignment = tierInstanceAssignment;
     _segmentAssignment = segmentAssignment;
   }
 
@@ -60,6 +63,11 @@ public class RebalanceResult {
     return _instanceAssignment;
   }
 
+  @JsonProperty
+  public Map<String, InstancePartitions> getTierInstanceAssignment() {
+    return _tierInstanceAssignment;
+  }
+
   @JsonProperty
   public Map<String, Map<String, String>> getSegmentAssignment() {
     return _segmentAssignment;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index b8257ed450..218193e210 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -163,13 +163,13 @@ public class TableRebalancer {
           
IngestionConfigUtils.getStreamConfigMap(tableConfig)).hasHighLevelConsumerType())
 {
         LOGGER.warn("Cannot rebalance table: {} with high-level consumer, 
aborting the rebalance", tableNameWithType);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot 
rebalance table with high-level consumer",
-            null, null);
+            null, null, null);
       }
     } catch (Exception e) {
       LOGGER.warn("Caught exception while validating table config for table: 
{}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught 
exception while validating table config: " + e,
-          null, null);
+          null, null, null);
     }
 
     // Fetch ideal state
@@ -181,16 +181,17 @@ public class TableRebalancer {
       LOGGER.warn("Caught exception while fetching IdealState for table: {}, 
aborting the rebalance", tableNameWithType,
           e);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught 
exception while fetching IdealState: " + e,
-          null, null);
+          null, null, null);
     }
     if (currentIdealState == null) {
       LOGGER.warn("Cannot find the IdealState for table: {}, aborting the 
rebalance", tableNameWithType);
-      return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find 
the IdealState for table", null, null);
+      return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find 
the IdealState for table", null, null,
+          null);
     }
     if (!currentIdealState.isEnabled() && !downtime) {
       LOGGER.warn("Cannot rebalance disabled table: {} without downtime, 
aborting the rebalance", tableNameWithType);
       return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot 
rebalance disabled table without downtime",
-          null, null);
+          null, null, null);
     }
 
     LOGGER.info("Fetching/computing instance partitions, reassigning instances 
if configured for table: {}",
@@ -205,13 +206,13 @@ public class TableRebalancer {
           "Caught exception while fetching/calculating instance partitions for 
table: {}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED,
-          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null);
+          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null);
     }
 
     // Calculate instance partitions for tiers if configured
     List<Tier> sortedTiers = getSortedTiers(tableConfig);
     Map<String, InstancePartitions> tierToInstancePartitionsMap =
-        getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+        getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, dryRun);
 
     LOGGER.info("Calculating the target assignment for table: {}", 
tableNameWithType);
     SegmentAssignment segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -224,7 +225,8 @@ public class TableRebalancer {
       LOGGER.warn("Caught exception while calculating target assignment for 
table: {}, aborting the rebalance",
           tableNameWithType, e);
       return new RebalanceResult(RebalanceResult.Status.FAILED,
-          "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap, null);
+          "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
+          tierToInstancePartitionsMap, null);
     }
 
     if (currentAssignment.equals(targetAssignment)) {
@@ -233,20 +235,21 @@ public class TableRebalancer {
         if (dryRun) {
           return new RebalanceResult(RebalanceResult.Status.DONE,
               "Instance reassigned in dry-run mode, table is already 
balanced", instancePartitionsMap,
-              targetAssignment);
+              tierToInstancePartitionsMap, targetAssignment);
         } else {
           return new RebalanceResult(RebalanceResult.Status.DONE, "Instance 
reassigned, table is already balanced",
-              instancePartitionsMap, targetAssignment);
+              instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
         }
       } else {
         return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is 
already balanced", instancePartitionsMap,
-            targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment);
       }
     }
 
     if (dryRun) {
       LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target 
assignment", tableNameWithType);
-      return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", 
instancePartitionsMap, targetAssignment);
+      return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", 
instancePartitionsMap,
+          tierToInstancePartitionsMap, targetAssignment);
     }
 
     if (downtime) {
@@ -267,12 +270,13 @@ public class TableRebalancer {
             System.currentTimeMillis() - startTimeMs);
         return new RebalanceResult(RebalanceResult.Status.DONE,
             "Success with downtime (replaced IdealState with the target 
segment assignment, ExternalView might not "
-                + "reach the target segment assignment yet)", 
instancePartitionsMap, targetAssignment);
+                + "reach the target segment assignment yet)", 
instancePartitionsMap, tierToInstancePartitionsMap,
+            targetAssignment);
       } catch (Exception e) {
         LOGGER.warn("Caught exception while updating IdealState for table: {}, 
aborting the rebalance",
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught 
exception while updating IdealState: " + e,
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
       }
     }
 
@@ -295,7 +299,7 @@ public class TableRebalancer {
                 + "replicas: {}, aborting the rebalance", 
minReplicasToKeepUpForNoDowntime, tableNameWithType,
             numReplicas);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min 
available replicas config",
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
       }
       minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
     } else {
@@ -321,7 +325,7 @@ public class TableRebalancer {
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED,
             "Caught exception while waiting for ExternalView to converge: " + 
e, instancePartitionsMap,
-            targetAssignment);
+            tierToInstancePartitionsMap, targetAssignment);
       }
 
       // Re-calculate the target assignment if IdealState changed while 
waiting for ExternalView to converge
@@ -353,7 +357,8 @@ public class TableRebalancer {
           try {
             // Re-calculate the instance partitions in case the instance 
configs changed during the rebalance
             instancePartitionsMap = getInstancePartitionsMap(tableConfig, 
reassignInstances, bootstrap, false);
-            tierToInstancePartitionsMap = 
getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+            tierToInstancePartitionsMap =
+                getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, dryRun);
             targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
                 tierToInstancePartitionsMap, rebalanceConfig);
           } catch (Exception e) {
@@ -362,7 +367,7 @@ public class TableRebalancer {
                 tableNameWithType, e);
             return new RebalanceResult(RebalanceResult.Status.FAILED,
                 "Caught exception while re-calculating the target assignment: 
" + e, instancePartitionsMap,
-                targetAssignment);
+                tierToInstancePartitionsMap, targetAssignment);
           }
         } else {
           LOGGER.info(
@@ -384,7 +389,7 @@ public class TableRebalancer {
         return new RebalanceResult(RebalanceResult.Status.DONE,
             "Success with minAvailableReplicas: " + minAvailableReplicas
                 + " (both IdealState and ExternalView should reach the target 
segment assignment)",
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
       }
 
       Map<String, Map<String, String>> nextAssignment =
@@ -412,7 +417,7 @@ public class TableRebalancer {
         LOGGER.warn("Caught exception while updating IdealState for table: {}, 
aborting the rebalance",
             tableNameWithType, e);
         return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught 
exception while updating IdealState: " + e,
-            instancePartitionsMap, targetAssignment);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
       }
     }
   }
@@ -524,31 +529,73 @@ public class TableRebalancer {
   }
 
   @Nullable
-  private Map<String, InstancePartitions> 
getTierToInstancePartitionsMap(String tableNameWithType,
-      @Nullable List<Tier> sortedTiers) {
+  private Map<String, InstancePartitions> 
getTierToInstancePartitionsMap(TableConfig tableConfig,
+      @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean 
bootstrap, boolean dryRun) {
     if (sortedTiers == null) {
       return null;
     }
     Map<String, InstancePartitions> tierToInstancePartitionsMap = new 
HashMap<>();
     for (Tier tier : sortedTiers) {
       LOGGER.info("Fetching/computing instance partitions for tier: {} of 
table: {}", tier.getName(),
-          tableNameWithType);
-      tierToInstancePartitionsMap.put(tier.getName(), 
getInstancePartitionsForTier(tier, tableNameWithType));
+          tableConfig.getTableName());
+      tierToInstancePartitionsMap.put(tier.getName(),
+          getInstancePartitionsForTier(tableConfig, tier, reassignInstances, 
bootstrap, dryRun));
     }
     return tierToInstancePartitionsMap;
   }
 
   /**
-   * Creates a default instance assignment for the tier.
-   * TODO: We only support default server-tag based assignment currently.
-   *  In next iteration, we will add InstanceAssignmentConfig to the 
TierConfig and also support persisting of the
-   *  InstancePartitions to zk.
-   *  Then we'll be able to support replica group assignment while creating 
InstancePartitions for tiers
+   * Computes the instance partitions for the given tier. If table's 
instanceAssignmentConfigMap has an entry for the
+   * tier, it's used to calculate the instance partitions. Else default 
instance partitions are returned
    */
-  private InstancePartitions getInstancePartitionsForTier(Tier tier, String 
tableNameWithType) {
+  private InstancePartitions getInstancePartitionsForTier(TableConfig 
tableConfig, Tier tier, boolean reassignInstances,
+      boolean bootstrap, boolean dryRun) {
     PinotServerTierStorage storage = (PinotServerTierStorage) 
tier.getStorage();
-    return 
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, 
tableNameWithType,
-        tier.getName(), storage.getServerTag());
+    InstancePartitions defaultInstancePartitions =
+        
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, 
tableConfig.getTableName(),
+            tier.getName(), storage.getServerTag());
+
+    if (tableConfig.getInstanceAssignmentConfigMap() == null || 
!tableConfig.getInstanceAssignmentConfigMap()
+        .containsKey(tier.getName())) {
+      LOGGER.info("Skipping fetching/computing instance partitions for tier {} 
for table: {}", tier.getName(),
+          tableConfig.getTableName());
+      if (!dryRun) {
+        String instancePartitionsName =
+            
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
 tier.getName());
+        LOGGER.info("Removing instance partitions: {} from ZK if it exists", 
instancePartitionsName);
+        
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
 instancePartitionsName);
+      }
+      return defaultInstancePartitions;
+    }
+
+    String tableNameWithType = tableConfig.getTableName();
+    String instancePartitionsName =
+        
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
 tier.getName());
+    if (reassignInstances) {
+      // Set existing instance partition to null if bootstrap mode is enabled, 
so that the instance partition
+      // map can be fully recalculated.
+      InstancePartitions existingInstancePartitions = bootstrap ? null
+          : 
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+              instancePartitionsName);
+      InstanceAssignmentDriver instanceAssignmentDriver = new 
InstanceAssignmentDriver(tableConfig);
+      InstancePartitions instancePartitions = 
instanceAssignmentDriver.assignInstances(tier.getName(),
+          
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
 true),
+          existingInstancePartitions, 
tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+      if (!dryRun) {
+        LOGGER.info("Persisting instance partitions: {} to ZK", 
instancePartitions);
+        
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
 instancePartitions);
+      }
+      return instancePartitions;
+    }
+
+    InstancePartitions instancePartitions =
+        
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+            
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, 
tier.getName()));
+    if (instancePartitions != null) {
+      return instancePartitions;
+    }
+
+    return defaultInstancePartitions;
   }
 
   private IdealState waitForExternalViewToConverge(String tableNameWithType, 
boolean bestEfforts,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index 38334bb25a..ef95da5135 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -21,15 +21,19 @@ package org.apache.pinot.controller.api;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -59,6 +63,8 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
 
+  private static final String TIER_NAME = "tier1";
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -114,13 +120,13 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
     offlineTableConfig.setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE, 
offlineInstanceAssignmentConfig));
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), 
offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
 
     // OFFLINE instance partitions should be generated
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = 
getInstancePartitionsMap();
+    Map<String, InstancePartitions> instancePartitionsMap = 
getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 1);
-    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
@@ -132,72 +138,108 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         new 
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
     realtimeTableConfig.setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.CONSUMING, 
consumingInstanceAssignmentConfig));
+        Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), 
consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
 
     // CONSUMING instance partitions should be generated
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 2);
-    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
-    InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    InstancePartitions consumingInstancePartitions =
+        instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
     String consumingInstanceId = consumingInstancePartitions.getInstances(0, 
0).get(0);
 
+    // Add tier config and tier instance assignment config to the offline 
table config
+    offlineTableConfig.setTierConfigsList(Collections.singletonList(
+        new TierConfig(TIER_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"7d", null,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, 
TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), null,
+            null)));
+    InstanceAssignmentConfig tierInstanceAssignmentConfig = new 
InstanceAssignmentConfig(
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), 
false, 0, null), null,
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+    Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new 
HashMap<>();
+    instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), 
offlineInstanceAssignmentConfig);
+    instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
+    
offlineTableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap);
+    _helixResourceManager.setExistingTableConfig(offlineTableConfig);
+
+    // tier instance partitions should be generated
+    Map<String, InstancePartitions> tierInstancePartitionsMap = 
getInstancePartitionsMap();
+    assertEquals(tierInstancePartitionsMap.size(), 3);
+    InstancePartitions tierInstancePartitions = 
tierInstancePartitionsMap.get(TIER_NAME);
+    assertNotNull(tierInstancePartitions);
+    assertEquals(tierInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tierInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tierInstancePartitions.getInstances(0, 0).size(), 1);
+
     // Use OFFLINE instance assignment config as the COMPLETED instance 
assignment config
-    realtimeTableConfig.setInstanceAssignmentConfigMap(
-        new TreeMap<InstancePartitionsType, InstanceAssignmentConfig>() {{
-          put(InstancePartitionsType.CONSUMING, 
consumingInstanceAssignmentConfig);
-          put(InstancePartitionsType.COMPLETED, 
offlineInstanceAssignmentConfig);
-        }});
+    realtimeTableConfig.setInstanceAssignmentConfigMap(new TreeMap<String, 
InstanceAssignmentConfig>() {{
+      put(InstancePartitionsType.CONSUMING.toString(), 
consumingInstanceAssignmentConfig);
+      put(InstancePartitionsType.COMPLETED.toString(), 
offlineInstanceAssignmentConfig);
+    }});
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
 
     // COMPLETED instance partitions should be generated
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
-    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertEquals(instancePartitionsMap.size(), 4);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
-    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(consumingInstanceId));
-    InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    InstancePartitions completedInstancePartitions =
+        instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
     assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    InstancePartitions tInstancePartitions = 
instancePartitionsMap.get(TIER_NAME);
+    assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
 
     // Test fetching instance partitions by table name with type suffix
     instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
         
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME),
             null)));
-    assertEquals(instancePartitionsMap.size(), 1);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    assertEquals(instancePartitionsMap.size(), 2);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
     instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
         
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
             null)));
     assertEquals(instancePartitionsMap.size(), 2);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Test fetching instance partitions by table name and instance partitions 
type
     for (InstancePartitionsType instancePartitionsType : 
InstancePartitionsType.values()) {
-      instancePartitionsMap = deserializeInstancePartitionsMap(
-          
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 instancePartitionsType)));
+      instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
+          _controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, 
instancePartitionsType.toString())));
       assertEquals(instancePartitionsMap.size(), 1);
-      
assertEquals(instancePartitionsMap.get(instancePartitionsType).getInstancePartitionsName(),
+      
assertEquals(instancePartitionsMap.get(instancePartitionsType.toString()).getInstancePartitionsName(),
           instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
     }
 
+    // Test fetching instance partitions by table name and tier name
+    instancePartitionsMap = deserializeInstancePartitionsMap(
+        
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 TIER_NAME)));
+    assertEquals(instancePartitionsMap.size(), 1);
+    
assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(),
+        
InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME, 
TIER_NAME));
+
     // Remove the instance partitions for both offline and real-time table
     
sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null));
     try {
@@ -210,21 +252,25 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Assign instances without instance partitions type (dry run)
     instancePartitionsMap = deserializeInstancePartitionsMap(
         
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
null, true), null));
-    assertEquals(instancePartitionsMap.size(), 3);
-    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+    assertEquals(instancePartitionsMap.size(), 4);
+    offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
     assertNotNull(offlineInstancePartitions);
     assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
-    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
     assertNotNull(consumingInstancePartitions);
     assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(consumingInstanceId));
-    completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+    completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
     assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+    assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+    assertEquals(tInstancePartitions.getNumPartitions(), 1);
+    assertEquals(tInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
 
     // Instance partitions should not be persisted
     try {
@@ -239,34 +285,36 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
 
     // Instance partitions should be persisted
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
+    assertEquals(instancePartitionsMap.size(), 4);
 
     // Remove the instance partitions for real-time table
     sendDeleteRequest(
         
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
             null));
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 1);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+    assertEquals(instancePartitionsMap.size(), 2);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
 
     // Assign instances for COMPLETED segments
     instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
         _controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED, false), null));
     assertEquals(instancePartitionsMap.size(), 1);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // There should be OFFLINE and COMPLETED instance partitions persisted
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 2);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    assertEquals(instancePartitionsMap.size(), 3);
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+    assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Replace OFFLINE instance with CONSUMING instance for COMPLETED instance 
partitions
     instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
         _controllerRequestURLBuilder.forInstanceReplace(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED,
             offlineInstanceId, consumingInstanceId), null));
     assertEquals(instancePartitionsMap.size(), 1);
-    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0,
 0),
         Collections.singletonList(consumingInstanceId));
 
     // Replace the instance again using real-time table name (old instance 
does not exist)
@@ -284,26 +332,27 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
         
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null),
             consumingInstancePartitions.toJsonString()));
     assertEquals(instancePartitionsMap.size(), 1);
-    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0,
 0),
         Collections.singletonList(consumingInstanceId));
 
     // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and 
COMPLETED instance partitions should have
     // CONSUMING instance
     instancePartitionsMap = getInstancePartitionsMap();
-    assertEquals(instancePartitionsMap.size(), 3);
-    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0,
 0),
+    assertEquals(instancePartitionsMap.size(), 4);
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString()).getInstances(0,
 0),
         Collections.singletonList(offlineInstanceId));
-    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
+    assertEquals(instancePartitionsMap.get(TIER_NAME).getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0,
 0),
         Collections.singletonList(consumingInstanceId));
-    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
+    
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0,
 0),
         Collections.singletonList(consumingInstanceId));
 
     // Delete the offline table
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 2);
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+    
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
 
     // Delete the real-time table
     _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
@@ -315,18 +364,16 @@ public class 
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     }
   }
 
-  private Map<InstancePartitionsType, InstancePartitions> 
getInstancePartitionsMap()
+  private Map<String, InstancePartitions> getInstancePartitionsMap()
       throws Exception {
     return deserializeInstancePartitionsMap(
         
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null)));
   }
 
-  private Map<InstancePartitionsType, InstancePartitions> 
deserializeInstancePartitionsMap(
-      String instancePartitionsMapString)
+  private Map<String, InstancePartitions> 
deserializeInstancePartitionsMap(String instancePartitionsMapString)
       throws Exception {
-    return JsonUtils.stringToObject(instancePartitionsMapString,
-        new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() {
-        });
+    return JsonUtils.stringToObject(instancePartitionsMapString, new 
TypeReference<Map<String, InstancePartitions>>() {
+    });
   }
 
   @AfterClass
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4715271fc5..53bd2da317 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -352,7 +352,7 @@ public class InstanceAssignmentTest {
     InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 
0, 0, false);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig))).build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
 
@@ -405,7 +405,7 @@ public class InstanceAssignmentTest {
 
     // Select all 3 pools in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
@@ -427,7 +427,7 @@ public class InstanceAssignmentTest {
 
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 1));
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -449,7 +449,7 @@ public class InstanceAssignmentTest {
     // Assign instances from 2 pools to 3 replica-groups
     numReplicaGroups = numPools;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -479,7 +479,7 @@ public class InstanceAssignmentTest {
     numPools = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true);
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, 
null);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
     // Reset the instance configs to have only two pools.
     instanceConfigs.clear();
@@ -528,7 +528,7 @@ public class InstanceAssignmentTest {
 
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 1));
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -555,7 +555,7 @@ public class InstanceAssignmentTest {
     // Assign instances from 2 pools to 3 replica-groups
     numReplicaGroups = 3;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -634,7 +634,7 @@ public class InstanceAssignmentTest {
     // Reduce number of replica groups from 3 to 2.
     numReplicaGroups = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 
numReplicaGroups, 0, 0, 0, true);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig)));
 
     // Get the latest existingInstancePartitions from last computation.
@@ -761,7 +761,7 @@ public class InstanceAssignmentTest {
     InstanceTagPoolConfig tagPoolConfig = new 
InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null);
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // No instance with correct tag
@@ -791,7 +791,7 @@ public class InstanceAssignmentTest {
 
     // Enable pool
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // No instance has correct pool configured
@@ -825,7 +825,7 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for too many pools
@@ -837,7 +837,7 @@ public class InstanceAssignmentTest {
     }
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, 
Arrays.asList(0, 2));
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for pool that does not exist
@@ -850,7 +850,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for too many instances
@@ -863,7 +863,7 @@ public class InstanceAssignmentTest {
 
     // Enable replica-group
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Number of replica-groups must be positive
@@ -875,7 +875,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for too many replica-groups
@@ -888,7 +888,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for too many instances
@@ -900,7 +900,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Ask for too many instances per partition
@@ -913,7 +913,7 @@ public class InstanceAssignmentTest {
     }
 
     replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
@@ -952,7 +952,7 @@ public class InstanceAssignmentTest {
 
     try {
       tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-          
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+          
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
               new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
@@ -979,7 +979,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1011,7 +1011,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1051,7 +1051,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, 0, 0, false);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1089,7 +1089,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
             numInstancesPerPartition, false);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
@@ -1161,7 +1161,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 
numInstancesPerReplicaGroup, numPartitions,
             numInstancesPerPartition, true);
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-        Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                 
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -1242,7 +1242,7 @@ public class InstanceAssignmentTest {
     SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(
         Collections.singletonMap(partitionColumnName, new 
ColumnPartitionConfig("Modulo", numPartitionsSegment, null)));
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
-            Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
@@ -1316,7 +1316,7 @@ public class InstanceAssignmentTest {
         new InstanceConstraintConfig(Arrays.asList("constraint1", 
"constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1372,7 +1372,7 @@ public class InstanceAssignmentTest {
     instanceConstraintConfig = new 
InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1439,7 +1439,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1505,7 +1505,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1576,7 +1576,7 @@ public class InstanceAssignmentTest {
     instanceConstraintConfig = new 
InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1627,7 +1627,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
@@ -1691,7 +1691,7 @@ public class InstanceAssignmentTest {
     // Do not rotate pool sequence (for testing)
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME 
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+            
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, 
instanceConstraintConfig, replicaPartitionConfig,
                     
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
             .build();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 4c006967d1..4b9e869dcf 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -196,7 +196,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), 
false, 0, null);
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false);
-    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
@@ -403,6 +403,131 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         assertTrue(instance.startsWith(expectedPrefix));
       }
     }
+    _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
+  }
+
+  @Test
+  public void testRebalanceWithTiersAndInstanceAssignments()
+      throws Exception {
+    int numServers = 3;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(
+          "replicaAssignment" + NO_TIER_NAME + "_" + SERVER_INSTANCE_ID_PREFIX 
+ i, false);
+    }
+    _helixResourceManager.createServerTenant(
+        new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME, 
numServers, numServers, 0));
+
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+            .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+    // Create the table
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the segments
+    int numSegments = 10;
+    long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME, 
SEGMENT_NAME_PREFIX + i,
+              nowInDays), null);
+    }
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add 6 nodes tierA
+    for (int i = 0; i < 6; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(
+          "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX 
+ i, false);
+    }
+    _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+    // rebalance is NOOP and no change in assignment caused by new instances
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    // Segment assignment should not change
+    assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+    // add tier config
+    tableConfig.setTierConfigsList(Lists.newArrayList(
+        new TierConfig(TIER_A_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"0d", null,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" + 
TIER_A_NAME + "_OFFLINE", null, null)));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    // rebalance should change assignment
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+    // check that segments have moved to tier a
+    Map<String, Map<String, String>> tierSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
+    for (Map.Entry<String, Map<String, String>> entry : 
tierSegmentAssignment.entrySet()) {
+      Map<String, String> instanceStateMap = entry.getValue();
+      for (String instance : instanceStateMap.keySet()) {
+        assertTrue(instance.startsWith("replicaAssignment" + TIER_A_NAME + "_" 
+ SERVER_INSTANCE_ID_PREFIX));
+      }
+    }
+
+    // Test rebalance with tier instance assignment
+    InstanceTagPoolConfig tagPoolConfig =
+        new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + 
TIER_A_NAME), false, 0,
+            null);
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 
0, false);
+    
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
+        new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
+    _helixResourceManager.updateTableConfig(tableConfig);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
+
+    InstancePartitions instancePartitions = 
rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
+
+    // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+    // [i2, i3, i4, i5, i0, i1]
+    //  r0  r1  r2  r0  r1  r2
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 2,
+            "replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 5));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 3,
+            "replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 0));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 4,
+            "replicaAssignment" + TIER_A_NAME + "_" + 
SERVER_INSTANCE_ID_PREFIX + 1));
+
+    // The assignment are based on replica-group 0 and mirrored to all the 
replica-groups, so server of index 0, 1, 5
+    // should have the same segments assigned, and server of index 2, 3, 4 
should have the same segments assigned, each
+    // with 5 segments
+    Map<String, Map<String, String>> newSegmentAssignment = 
rebalanceResult.getSegmentAssignment();
+    int numSegmentsOnServer0 = 0;
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = SEGMENT_NAME_PREFIX + i;
+      Map<String, String> instanceStateMap = 
newSegmentAssignment.get(segmentName);
+      assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+      if (instanceStateMap.containsKey("replicaAssignment" + TIER_A_NAME + "_" 
+ SERVER_INSTANCE_ID_PREFIX + 0)) {
+        numSegmentsOnServer0++;
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 0),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 1),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 5),
+            ONLINE);
+      } else {
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 2),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 3),
+            ONLINE);
+        assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME + 
"_" + SERVER_INSTANCE_ID_PREFIX + 4),
+            ONLINE);
+      }
+    }
+    assertEquals(numSegmentsOnServer0, numSegments / 2);
 
     _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 046c0175e7..361999a881 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -595,7 +595,8 @@ public final class TableConfigUtils {
       return;
     }
     for (InstancePartitionsType instancePartitionsType : 
tableConfig.getInstancePartitionsMap().keySet()) {
-      
Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
+      Preconditions.checkState(
+          
!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
           String.format("Both InstanceAssignmentConfigMap and 
InstancePartitionsMap set for %s",
               instancePartitionsType));
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 88a829d846..ee14ef2b4e 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1724,10 +1724,10 @@ public class TableConfigUtilsTest {
     // Call validate with a table-config with instance partitions set but not 
instance assignment config
     
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
 
-    TableConfig invalidTableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-            
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, 
"test_OFFLINE"))
-            
.setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, 
instanceAssignmentConfig))
+    TableConfig invalidTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, 
"test_OFFLINE"))
+        .setInstanceAssignmentConfigMap(
+            ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), 
instanceAssignmentConfig))
             .build();
     try {
       // Call validate with instance partitions and config set for the same 
type
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bae7b7c798..91a54bc942 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -89,7 +89,7 @@ public class TableConfig extends BaseJsonConfig {
   private TableTaskConfig _taskConfig;
   private RoutingConfig _routingConfig;
   private QueryConfig _queryConfig;
-  private Map<InstancePartitionsType, InstanceAssignmentConfig> 
_instanceAssignmentConfigMap;
+  private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
 
   @JsonPropertyDescription(value = "Point to an existing instance partitions")
   private Map<InstancePartitionsType, String> _instancePartitionsMap;
@@ -128,7 +128,7 @@ public class TableConfig extends BaseJsonConfig {
       @JsonProperty(ROUTING_CONFIG_KEY) @Nullable RoutingConfig routingConfig,
       @JsonProperty(QUERY_CONFIG_KEY) @Nullable QueryConfig queryConfig,
       @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
-          Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap,
+          Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap,
       @JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig> 
fieldConfigList,
       @JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
       @JsonProperty(DEDUP_CONFIG_KEY) @Nullable DedupConfig dedupConfig,
@@ -267,12 +267,12 @@ public class TableConfig extends BaseJsonConfig {
 
   @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY)
   @Nullable
-  public Map<InstancePartitionsType, InstanceAssignmentConfig> 
getInstanceAssignmentConfigMap() {
+  public Map<String, InstanceAssignmentConfig> 
getInstanceAssignmentConfigMap() {
     return _instanceAssignmentConfigMap;
   }
 
   public void setInstanceAssignmentConfigMap(
-      Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap) {
+      Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
     _instanceAssignmentConfigMap = instanceAssignmentConfigMap;
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index a687a3eafd..2a00acc28f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -401,7 +401,7 @@ public class ControllerRequestURLBuilder {
     return url;
   }
 
-  public String forInstancePartitions(String tableName, @Nullable 
InstancePartitionsType instancePartitionsType) {
+  public String forInstancePartitions(String tableName, @Nullable String 
instancePartitionsType) {
     String url = StringUtil.join("/", _baseUrl, "tables", tableName, 
"instancePartitions");
     if (instancePartitionsType != null) {
       url += "?type=" + instancePartitionsType;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 4efa7db452..4916668eed 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -109,7 +109,7 @@ public class TableConfigBuilder {
   private TableTaskConfig _taskConfig;
   private RoutingConfig _routingConfig;
   private QueryConfig _queryConfig;
-  private Map<InstancePartitionsType, InstanceAssignmentConfig> 
_instanceAssignmentConfigMap;
+  private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
   private Map<InstancePartitionsType, String> _instancePartitionsMap;
   private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
   private List<FieldConfig> _fieldConfigList;
@@ -344,7 +344,7 @@ public class TableConfigBuilder {
   }
 
   public TableConfigBuilder setInstanceAssignmentConfigMap(
-      Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap) {
+      Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
     _instanceAssignmentConfigMap = instanceAssignmentConfigMap;
     return this;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to