This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c6c0fa243b Refactor LoaderTest and avoid using setters for IndexLoadingConfig (#14149)
c6c0fa243b is described below

commit c6c0fa243b7be407c37749e45512f156297e271a
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Oct 2 19:29:23 2024 -0700

    Refactor LoaderTest and avoid using setters for IndexLoadingConfig (#14149)
---
 .../local/segment/index/h3/H3IndexType.java        |  10 +-
 .../segment/index/loader/IndexLoadingConfig.java   | 230 +-----
 .../local/segment/index/map/MapIndexType.java      |  21 +-
 .../readers/vector/HnswVectorIndexReader.java      |   5 +-
 .../segment/index/vector/VectorIndexType.java      |  44 +-
 .../segment/store/SingleFileIndexDirectory.java    |   8 +
 .../local/segment/index/loader/LoaderTest.java     | 795 +++++++++++----------
 .../org/apache/pinot/spi/config/table/FSTType.java |   2 +-
 .../pinot/spi/config/table/IndexingConfig.java     |  37 +-
 9 files changed, 448 insertions(+), 704 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
index e5b301ec16..9cb51b60eb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
@@ -30,8 +30,6 @@ import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.realtime.impl.geospatial.MutableH3Index;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
-import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler;
 import 
org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -59,8 +57,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class H3IndexType extends AbstractIndexType<H3IndexConfig, 
H3IndexReader, GeoSpatialIndexCreator>
-  implements ConfigurableFromIndexLoadingConfig<H3IndexConfig> {
+public class H3IndexType extends AbstractIndexType<H3IndexConfig, 
H3IndexReader, GeoSpatialIndexCreator> {
   public static final String INDEX_DISPLAY_NAME = "h3";
   private static final List<String> EXTENSIONS = 
Collections.singletonList(V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
 
@@ -73,11 +70,6 @@ public class H3IndexType extends AbstractIndexType<H3IndexConfig, H3IndexReader,
     return H3IndexConfig.class;
   }
 
-  @Override
-  public Map<String, H3IndexConfig> fromIndexLoadingConfig(IndexLoadingConfig 
indexLoadingConfig) {
-    return indexLoadingConfig.getH3IndexConfigs();
-  }
-
   @Override
   public H3IndexConfig getDefaultConfig() {
     return H3IndexConfig.DISABLED;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 8c6c1604b7..bb4e0543fc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.segment.local.segment.index.loader;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,8 +41,6 @@ import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
 import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.RangeIndexConfig;
-import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
-import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
@@ -53,7 +50,6 @@ import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
-import org.apache.pinot.spi.config.table.MapIndexConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -85,19 +81,16 @@ public class IndexLoadingConfig {
   private Set<String> _fstIndexColumns = new HashSet<>();
   private FSTType _fstIndexType = FSTType.LUCENE;
   private Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
-  private Map<String, MapIndexConfig> _mapIndexConfigs = new HashMap<>();
-  private Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
-  private Map<String, VectorIndexConfig> _vectorIndexConfigs = new HashMap<>();
-  private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace 
this by _noDictionaryConfig.
+  private final Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: 
replace this by _noDictionaryConfig.
   private final Map<String, String> _noDictionaryConfig = new HashMap<>();
   private final Set<String> _varLengthDictionaryColumns = new HashSet<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
-  private Set<String> _forwardIndexDisabledColumns = new HashSet<>();
+  private final Set<String> _forwardIndexDisabledColumns = new HashSet<>();
   private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
   private boolean _enableDynamicStarTreeCreation;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private boolean _enableDefaultStarTree;
-  private Map<String, CompressionCodec> _compressionConfigs = new HashMap<>();
+  private final Map<String, CompressionCodec> _compressionConfigs = new 
HashMap<>();
   private Map<String, FieldIndexConfigs> _indexConfigsByColName = new 
HashMap<>();
 
   private SegmentVersion _segmentVersion;
@@ -224,8 +217,6 @@ public class IndexLoadingConfig {
     extractCompressionConfigs(tableConfig);
     extractTextIndexColumnsFromTableConfig(tableConfig);
     extractFSTIndexColumnsFromTableConfig(tableConfig);
-    extractH3IndexConfigsFromTableConfig(tableConfig);
-    extractVectorIndexConfigsFromTableConfig(tableConfig);
     extractForwardIndexDisabledColumnsFromTableConfig(tableConfig);
 
     Map<String, String> noDictionaryConfig = 
indexingConfig.getNoDictionaryConfig();
@@ -395,28 +386,6 @@ public class IndexLoadingConfig {
     }
   }
 
-  private void extractH3IndexConfigsFromTableConfig(TableConfig tableConfig) {
-    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
-    if (fieldConfigList != null) {
-      for (FieldConfig fieldConfig : fieldConfigList) {
-        if (fieldConfig.getIndexTypes().contains(FieldConfig.IndexType.H3)) {
-          _h3IndexConfigs.put(fieldConfig.getName(), new 
H3IndexConfig(fieldConfig.getProperties()));
-        }
-      }
-    }
-  }
-
-  private void extractVectorIndexConfigsFromTableConfig(TableConfig 
tableConfig) {
-    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
-    if (fieldConfigList != null) {
-      for (FieldConfig fieldConfig : fieldConfigList) {
-        if 
(fieldConfig.getIndexTypes().contains(FieldConfig.IndexType.VECTOR)) {
-          _vectorIndexConfigs.put(fieldConfig.getName(), new 
VectorIndexConfig(fieldConfig.getProperties()));
-        }
-      }
-    }
-  }
-
   private void extractFromInstanceConfig(InstanceDataManagerConfig 
instanceDataManagerConfig) {
     if (instanceDataManagerConfig == null) {
       return;
@@ -488,20 +457,6 @@ public class IndexLoadingConfig {
     return unmodifiable(_sortedColumns);
   }
 
-  /**
-   * For tests only.
-   */
-  @VisibleForTesting
-  public void setSortedColumn(String sortedColumn) {
-    if (sortedColumn != null) {
-      _sortedColumns = new ArrayList<>();
-      _sortedColumns.add(sortedColumn);
-    } else {
-      _sortedColumns = Collections.emptyList();
-    }
-    _dirty = true;
-  }
-
   public Set<String> getInvertedIndexColumns() {
     return unmodifiable(_invertedIndexColumns);
   }
@@ -510,11 +465,6 @@ public class IndexLoadingConfig {
     return unmodifiable(_rangeIndexColumns);
   }
 
-  public void addRangeIndexColumn(String... columns) {
-    _rangeIndexColumns.addAll(Arrays.asList(columns));
-    _dirty = true;
-  }
-
   public int getRangeIndexVersion() {
     return _rangeIndexVersion;
   }
@@ -543,171 +493,67 @@ public class IndexLoadingConfig {
     return unmodifiable(_jsonIndexConfigs);
   }
 
-  public Map<String, MapIndexConfig> getMapIndexConfigs() {
-    return unmodifiable(_mapIndexConfigs);
-  }
-
-  public Map<String, H3IndexConfig> getH3IndexConfigs() {
-    return unmodifiable(_h3IndexConfigs);
-  }
-
-  public Map<String, VectorIndexConfig> getVectorIndexConfigs() {
-    return unmodifiable(_vectorIndexConfigs);
-  }
-
   public Map<String, Map<String, String>> getColumnProperties() {
     return unmodifiable(_columnProperties);
   }
 
+  @Deprecated
+  @VisibleForTesting
   public void setColumnProperties(Map<String, Map<String, String>> 
columnProperties) {
     _columnProperties = new HashMap<>(columnProperties);
     _dirty = true;
   }
 
-  /**
-   * For tests only.
-   */
+  @Deprecated
   @VisibleForTesting
   public void setInvertedIndexColumns(Set<String> invertedIndexColumns) {
     _invertedIndexColumns = new HashSet<>(invertedIndexColumns);
     _dirty = true;
   }
 
+  @Deprecated
   @VisibleForTesting
   public void addInvertedIndexColumns(String... invertedIndexColumns) {
     _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns));
     _dirty = true;
   }
 
-  @VisibleForTesting
-  public void addInvertedIndexColumns(Collection<String> invertedIndexColumns) 
{
-    _invertedIndexColumns.addAll(invertedIndexColumns);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeInvertedIndexColumns(String... invertedIndexColumns) {
-    removeInvertedIndexColumns(Arrays.asList(invertedIndexColumns));
-    assert _dirty;
-  }
-
-  @VisibleForTesting
-  public void removeInvertedIndexColumns(Collection<String> 
invertedIndexColumns) {
-    _invertedIndexColumns.removeAll(invertedIndexColumns);
-    _dirty = true;
-  }
-
-  /**
-   * For tests only.
-   * Used by segmentPreProcessorTest to set raw columns.
-   */
-  @VisibleForTesting
-  public void setNoDictionaryColumns(Set<String> noDictionaryColumns) {
-    _noDictionaryColumns = new HashSet<>(noDictionaryColumns);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeNoDictionaryColumns(String... noDictionaryColumns) {
-    Arrays.asList(noDictionaryColumns).forEach(_noDictionaryColumns::remove);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeNoDictionaryColumns(Collection<String> 
noDictionaryColumns) {
-    noDictionaryColumns.forEach(_noDictionaryColumns::remove);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void addNoDictionaryColumns(String... noDictionaryColumns) {
-    _noDictionaryColumns.addAll(Arrays.asList(noDictionaryColumns));
-    _dirty = true;
-  }
-
+  @Deprecated
   @VisibleForTesting
   public void addNoDictionaryColumns(Collection<String> noDictionaryColumns) {
     _noDictionaryColumns.addAll(noDictionaryColumns);
     _dirty = true;
   }
 
-  /**
-   * For tests only.
-   * Used by segmentPreProcessorTest to set compression configs.
-   */
-  @VisibleForTesting
-  public void setCompressionConfigs(Map<String, CompressionCodec> 
compressionConfigs) {
-    _compressionConfigs = new HashMap<>(compressionConfigs);
-    _dirty = true;
-  }
-
-  /**
-   * For tests only.
-   */
+  @Deprecated
   @VisibleForTesting
   public void setRangeIndexColumns(Set<String> rangeIndexColumns) {
     _rangeIndexColumns = new HashSet<>(rangeIndexColumns);
     _dirty = true;
   }
 
-  public void addRangeIndexColumns(String... rangeIndexColumns) {
-    _rangeIndexColumns.addAll(Arrays.asList(rangeIndexColumns));
-    _dirty = true;
-  }
-
-  public void removeRangeIndexColumns(String... rangeIndexColumns) {
-    Arrays.asList(rangeIndexColumns).forEach(_rangeIndexColumns::remove);
-    _dirty = true;
-  }
-
-  /**
-   * Used directly from text search unit test code since the test code
-   * doesn't really have a table config and is directly testing the
-   * query execution code of text search using data from generated segments
-   * and then loading those segments.
-   */
+  @Deprecated
   @VisibleForTesting
   public void setTextIndexColumns(Set<String> textIndexColumns) {
     _textIndexColumns = new HashSet<>(textIndexColumns);
     _dirty = true;
   }
 
-  @VisibleForTesting
-  public void addTextIndexColumns(String... textIndexColumns) {
-    _textIndexColumns.addAll(Arrays.asList(textIndexColumns));
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeTextIndexColumns(String... textIndexColumns) {
-    Arrays.asList(textIndexColumns).forEach(_textIndexColumns::remove);
-    _dirty = true;
-  }
-
+  @Deprecated
   @VisibleForTesting
   public void setFSTIndexColumns(Set<String> fstIndexColumns) {
     _fstIndexColumns = new HashSet<>(fstIndexColumns);
     _dirty = true;
   }
 
-  @VisibleForTesting
-  public void addFSTIndexColumns(String... fstIndexColumns) {
-    _fstIndexColumns.addAll(Arrays.asList(fstIndexColumns));
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeFSTIndexColumns(String... fstIndexColumns) {
-    Arrays.asList(fstIndexColumns).forEach(_fstIndexColumns::remove);
-    _dirty = true;
-  }
-
+  @Deprecated
   @VisibleForTesting
   public void setFSTIndexType(FSTType fstType) {
     _fstIndexType = fstType;
     _dirty = true;
   }
 
+  @Deprecated
   @VisibleForTesting
   public void setJsonIndexColumns(Set<String> jsonIndexColumns) {
     if (jsonIndexColumns != null) {
@@ -721,58 +567,20 @@ public class IndexLoadingConfig {
     _dirty = true;
   }
 
-  @VisibleForTesting
-  public void setMapIndexColumns(Map<String, MapIndexConfig> mapIndexConfigs) {
-    _mapIndexConfigs = new HashMap<>(mapIndexConfigs);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void setH3IndexConfigs(Map<String, H3IndexConfig> h3IndexConfigs) {
-    _h3IndexConfigs = new HashMap<>(h3IndexConfigs);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void setVectorIndexConfigs(Map<String, VectorIndexConfig> 
vectorIndexConfigs) {
-    _vectorIndexConfigs = new HashMap<>(vectorIndexConfigs);
-    _dirty = true;
-  }
-
+  @Deprecated
   @VisibleForTesting
   public void setBloomFilterConfigs(Map<String, BloomFilterConfig> 
bloomFilterConfigs) {
     _bloomFilterConfigs = new HashMap<>(bloomFilterConfigs);
     _dirty = true;
   }
 
+  @Deprecated
   @VisibleForTesting
   public void setOnHeapDictionaryColumns(Set<String> onHeapDictionaryColumns) {
     _onHeapDictionaryColumns = new HashSet<>(onHeapDictionaryColumns);
     _dirty = true;
   }
 
-  /**
-   * For tests only.
-   */
-  @VisibleForTesting
-  public void setForwardIndexDisabledColumns(Set<String> 
forwardIndexDisabledColumns) {
-    _forwardIndexDisabledColumns =
-        forwardIndexDisabledColumns == null ? new HashSet<>() : new 
HashSet<>(forwardIndexDisabledColumns);
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void addForwardIndexDisabledColumns(String... 
forwardIndexDisabledColumns) {
-    
_forwardIndexDisabledColumns.addAll(Arrays.asList(forwardIndexDisabledColumns));
-    _dirty = true;
-  }
-
-  @VisibleForTesting
-  public void removeForwardIndexDisabledColumns(String... 
forwardIndexDisabledColumns) {
-    
Arrays.asList(forwardIndexDisabledColumns).forEach(_forwardIndexDisabledColumns::remove);
-    _dirty = true;
-  }
-
   public Set<String> getNoDictionaryColumns() {
     return unmodifiable(_noDictionaryColumns);
   }
@@ -860,14 +668,6 @@ public class IndexLoadingConfig {
     return _segmentStoreURI;
   }
 
-  /**
-   * For tests only.
-   */
-  public void setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode 
columnMinMaxValueGeneratorMode) {
-    _columnMinMaxValueGeneratorMode = columnMinMaxValueGeneratorMode;
-    _dirty = true;
-  }
-
   public int getRealtimeAvgMultiValueCount() {
     return _realtimeAvgMultiValueCount;
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
index 264340690d..c7e5bda113 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
@@ -26,8 +26,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.index.readers.map.ImmutableMapIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
@@ -51,8 +49,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class MapIndexType extends AbstractIndexType<MapIndexConfig, 
MapIndexReader, MapIndexCreator>
-    implements ConfigurableFromIndexLoadingConfig<MapIndexConfig> {
+public class MapIndexType extends AbstractIndexType<MapIndexConfig, 
MapIndexReader, MapIndexCreator> {
   public static final String INDEX_DISPLAY_NAME = "map";
   private static final List<String> EXTENSIONS =
       Collections.singletonList(V1Constants.Indexes.MAP_INDEX_FILE_EXTENSION);
@@ -68,11 +65,6 @@ public class MapIndexType extends AbstractIndexType<MapIndexConfig, MapIndexRead
     return MapIndexConfig.class;
   }
 
-  @Override
-  public Map<String, MapIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig 
indexLoadingConfig) {
-    return indexLoadingConfig.getMapIndexConfigs();
-  }
-
   @Override
   public MapIndexConfig getDefaultConfig() {
     return MapIndexConfig.DISABLED;
@@ -90,12 +82,10 @@ public class MapIndexType extends AbstractIndexType<MapIndexConfig, MapIndexRead
         IndexConfigDeserializer.fromMap(tableConfig -> 
tableConfig.getIndexingConfig().getMapIndexConfigs());
     // reads tableConfig.indexingConfig.mapIndexColumns
     ColumnConfigDeserializer<MapIndexConfig> fromMapIndexCols =
-        IndexConfigDeserializer.fromCollection(
-            tableConfig -> 
tableConfig.getIndexingConfig().getMapIndexColumns(),
+        IndexConfigDeserializer.fromCollection(tableConfig -> 
tableConfig.getIndexingConfig().getMapIndexColumns(),
             (accum, column) -> accum.put(column, new MapIndexConfig()));
-    return IndexConfigDeserializer.fromIndexes(getPrettyName(), 
getIndexConfigClass())
-        .withExclusiveAlternative(
-            
IndexConfigDeserializer.ifIndexingConfig(fromMapIndexCols.withExclusiveAlternative(fromMapIndexConf)));
+    return IndexConfigDeserializer.fromIndexes(getPrettyName(), 
getIndexConfigClass()).withExclusiveAlternative(
+        
IndexConfigDeserializer.ifIndexingConfig(fromMapIndexCols.withExclusiveAlternative(fromMapIndexConf)));
   }
 
   @Override
@@ -152,8 +142,7 @@ public class MapIndexType extends AbstractIndexType<MapIndexConfig, MapIndexRead
         String className = 
indexConfig.getConfigs().get(MAP_INDEX_READER_CLASS_NAME).toString();
         Preconditions.checkNotNull(className, "MapIndexReader class name must 
be provided");
         try {
-          return (MapIndexReader) Class.forName(className)
-              .getConstructor(PinotDataBuffer.class, ColumnMetadata.class)
+          return (MapIndexReader) 
Class.forName(className).getConstructor(PinotDataBuffer.class, 
ColumnMetadata.class)
               .newInstance(dataBuffer, metadata);
         } catch (Exception e) {
           throw new RuntimeException("Failed to create MapIndexReader", e);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexReader.java
index a361b97855..693e488b7a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswVectorIndexReader.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
+import java.util.Arrays;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -110,8 +111,8 @@ public class HnswVectorIndexReader implements VectorIndexReader {
       _indexSearcher.search(knnFloatVectorQuery, docIDCollector);
       return docIds;
     } catch (Exception e) {
-      String msg =
-          "Caught exception while searching the HNSW index for column:" + 
_column + ", search query:" + searchQuery;
+      String msg = "Caught exception while searching the HNSW index for 
column: " + _column + ", search query: "
+          + Arrays.toString(searchQuery);
       throw new RuntimeException(msg, e);
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
index cb228b81aa..4188b6821d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java
@@ -21,14 +21,11 @@ package org.apache.pinot.segment.local.segment.index.vector;
 import com.clearspring.analytics.util.Preconditions;
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.realtime.impl.vector.MutableVectorIndex;
 import 
org.apache.pinot.segment.local.segment.creator.impl.vector.HnswVectorIndexCreator;
-import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.VectorIndexHandler;
 import 
org.apache.pinot.segment.local.segment.index.readers.vector.HnswVectorIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -48,6 +45,7 @@ import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
 import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
 import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -58,8 +56,7 @@ import org.apache.pinot.spi.data.Schema;
  * Currently only supports for float array columns and the supported vector 
index type is: HNSW.
  *
  */
-public class VectorIndexType extends AbstractIndexType<VectorIndexConfig, 
VectorIndexReader, VectorIndexCreator>
-    implements ConfigurableFromIndexLoadingConfig<VectorIndexConfig> {
+public class VectorIndexType extends AbstractIndexType<VectorIndexConfig, 
VectorIndexReader, VectorIndexCreator> {
   public static final String INDEX_DISPLAY_NAME = "vector";
 
   protected VectorIndexType() {
@@ -71,11 +68,6 @@ public class VectorIndexType extends AbstractIndexType<VectorIndexConfig, Vector
     return VectorIndexConfig.class;
   }
 
-  @Override
-  public Map<String, VectorIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
-    return indexLoadingConfig.getVectorIndexConfigs();
-  }
-
   @Override
   public VectorIndexConfig getDefaultConfig() {
     return VectorIndexConfig.DISABLED;
@@ -88,32 +80,20 @@ public class VectorIndexType extends AbstractIndexType<VectorIndexConfig, Vector
 
   @Override
   public ColumnConfigDeserializer<VectorIndexConfig> createDeserializer() {
-
-    // reads tableConfig.indexingConfig.jsonIndexColumns
-    ColumnConfigDeserializer<VectorIndexConfig> fromVectorIndexCols =
-        IndexConfigDeserializer.fromCollection(
-            tableConfig -> 
tableConfig.getIndexingConfig().getVectorIndexColumns(),
-            (accum, column) -> accum.put(column, new VectorIndexConfig(new 
HashMap<>())));
-
-    return IndexConfigDeserializer.fromIndexes(getPrettyName(), 
getIndexConfigClass())
-        .withExclusiveAlternative(
-            IndexConfigDeserializer.ifIndexingConfig(fromVectorIndexCols));
+    return IndexConfigDeserializer.fromIndexes(getPrettyName(), 
getIndexConfigClass()).withExclusiveAlternative(
+        IndexConfigDeserializer.fromIndexTypes(FieldConfig.IndexType.VECTOR,
+            (tableConfig, fieldConfig) -> new 
VectorIndexConfig(fieldConfig.getProperties())));
   }
 
   @Override
   public VectorIndexCreator createIndexCreator(IndexCreationContext context, 
VectorIndexConfig indexConfig)
       throws IOException {
-    Preconditions.checkState(context.getFieldSpec().getDataType() == 
FieldSpec.DataType.FLOAT
-            && !context.getFieldSpec().isSingleValueField(),
-        "Vector index is currently only supported on float array columns");
-
-    switch (IndexType.valueOf(indexConfig.getVectorIndexType())) {
-      case HNSW:
-        return new HnswVectorIndexCreator(context.getFieldSpec().getName(), 
context.getIndexDir(), indexConfig);
-      // TODO: Support more vector index types.
-      default:
-        throw new UnsupportedOperationException("Unsupported vector index 
type: " + indexConfig.getVectorIndexType());
-    }
+    Preconditions.checkState(context.getFieldSpec().getDataType() == 
FieldSpec.DataType.FLOAT && !context.getFieldSpec()
+        .isSingleValueField(), "Vector index is currently only supported on 
float array columns");
+    // TODO: Support more vector index types.
+    Preconditions.checkState("HNSW".equals(indexConfig.getVectorIndexType()),
+        "Unsupported vector index type: %s, only 'HNSW' is support", 
indexConfig.getVectorIndexType());
+    return new HnswVectorIndexCreator(context.getFieldSpec().getName(), 
context.getIndexDir(), indexConfig);
   }
 
   @Override
@@ -143,7 +123,7 @@ public class VectorIndexType extends AbstractIndexType<VectorIndexConfig, Vector
     @Override
     public VectorIndexReader createIndexReader(SegmentDirectory.Reader 
segmentReader,
         FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
-        throws IOException, IndexReaderConstraintException {
+        throws IndexReaderConstraintException {
       if (metadata.getDataType() != FieldSpec.DataType.FLOAT || 
metadata.getFieldSpec().isSingleValueField()) {
         throw new IndexReaderConstraintException(metadata.getColumnName(), 
StandardIndexes.vector(),
             "HNSW Vector index is currently only supported on float array type 
columns");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index 78968106e8..a0159e2b13 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -389,6 +389,14 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
       }
       return columns;
     }
+    if (type == StandardIndexes.vector()) {
+      for (String column : _segmentMetadata.getAllColumns()) {
+        if (VectorIndexUtils.hasVectorIndex(_segmentDirectory, column)) {
+          columns.add(column);
+        }
+      }
+      return columns;
+    }
     for (IndexKey indexKey : _columnEntries.keySet()) {
       if (indexKey._type == type) {
         columns.add(indexKey._name);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index 98dff135d5..9c4950b8b9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -18,60 +18,49 @@
  */
 package org.apache.pinot.segment.local.segment.index.loader;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.net.URL;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import 
org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
-import 
org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
-import org.apache.pinot.segment.spi.index.FstIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
-import org.apache.pinot.segment.spi.index.TextIndexConfig;
-import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.spi.utils.ReadMode;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import org.testng.collections.Lists;
 
 import static 
org.apache.pinot.segment.spi.V1Constants.Indexes.LUCENE_V99_FST_INDEX_FILE_EXTENSION;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class LoaderTest {
   private static final File INDEX_DIR = new File(LoaderTest.class.getName());
+  private static final String RAW_TABLE_NAME = "testTable";
   private static final String AVRO_DATA = "data/test_data-mv.avro";
   private static final String VECTOR_AVRO_DATA = "data/test_vector_data.avro";
 
@@ -83,13 +72,10 @@ public class LoaderTest {
   private static final int VECTOR_DIM_SIZE = 512;
 
   private File _avroFile;
-
   private File _vectorAvroFile;
-  private File _indexDir;
   private IndexLoadingConfig _v1IndexLoadingConfig;
   private IndexLoadingConfig _v3IndexLoadingConfig;
-  private SegmentDirectoryLoader _localSegmentDirectoryLoader;
-  private PinotConfiguration _pinotConfiguration;
+  private File _indexDir;
 
   @BeforeClass
   public void setUp()
@@ -97,47 +83,51 @@ public class LoaderTest {
     FileUtils.deleteQuietly(INDEX_DIR);
 
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA);
-    Assert.assertNotNull(resourceUrl);
+    assertNotNull(resourceUrl);
     _avroFile = new File(resourceUrl.getFile());
-    _vectorAvroFile = new 
File(getClass().getClassLoader().getResource(VECTOR_AVRO_DATA).getFile());
-    Map<String, Object> props = new HashMap<>();
-    props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.heap.toString());
-    _pinotConfiguration = new PinotConfiguration(props);
-
-    _v1IndexLoadingConfig = new IndexLoadingConfig();
-    _v1IndexLoadingConfig.setReadMode(ReadMode.mmap);
-    _v1IndexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    _v1IndexLoadingConfig.setVectorIndexConfigs(new HashMap<>());
-
-    _v3IndexLoadingConfig = new IndexLoadingConfig();
-    _v3IndexLoadingConfig.setReadMode(ReadMode.mmap);
-    _v3IndexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    _v3IndexLoadingConfig.setVectorIndexConfigs(new HashMap<>());
-
-    _localSegmentDirectoryLoader = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader();
+    resourceUrl = getClass().getClassLoader().getResource(VECTOR_AVRO_DATA);
+    assertNotNull(resourceUrl);
+    _vectorAvroFile = new File(resourceUrl.getFile());
+
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSegmentVersion("v1").build();
+    Schema schema = createSchema();
+    _v1IndexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSegmentVersion("v3").build();
+    _v3IndexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+  }
+
+  private Schema createSchema()
+      throws Exception {
+    return SegmentTestUtils.extractSchemaFromAvroWithoutTime(_avroFile);
   }
 
   private Schema constructV1Segment()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    SegmentGeneratorConfig segmentGeneratorConfig =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, "testTable");
-    segmentGeneratorConfig.setSegmentVersion(SegmentVersion.v1);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(segmentGeneratorConfig);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setSegmentVersion("v1").build();
+    Schema schema = createSchema();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setInputFilePath(_avroFile.getAbsolutePath());
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config);
     driver.build();
 
     _indexDir = new File(INDEX_DIR, driver.getSegmentName());
-    return segmentGeneratorConfig.getSchema();
+    return schema;
   }
 
   @Test
   public void testLoad()
       throws Exception {
     constructV1Segment();
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
 
     testConversion();
   }
@@ -152,9 +142,9 @@ public class LoaderTest {
     constructV1Segment();
 
     File v3TempDir = new 
SegmentV1V2ToV3FormatConverter().v3ConversionTempDirectory(_indexDir);
-    Assert.assertTrue(v3TempDir.isDirectory());
+    assertTrue(v3TempDir.isDirectory());
     testConversion();
-    Assert.assertFalse(v3TempDir.exists());
+    assertFalse(v3TempDir.exists());
   }
 
   @Test
@@ -164,45 +154,43 @@ public class LoaderTest {
 
     // The newly generated segment is consistent with table config and schema, 
thus
     // in follow checks, whether it needs reprocess or not depends on segment 
format.
-    SegmentDirectory segmentDir = 
_localSegmentDirectoryLoader.load(_indexDir.toURI(),
-        new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_pinotConfiguration).build());
-
-    // The segmentVersionToLoad is null, not leading to reprocess.
-    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, new 
IndexLoadingConfig(), null));
-
-    // The segmentVersionToLoad is v1, not leading to reprocess.
-    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, 
_v1IndexLoadingConfig, null));
-
-    // The segmentVersionToLoad is v3, leading to reprocess.
-    assertTrue(ImmutableSegmentLoader.needPreprocess(segmentDir, 
_v3IndexLoadingConfig, null));
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(_indexDir, ReadMode.mmap)) {
+      // The segmentVersionToLoad is null, not leading to reprocess.
+      assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDirectory, new 
IndexLoadingConfig(), null));
+      // The segmentVersionToLoad is v1, not leading to reprocess.
+      assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
_v1IndexLoadingConfig, null));
+      // The segmentVersionToLoad is v3, leading to reprocess.
+      assertTrue(ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
_v3IndexLoadingConfig, null));
+    }
 
     // The segment is in v3 format now, not leading to reprocess.
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig);
-    // Need to reset `segmentDir` to point to the correct index directory 
after the above load since the path changes
-    segmentDir = 
_localSegmentDirectoryLoader.load(immutableSegment.getSegmentMetadata().getIndexDir().toURI(),
-        new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_pinotConfiguration).build());
-    segmentDir.reloadMetadata();
-    assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, 
_v3IndexLoadingConfig, null));
+    ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig);
+
+    // Need to reset `segmentDirectory` to point to the correct index 
directory after the above load since the path
+    // changes
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(_indexDir, ReadMode.mmap)) {
+      assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
_v3IndexLoadingConfig, null));
+    }
   }
 
   private void testConversion()
       throws Exception {
     // Do not set segment version, should not convert the segment
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
ReadMode.mmap);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // Set segment version to v1, should not convert the segment
     indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v1IndexLoadingConfig);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // Set segment version to v3, should convert the segment to v3
     indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig);
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     indexSegment.destroy();
   }
 
@@ -221,11 +209,11 @@ public class LoaderTest {
   }
 
   private void testBuiltInVirtualColumns(IndexSegment indexSegment) {
-    Assert.assertTrue(indexSegment.getColumnNames().containsAll(
+    assertTrue(indexSegment.getColumnNames().containsAll(
         Arrays.asList(BuiltInVirtualColumn.DOCID, 
BuiltInVirtualColumn.HOSTNAME, BuiltInVirtualColumn.SEGMENTNAME)));
-    
Assert.assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.DOCID));
-    
Assert.assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.HOSTNAME));
-    
Assert.assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.SEGMENTNAME));
+    assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.DOCID));
+    assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.HOSTNAME));
+    
assertNotNull(indexSegment.getDataSource(BuiltInVirtualColumn.SEGMENTNAME));
   }
 
   /**
@@ -239,13 +227,13 @@ public class LoaderTest {
     schema.addField(new DimensionFieldSpec("MVString", 
FieldSpec.DataType.STRING, false, ""));
 
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v1IndexLoadingConfig, schema);
-    
Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0),
 "");
-    
Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0),
 "");
+    
assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
+    
assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
 
     indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig, schema);
-    
Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0),
 "");
-    
Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0),
 "");
+    
assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
+    
assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
   }
 
@@ -259,8 +247,7 @@ public class LoaderTest {
     FieldSpec byteMetric = new MetricFieldSpec(newColumnName, 
FieldSpec.DataType.BYTES, defaultValue);
     schema.addField(byteMetric);
     IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
_v3IndexLoadingConfig, schema);
-    Assert.assertEquals(
-        BytesUtils.toHexString((byte[]) 
indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
+    assertEquals(BytesUtils.toHexString((byte[]) 
indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
         defaultValue);
     indexSegment.destroy();
   }
@@ -268,62 +255,72 @@ public class LoaderTest {
   private void constructSegmentWithFSTIndex(SegmentVersion segmentVersion)
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
-    SegmentGeneratorConfig segmentGeneratorConfig =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, "testTable");
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    List<String> fstIndexCreationColumns = 
Lists.newArrayList(FST_INDEX_COL_NAME);
-    FstIndexConfig fstConfig = new 
FstIndexConfig(segmentGeneratorConfig.getFSTIndexType());
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.fst(), fstConfig, 
fstIndexCreationColumns);
-    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
-    driver.init(segmentGeneratorConfig);
+
+    TableConfig tableConfig = createTableConfigWithFSTIndex(segmentVersion);
+    Schema schema = createSchema();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setInputFilePath(_avroFile.getAbsolutePath());
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config);
     driver.build();
 
     _indexDir = new File(INDEX_DIR, driver.getSegmentName());
   }
 
+  private TableConfig createTableConfigWithFSTIndex(@Nullable SegmentVersion 
segmentVersion) {
+    FieldConfig fieldConfig =
+        new FieldConfig(FST_INDEX_COL_NAME, 
FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST),
+            null, null);
+    TableConfigBuilder builder =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setFieldConfigList(List.of(fieldConfig));
+    if (segmentVersion != null) {
+      builder.setSegmentVersion(segmentVersion.toString());
+    }
+    return builder.build();
+  }
+
   @Test
   public void testFSTIndexLoad()
       throws Exception {
     constructSegmentWithFSTIndex(SegmentVersion.v3);
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     File fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, 
FST_INDEX_COL_NAME);
-    Assert.assertNull(fstIndexFile);
+    assertNull(fstIndexFile);
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setFSTIndexColumns(new 
HashSet<>(Arrays.asList(FST_INDEX_COL_NAME)));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
indexLoadingConfig);
+    TableConfig tableConfig = createTableConfigWithFSTIndex(null);
+    Schema schema = createSchema();
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
-
-    SegmentDirectory segmentDir = 
_localSegmentDirectoryLoader.load(_indexDir.toURI(),
-        new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_pinotConfiguration).build());
-    SegmentDirectory.Reader reader = segmentDir.createReader();
-    Assert.assertNotNull(reader);
-    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
     indexSegment.destroy();
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(_indexDir, ReadMode.mmap);
+        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+      assertNotNull(reader);
+      assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
+    }
 
     // CASE 2: set the segment version to load in IndexLoadingConfig as V3
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk (V3)
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithFSTIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
-    segmentDir = _localSegmentDirectoryLoader.load(_indexDir.toURI(),
-        new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_pinotConfiguration).build());
-    reader = segmentDir.createReader();
-    Assert.assertNotNull(reader);
-    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
     indexSegment.destroy();
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(_indexDir, ReadMode.mmap);
+        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+      assertNotNull(reader);
+      assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
+    }
 
     // Test for scenarios by creating on-disk segment in V1 and then loading
     // the segment with and without specifying segmentVersion in 
IndexLoadingConfig
@@ -333,92 +330,105 @@ public class LoaderTest {
     constructSegmentWithFSTIndex(SegmentVersion.v1);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
     // check that segment v1 dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, 
FST_INDEX_COL_NAME);
-    Assert.assertNotNull(fstIndexFile);
-    Assert.assertFalse(fstIndexFile.isDirectory());
-    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertNotNull(fstIndexFile);
+    assertFalse(fstIndexFile.isDirectory());
+    assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
+    assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create fst index reader with on-disk version V1
-    indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setFSTIndexColumns(new 
HashSet<>(Arrays.asList(FST_INDEX_COL_NAME)));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithFSTIndex(null);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, 
FST_INDEX_COL_NAME);
-    Assert.assertNotNull(fstIndexFile);
-    Assert.assertFalse(fstIndexFile.isDirectory());
-    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertNotNull(fstIndexFile);
+    assertFalse(fstIndexFile.isDirectory());
+    assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
+    assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig to V1
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithFSTIndex(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, 
FST_INDEX_COL_NAME);
-    Assert.assertNotNull(fstIndexFile);
-    Assert.assertFalse(fstIndexFile.isDirectory());
-    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertNotNull(fstIndexFile);
+    assertFalse(fstIndexFile.isDirectory());
+    assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + 
LUCENE_V99_FST_INDEX_FILE_EXTENSION);
+    assertEquals(fstIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 3: set the segment version to load in IndexLoadingConfig to V3
     // there should be conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is different than the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithFSTIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // the index dir should exist in v3 format due to conversion
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, 
FST_INDEX_COL_NAME);
-    Assert.assertNull(fstIndexFile);
-    segmentDir = _localSegmentDirectoryLoader.load(_indexDir.toURI(),
-        new 
SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_pinotConfiguration).build());
-    reader = segmentDir.createReader();
-    Assert.assertNotNull(reader);
-    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
+    assertNull(fstIndexFile);
     indexSegment.destroy();
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(_indexDir, ReadMode.mmap);
+        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+      assertNotNull(reader);
+      assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, 
StandardIndexes.fst()));
+    }
   }
 
   private void constructSegmentWithForwardIndexDisabled(SegmentVersion 
segmentVersion, boolean enableInvertedIndex)
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
-    SegmentGeneratorConfig segmentGeneratorConfig =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, "testTable");
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.forward(), 
ForwardIndexConfig.DISABLED,
-        NO_FORWARD_INDEX_COL_NAME);
-    if (enableInvertedIndex) {
-      segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), 
IndexConfig.ENABLED, NO_FORWARD_INDEX_COL_NAME);
-    }
-    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
-    driver.init(segmentGeneratorConfig);
+
+    TableConfig tableConfig = 
createTableConfigWithForwardIndexDisabled(segmentVersion, enableInvertedIndex);
+    Schema schema = createSchema();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setInputFilePath(_avroFile.getAbsolutePath());
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config);
     driver.build();
 
     _indexDir = new File(INDEX_DIR, driver.getSegmentName());
   }
 
+  private TableConfig createTableConfigWithForwardIndexDisabled(@Nullable 
SegmentVersion segmentVersion,
+      boolean enableInvertedIndex) {
+    FieldConfig fieldConfig =
+        new FieldConfig(NO_FORWARD_INDEX_COL_NAME, 
FieldConfig.EncodingType.DICTIONARY, List.of(), null,
+            Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true"));
+    TableConfigBuilder builder =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setFieldConfigList(List.of(fieldConfig));
+    if (segmentVersion != null) {
+      builder.setSegmentVersion(segmentVersion.toString());
+    }
+    if (enableInvertedIndex) {
+      builder.setInvertedIndexColumns(List.of(NO_FORWARD_INDEX_COL_NAME))
+          .setCreateInvertedIndexDuringSegmentGeneration(true);
+    }
+    return builder.build();
+  }
+
   @Test
   public void testForwardIndexDisabledLoad()
       throws Exception {
@@ -430,29 +440,24 @@ public class LoaderTest {
     constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, true);
 
     // check that segment on-disk version is V3 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
     // check that V3 index sub-dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create all index readers with on-disk version V3
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-
-    Set<String> forwardIndexDisabledColumns = new HashSet<>();
-    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    ImmutableSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
indexLoadingConfig);
+    TableConfig tableConfig = createTableConfigWithForwardIndexDisabled(null, 
false);
+    Schema schema = createSchema();
+    ImmutableSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
-    
Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
-        .hasDictionary());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    
assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME).hasDictionary());
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     indexSegment.destroy();
@@ -460,15 +465,14 @@ public class LoaderTest {
     // CASE 2: set the segment version to load in IndexLoadingConfig as V3
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk (V3)
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithForwardIndexDisabled(SegmentVersion.v3, 
false);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
-    
Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
-        .hasDictionary());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    
assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME).hasDictionary());
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     indexSegment.destroy();
@@ -481,58 +485,51 @@ public class LoaderTest {
     constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, true);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
     // check that segment v1 dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create all index readers with on-disk version V1
-    indexLoadingConfig = new IndexLoadingConfig();
-    forwardIndexDisabledColumns = new HashSet<>();
-    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithForwardIndexDisabled(null, false);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
-    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
-    
Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
-        .hasDictionary());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    
assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME).hasDictionary());
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig to V1
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithForwardIndexDisabled(SegmentVersion.v1, 
false);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
-    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
-    
Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
-        .hasDictionary());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    
assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME).hasDictionary());
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     indexSegment.destroy();
 
     // CASE 3: set the segment version to load in IndexLoadingConfig to V3
     // there should be conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is different than the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithForwardIndexDisabled(SegmentVersion.v3, 
false);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
-    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
-    
Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
-        .hasDictionary());
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    
assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME).hasDictionary());
     // the index dir should exist in v3 format due to conversion
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     indexSegment.destroy();
 
@@ -542,33 +539,44 @@ public class LoaderTest {
     try {
       constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, false);
     } catch (IllegalStateException e) {
-      Assert.fail("Disabling forward index without enabling inverted index is 
allowed now");
+      fail("Disabling forward index without enabling inverted index is allowed 
now");
     }
 
     try {
       constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, false);
     } catch (IllegalStateException e) {
-      Assert.fail("Disabling forward index without enabling inverted index is 
allowed now");
+      fail("Disabling forward index without enabling inverted index is allowed 
now");
     }
   }
 
   private void constructSegmentWithTextIndex(SegmentVersion segmentVersion)
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
-    SegmentGeneratorConfig segmentGeneratorConfig =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, "testTable");
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    List<String> rawIndexCreationColumns = 
Lists.newArrayList(TEXT_INDEX_COL_NAME);
-    TextIndexConfig textIndexConfig = new 
TextIndexConfigBuilder(segmentGeneratorConfig.getFSTIndexType()).build();
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.text(), textIndexConfig, 
TEXT_INDEX_COL_NAME);
-    segmentGeneratorConfig.setRawIndexCreationColumns(rawIndexCreationColumns);
-    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
-    driver.init(segmentGeneratorConfig);
+
+    TableConfig tableConfig = createTableConfigWithTextIndex(segmentVersion);
+    Schema schema = createSchema();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setInputFilePath(_avroFile.getAbsolutePath());
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config);
     driver.build();
 
     _indexDir = new File(INDEX_DIR, driver.getSegmentName());
   }
 
+  private TableConfig createTableConfigWithTextIndex(@Nullable SegmentVersion 
segmentVersion) {
+    FieldConfig fieldConfig =
+        new FieldConfig(TEXT_INDEX_COL_NAME, 
FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.TEXT),
+            null, null);
+    TableConfigBuilder builder =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setFieldConfigList(List.of(fieldConfig));
+    if (segmentVersion != null) {
+      builder.setSegmentVersion(segmentVersion.toString());
+    }
+    return builder.build();
+  }
+
   @Test
   public void testTextIndexLoad()
       throws Exception {
@@ -580,30 +588,29 @@ public class LoaderTest {
     constructSegmentWithTextIndex(SegmentVersion.v3);
 
     // check that segment on-disk version is V3 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
     // check that V3 index sub-dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // check that text index exists under V3 subdir.
     File textIndexFile = 
SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create text index reader with on-disk version V3
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTextIndexColumns(new 
HashSet<>(Arrays.asList(TEXT_INDEX_COL_NAME)));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
indexLoadingConfig);
+    TableConfig tableConfig = createTableConfigWithTextIndex(null);
+    Schema schema = createSchema();
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // no change/conversion should have happened for textIndex dir
@@ -611,45 +618,43 @@ public class LoaderTest {
     // segment load should have created the docID mapping file in V3 structure
     File textIndexDocIdMappingFile =
         SegmentDirectoryPaths.findTextIndexDocIdMappingFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertNotNull(textIndexDocIdMappingFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertFalse(textIndexDocIdMappingFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertNotNull(textIndexDocIdMappingFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertFalse(textIndexDocIdMappingFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
-    Assert.assertEquals(textIndexDocIdMappingFile.getName(),
+    assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
-    Assert.assertEquals(textIndexDocIdMappingFile.getParentFile().getName(),
-        SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig as V3
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk (V3)
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithTextIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // no change/conversion should have happened for textIndex dir
     textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, 
TEXT_INDEX_COL_NAME);
     // segment load should have created the docID mapping file in V3 structure
     textIndexDocIdMappingFile = 
SegmentDirectoryPaths.findTextIndexDocIdMappingFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertNotNull(textIndexDocIdMappingFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertFalse(textIndexDocIdMappingFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertNotNull(textIndexDocIdMappingFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertFalse(textIndexDocIdMappingFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
-    Assert.assertEquals(textIndexDocIdMappingFile.getName(),
+    assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
-    Assert.assertEquals(textIndexDocIdMappingFile.getParentFile().getName(),
-        SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
 
     // Test for scenarios by creating on-disk segment in V1 and then loading
@@ -660,101 +665,129 @@ public class LoaderTest {
     constructSegmentWithTextIndex(SegmentVersion.v1);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
     // check that segment v1 dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that text index exists directly under indexDir (V1). it should 
exist and should be a subdir
     textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertFalse(textIndexDocIdMappingFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertFalse(textIndexDocIdMappingFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create text index reader with on-disk version V1
-    indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTextIndexColumns(new 
HashSet<>(Arrays.asList(TEXT_INDEX_COL_NAME)));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithTextIndex(null);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // no change/conversion should have happened in text index Dir
     textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, 
TEXT_INDEX_COL_NAME);
     // segment load should have created the docID mapping file in V1 structure
     textIndexDocIdMappingFile = 
SegmentDirectoryPaths.findTextIndexDocIdMappingFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertNotNull(textIndexDocIdMappingFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertNotNull(textIndexDocIdMappingFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
-    Assert.assertEquals(textIndexDocIdMappingFile.getName(),
+    assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(textIndexDocIdMappingFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
-    Assert.assertEquals(textIndexDocIdMappingFile.getParentFile().getName(),
-        new SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(textIndexDocIdMappingFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig to V1
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithTextIndex(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // no change/conversion should have happened in text index Dir
     textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, 
TEXT_INDEX_COL_NAME);
     // segment load should have created the docID mapping file in V1 structure
     textIndexDocIdMappingFile = 
SegmentDirectoryPaths.findTextIndexDocIdMappingFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertNotNull(textIndexDocIdMappingFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertNotNull(textIndexDocIdMappingFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
-    Assert.assertEquals(textIndexDocIdMappingFile.getName(),
+    assertEquals(textIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(textIndexDocIdMappingFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
-    Assert.assertEquals(textIndexDocIdMappingFile.getParentFile().getName(),
-        new SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(textIndexDocIdMappingFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 3: set the segment version to load in IndexLoadingConfig to V3
     // there should be conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is different than the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithTextIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // the index dir should exist in v3 format due to conversion
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     // check that text index exists under V3 subdir. It should exist and 
should be a subdir
     textIndexFile = SegmentDirectoryPaths.findTextIndexIndexFile(_indexDir, 
TEXT_INDEX_COL_NAME);
     // segment load should have created the docID mapping file in V3 structure
     textIndexDocIdMappingFile = 
SegmentDirectoryPaths.findTextIndexDocIdMappingFile(_indexDir, 
TEXT_INDEX_COL_NAME);
-    Assert.assertNotNull(textIndexFile);
-    Assert.assertNotNull(textIndexDocIdMappingFile);
-    Assert.assertTrue(textIndexFile.isDirectory());
-    Assert.assertEquals(textIndexFile.getName(),
+    assertNotNull(textIndexFile);
+    assertNotNull(textIndexDocIdMappingFile);
+    assertTrue(textIndexFile.isDirectory());
+    assertEquals(textIndexFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_V99_TEXT_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
-    Assert.assertEquals(textIndexDocIdMappingFile.getName(),
+    assertEquals(textIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getName(),
         TEXT_INDEX_COL_NAME + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
-    Assert.assertEquals(textIndexDocIdMappingFile.getParentFile().getName(),
-        SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(textIndexDocIdMappingFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
   }
 
+  private void constructSegmentWithVectorIndex(SegmentVersion segmentVersion)
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    TableConfig tableConfig = createTableConfigWithVectorIndex(segmentVersion);
+    Schema schema = createVectorSchema();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setInputFilePath(_vectorAvroFile.getAbsolutePath());
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config);
+    driver.build();
+
+    _indexDir = new File(INDEX_DIR, driver.getSegmentName());
+  }
+
+  private TableConfig createTableConfigWithVectorIndex(SegmentVersion 
segmentVersion) {
+    FieldConfig fieldConfig =
+        new FieldConfig(VECTOR_INDEX_COL_NAME, FieldConfig.EncodingType.RAW, 
List.of(FieldConfig.IndexType.VECTOR),
+            null, Map.of("vectorDimension", Integer.toString(VECTOR_DIM_SIZE), 
"vectorIndexType", "HNSW"));
+    TableConfigBuilder builder = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setNoDictionaryColumns(List.of(VECTOR_INDEX_COL_NAME)).setFieldConfigList(List.of(fieldConfig));
+    if (segmentVersion != null) {
+      builder.setSegmentVersion(segmentVersion.toString());
+    }
+    return builder.build();
+  }
+
+  private Schema createVectorSchema()
+      throws Exception {
+    return SegmentTestUtils.extractSchemaFromAvroWithoutTime(_vectorAvroFile);
+  }
+
   @Test
   public void testVectorIndexLoad()
       throws Exception {
@@ -766,63 +799,58 @@ public class LoaderTest {
     constructSegmentWithVectorIndex(SegmentVersion.v3);
 
     // check that segment on-disk version is V3 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v3);
     // check that V3 index sub-dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // check that vector index exists under V3 subdir.
     File vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create vector index reader with on-disk version V3
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setVectorIndexConfigs(
-        ImmutableMap.of(VECTOR_INDEX_COL_NAME, new 
VectorIndexConfig(ImmutableMap.of(
-            "vectorDimension", String.valueOf(VECTOR_DIM_SIZE),
-            "vectorIndexType", "HNSW"
-        ))));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, 
indexLoadingConfig);
+    TableConfig tableConfig = createTableConfigWithVectorIndex(null);
+    Schema schema = createVectorSchema();
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // no change/conversion should have happened for vectorIndex dir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig as V3
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk (V3)
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithVectorIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that index dir is not in V1 format (the only subdir it should 
have is V3)
     verifyIndexDirIsV3(_indexDir);
     // no change/conversion should have happened for vectorIndex dir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
 
     // Test for scenarios by creating on-disk segment in V1 and then loading
@@ -833,107 +861,82 @@ public class LoaderTest {
     constructSegmentWithVectorIndex(SegmentVersion.v1);
 
     // check that segment on-disk version is V1 after creation
-    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
+    assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), 
SegmentVersion.v1);
     // check that segment v1 dir exists
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
     // check that v3 index sub-dir does not exist
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // check that vector index exists directly under indexDir (V1). it should 
exist and should be a subdir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
 
     // CASE 1: don't set the segment version to load in IndexLoadingConfig
     // there should be no conversion done by ImmutableSegmentLoader and it 
should
     // be able to create vector index reader with on-disk version V1
-    indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setVectorIndexConfigs(
-        ImmutableMap.of(VECTOR_INDEX_COL_NAME, new 
VectorIndexConfig(ImmutableMap.of(
-            "vectorDimension", String.valueOf(VECTOR_DIM_SIZE),
-            "vectorIndexType", "HNSW"
-        ))));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithVectorIndex(null);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // no change/conversion should have happened in vector index Dir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 2: set the segment version to load in IndexLoadingConfig to V1
     // there should be no conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is same as the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithVectorIndex(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v1
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v1);
     // no change/conversion should have happened in indexDir
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
-    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v1).exists());
+    assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     // no change/conversion should have happened in vector index Dir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
+    assertEquals(vectorIndexFile.getParentFile().getName(), new 
SegmentMetadataImpl(_indexDir).getName());
     indexSegment.destroy();
 
     // CASE 3: set the segment version to load in IndexLoadingConfig to V3
     // there should be conversion done by ImmutableSegmentLoader since the 
segmentVersionToLoad
     // is different than the version of segment on disk
-    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    tableConfig = createTableConfigWithVectorIndex(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, new 
IndexLoadingConfig(tableConfig, schema));
     // check that loaded segment version is v3
-    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
+    assertEquals(indexSegment.getSegmentMetadata().getVersion(), 
SegmentVersion.v3);
     // the index dir should exist in v3 format due to conversion
-    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
+    assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, 
SegmentVersion.v3).exists());
     verifyIndexDirIsV3(_indexDir);
     // check that vector index exists under V3 subdir. It should exist and 
should be a subdir
     vectorIndexFile = 
SegmentDirectoryPaths.findVectorIndexIndexFile(_indexDir, 
VECTOR_INDEX_COL_NAME);
-    Assert.assertNotNull(vectorIndexFile);
-    Assert.assertTrue(vectorIndexFile.isDirectory());
-    Assert.assertEquals(vectorIndexFile.getName(),
+    assertNotNull(vectorIndexFile);
+    assertTrue(vectorIndexFile.isDirectory());
+    assertEquals(vectorIndexFile.getName(),
         VECTOR_INDEX_COL_NAME + 
V1Constants.Indexes.VECTOR_V99_HNSW_INDEX_FILE_EXTENSION);
-    Assert.assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertEquals(vectorIndexFile.getParentFile().getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
     indexSegment.destroy();
   }
 
-  private void constructSegmentWithVectorIndex(SegmentVersion segmentVersion)
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-    SegmentGeneratorConfig segmentGeneratorConfig =
-        
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_vectorAvroFile, 
INDEX_DIR, "testTable");
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    List<String> rawIndexCreationColumns = 
Lists.newArrayList(VECTOR_INDEX_COL_NAME);
-    VectorIndexConfig vectorIndexConfig = new 
VectorIndexConfig(ImmutableMap.of(
-        "vectorDimension", String.valueOf(VECTOR_DIM_SIZE),
-        "vectorIndexType", "HNSW"
-    ));
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.vector(), 
vectorIndexConfig, VECTOR_INDEX_COL_NAME);
-    segmentGeneratorConfig.setRawIndexCreationColumns(rawIndexCreationColumns);
-    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
-    driver.init(segmentGeneratorConfig);
-    driver.build();
-
-    _indexDir = new File(INDEX_DIR, driver.getSegmentName());
-  }
-
   private void verifyIndexDirIsV3(File indexDir) {
     File[] files = indexDir.listFiles();
-    Assert.assertEquals(files.length, 1);
-    Assert.assertEquals(files[0].getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
+    assertNotNull(files);
+    assertEquals(files.length, 1);
+    assertEquals(files[0].getName(), 
SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
   }
 
   @AfterClass
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FSTType.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FSTType.java
index 8f20390f01..74c72d914e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FSTType.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FSTType.java
@@ -22,5 +22,5 @@ package org.apache.pinot.spi.config.table;
  * Type of FST to be used
  */
 public enum FSTType {
-    LUCENE, NATIVE
+  LUCENE, NATIVE
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index e98e3c4c7d..1cfd610710 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -43,8 +43,6 @@ public class IndexingConfig extends BaseJsonConfig {
   private Map<String, JsonIndexConfig> _jsonIndexConfigs;
   private List<String> _mapIndexColumns;
   private Map<String, MapIndexConfig> _mapIndexConfigs;
-  private List<String> _h3IndexColumns;
-  private List<String> _vectorIndexColumns;
   private List<String> _sortedColumn;
   private List<String> _bloomFilterColumns;
   private Map<String, BloomFilterConfig> _bloomFilterConfigs;
@@ -52,7 +50,7 @@ public class IndexingConfig extends BaseJsonConfig {
   @Deprecated // Moved to {@link IngestionConfig#getStreamIngestionConfig}
   private Map<String, String> _streamConfigs;
   private String _segmentFormatVersion;
-  private FSTType _fstTypeForFSTIndex;
+  private FSTType _fstIndexType;
   private String _columnMinMaxValueGeneratorMode;
   private List<String> _noDictionaryColumns; // TODO: replace this with 
noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig;
@@ -115,12 +113,12 @@ public class IndexingConfig extends BaseJsonConfig {
     return _rangeIndexVersion;
   }
 
-  public void setFSTIndexType(FSTType fstType) {
-    _fstTypeForFSTIndex = fstType;
+  public void setFSTIndexType(FSTType fstIndexType) {
+    _fstIndexType = fstIndexType;
   }
 
   public FSTType getFSTIndexType() {
-    return _fstTypeForFSTIndex;
+    return _fstIndexType;
   }
 
   public void setRangeIndexVersion(int rangeIndexVersion) {
@@ -175,33 +173,6 @@ public class IndexingConfig extends BaseJsonConfig {
     _createInvertedIndexDuringSegmentGeneration = 
createInvertedIndexDuringSegmentGeneration;
   }
 
-  public List<String> getH3IndexColumns() {
-    return _h3IndexColumns;
-  }
-
-  public IndexingConfig setH3IndexColumns(List<String> h3IndexColumns) {
-    _h3IndexColumns = h3IndexColumns;
-    return this;
-  }
-
-  public List<String> getVectorIndexColumns() {
-    return _vectorIndexColumns;
-  }
-
-  public IndexingConfig setVectorIndexColumns(List<String> vectorIndexColumns) 
{
-    _vectorIndexColumns = vectorIndexColumns;
-    return this;
-  }
-
-  public FSTType getFstTypeForFSTIndex() {
-    return _fstTypeForFSTIndex;
-  }
-
-  public IndexingConfig setFstTypeForFSTIndex(FSTType fstTypeForFSTIndex) {
-    _fstTypeForFSTIndex = fstTypeForFSTIndex;
-    return this;
-  }
-
   @Nullable
   public List<String> getSortedColumn() {
     return _sortedColumn;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to