This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fce17d3d39 Do not allow setting TableConfig in IndexLoadingConfig after construction (#14098)
fce17d3d39 is described below

commit fce17d3d392f4de2484ab7d72062f6b89b58b02a
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Sep 27 15:43:00 2024 -0700

    Do not allow setting TableConfig in IndexLoadingConfig after construction (#14098)
---
 .../apache/pinot/queries/BaseJsonQueryTest.java    |  15 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  45 +--
 .../org/apache/pinot/queries/ExprMinMaxTest.java   |   6 +-
 .../ForwardIndexDisabledMultiValueQueriesTest.java | 163 ++++-----
 ...dexDisabledMultiValueQueriesWithReloadTest.java | 242 +++++--------
 ...ForwardIndexDisabledSingleValueQueriesTest.java | 277 ++++++---------
 .../ForwardIndexHandlerReloadQueriesTest.java      | 374 ++++++++-------------
 .../pinot/queries/JsonDataTypeQueriesTest.java     |  37 +-
 .../queries/JsonIngestionFromAvroQueriesTest.java  |  68 ++--
 .../pinot/queries/JsonMalformedIndexTest.java      | 144 ++++----
 .../JsonUnnestIngestionFromAvroQueriesTest.java    |  68 ++--
 .../segment/index/loader/IndexLoadingConfig.java   |  11 +-
 .../index/loader/SegmentPreProcessorTest.java      |   5 +-
 .../apache/pinot/spi/config/table/FieldConfig.java |   8 +-
 14 files changed, 544 insertions(+), 919 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java
index bccb650f92..d2cefa9475 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -35,7 +34,6 @@ import 
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
@@ -99,6 +97,7 @@ public abstract class BaseJsonQueryTest extends 
BaseQueriesTest {
     FileUtils.deleteDirectory(indexDir);
 
     TableConfig tableConfig = tableConfig();
+    Schema schema = schema();
 
     List<GenericRow> records = new ArrayList<>(numRecords());
     records.add(createRecord(1, 1, "daffy duck",
@@ -142,10 +141,8 @@ public abstract class BaseJsonQueryTest extends 
BaseQueriesTest {
         "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": 
\"array\"},\"days\": 111}"));
     records.add(createRecord(14, 14, "top level array", "[{\"i1\":1,\"i2\":2}, 
{\"i1\":3,\"i2\":4}]"));
 
-    List<String> jsonIndexColumns = new ArrayList<>();
-    jsonIndexColumns.add("jsonColumn");
-    tableConfig.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema());
+    tableConfig.getIndexingConfig().setJsonIndexColumns(List.of("jsonColumn"));
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
     segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
     segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     segmentGeneratorConfig.setOutDir(indexDir.getPath());
@@ -154,11 +151,7 @@ public abstract class BaseJsonQueryTest extends 
BaseQueriesTest {
     driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(tableConfig);
-    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
     ImmutableSegment immutableSegment =
         ImmutableSegmentLoader.load(new File(indexDir, SEGMENT_NAME), 
indexLoadingConfig);
     _indexSegment = immutableSegment;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index 23fdd0c545..dce00515d0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +58,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -68,7 +67,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -131,16 +129,23 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
       .addMultiValueDimension(MV_COL1_NO_INDEX, DataType.INT)
       .build();
 
+  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(List.of(COL1_RAW, MV_COL1_RAW))
+      .setSortedColumn(COL1_SORTED_INDEX)
+      .setInvertedIndexColumns(List.of(COL1_INVERTED_INDEX, 
COL2_INVERTED_INDEX, COL3_INVERTED_INDEX))
+      .setRangeIndexColumns(List.of(COL1_RANGE_INDEX, COL2_RANGE_INDEX, 
COL3_RANGE_INDEX))
+      .setJsonIndexColumns(List.of(COL1_JSON_INDEX))
+      .setFieldConfigList(List.of(
+          new FieldConfig(COL1_TEXT_INDEX, 
FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.TEXT),
+              null, null)))
+      .build();
+
   private static final DataSchema DATA_SCHEMA = new DataSchema(
       new String[]{"Operator", "Operator_Id", "Parent_Id"},
       new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT}
   );
   //@formatter:on
 
-  private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(List.of(COL1_RAW, 
MV_COL1_RAW))
-          .setTableName(RAW_TABLE_NAME).build();
-
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
   private List<String> _segmentNames;
@@ -198,22 +203,6 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
 
   ImmutableSegment createImmutableSegment(List<GenericRow> records, String 
segmentName)
       throws Exception {
-    IndexingConfig indexingConfig = TABLE_CONFIG.getIndexingConfig();
-
-    List<String> invertedIndexColumns = List.of(COL1_INVERTED_INDEX, 
COL2_INVERTED_INDEX, COL3_INVERTED_INDEX);
-    indexingConfig.setInvertedIndexColumns(invertedIndexColumns);
-
-    List<String> rangeIndexColumns = List.of(COL1_RANGE_INDEX, 
COL2_RANGE_INDEX, COL3_RANGE_INDEX);
-    indexingConfig.setRangeIndexColumns(rangeIndexColumns);
-
-    List<String> sortedIndexColumns = List.of(COL1_SORTED_INDEX);
-    indexingConfig.setSortedColumn(sortedIndexColumns);
-
-    List<String> jsonIndexColumns = List.of(COL1_JSON_INDEX);
-    indexingConfig.setJsonIndexColumns(jsonIndexColumns);
-
-    List<String> textIndexColumns = List.of(COL1_TEXT_INDEX);
-
     File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
     segmentGeneratorConfig.setSegmentName(segmentName);
@@ -223,16 +212,8 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(invertedIndexColumns));
-    indexLoadingConfig.setRangeIndexColumns(new HashSet<>(rangeIndexColumns));
-    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
-    indexLoadingConfig.setTextIndexColumns(new HashSet<>(textIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
     _segmentNames.add(segmentName);
-
     return ImmutableSegmentLoader.load(new File(tableDataDir, segmentName), 
indexLoadingConfig);
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
index 121ed8c563..20d9fa80ab 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExprMinMaxTest.java
@@ -45,14 +45,13 @@ import 
org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.spi.utils.CommonConstants.RewriterConstants.*;
+import static 
org.apache.pinot.spi.utils.CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX;
+import static 
org.apache.pinot.spi.utils.CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -63,7 +62,6 @@ import static org.testng.Assert.fail;
  * Queries test for exprmin/exprmax functions.
  */
 public class ExprMinMaxTest extends BaseQueriesTest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExprMinMaxTest.class);
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"ExprMinMaxTest");
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String SEGMENT_NAME = "testSegment";
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
index 4d392f9766..4a6010c37f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
@@ -20,13 +20,8 @@ package org.apache.pinot.queries;
 
 import java.io.File;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -39,20 +34,15 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.*;
@@ -78,127 +68,96 @@ import static org.testng.Assert.*;
  * </ul>
  */
 public class ForwardIndexDisabledMultiValueQueriesTest extends BaseQueriesTest 
{
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
ForwardIndexDisabledMultiValueQueriesTest.class.getSimpleName());
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-mv.avro";
-  private static final String SEGMENT_NAME_1 = 
"testTable_1756015688_1756015688";
-  private static final String SEGMENT_NAME_2 = 
"testTable_1756015689_1756015689";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
-      "ForwardIndexDisabledMultiValueQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
 
   private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
 
-  // Hard-coded query filter.
+  //@formatter:off
+  // Hard-coded query filter
   protected static final String FILTER = " WHERE column1 > 100000000"
       + " AND column2 BETWEEN 20000000 AND 1000000000"
       + " AND column3 <> 'w'"
       + " AND (column6 < 500000 OR column7 NOT IN (225, 407))"
       + " AND daysSinceEpoch = 1756015683";
+  //@formatter:on
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments.
   private List<IndexSegment> _indexSegments;
 
-  private TableConfig _tableConfig;
-  private List<String> _invertedIndexColumns;
-  private List<String> _forwardIndexDisabledColumns;
-
-  @BeforeMethod
-  public void buildSegment()
+  @BeforeClass
+  public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    // Get resource file path.
+    //@formatter:off
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addMetric("column1", DataType.INT)
+        .addMetric("column2", DataType.INT)
+        .addSingleValueDimension("column3", DataType.STRING)
+        .addSingleValueDimension("column5", DataType.STRING)
+        .addMultiValueDimension("column6", DataType.INT)
+        .addMultiValueDimension("column7", DataType.INT)
+        .addSingleValueDimension("column8", DataType.INT)
+        .addMetric("column9", DataType.INT)
+        .addMetric("column10", DataType.INT)
+        .addDateTime("daysSinceEpoch", DataType.INT, "EPOCH|DAYS", "1:DAYS")
+        .build();
+
+    List<FieldConfig> fieldConfigs = List.of(
+        new FieldConfig("column6", FieldConfig.EncodingType.DICTIONARY, 
List.of(), null,
+            Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true")),
+        new FieldConfig("column7", FieldConfig.EncodingType.DICTIONARY, 
List.of(), null,
+            Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true")));
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName("daysSinceEpoch")
+        .setNoDictionaryColumns(List.of("column5"))
+        .setInvertedIndexColumns(List.of("column3", "column6", "column7", 
"column8", "column9"))
+        .setCreateInvertedIndexDuringSegmentGeneration(true)
+        .setFieldConfigList(fieldConfigs)
+        .build();
+    //@formatter:on
+
     URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
     assertNotNull(resource);
-    String filePath = resource.getFile();
-
-    // Build the segment schema.
-    Schema schema = new 
Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", 
FieldSpec.DataType.INT)
-        .addMetric("column2", 
FieldSpec.DataType.INT).addSingleValueDimension("column3", 
FieldSpec.DataType.STRING)
-        .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
-        .addMultiValueDimension("column6", FieldSpec.DataType.INT)
-        .addMultiValueDimension("column7", FieldSpec.DataType.INT)
-        .addSingleValueDimension("column8", 
FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
-        .addMetric("column10", FieldSpec.DataType.INT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
-
-    createSegment(filePath, SEGMENT_NAME_1, schema);
-    createSegment(filePath, SEGMENT_NAME_2, schema);
-
-    ImmutableSegment immutableSegment1 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment2 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
-
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-  }
-
-  private void createSegment(String filePath, String segmentName, Schema 
schema)
-      throws Exception {
-    // Create field configs for the no forward index columns
-    List<FieldConfig> fieldConfigList = new ArrayList<>();
-    fieldConfigList.add(new FieldConfig("column6", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
-        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
-    if (segmentName.equals(SEGMENT_NAME_1)) {
-      fieldConfigList.add(new FieldConfig("column7", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
-          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
+    String avroFile = resource.getFile();
 
-      // Build table config based on segment 1 as it contains both columns 
under no forward index
-      _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList("column5"))
-          
.setTableName("testTable").setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigList).build();
-    }
-
-    // Create the segment generator config.
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, schema);
-    segmentGeneratorConfig.setInputFilePath(filePath);
-    segmentGeneratorConfig.setTableName("testTable");
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    _invertedIndexColumns = Arrays.asList("column3", "column6", "column7", 
"column8", "column9");
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), 
IndexConfig.ENABLED, _invertedIndexColumns);
-    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", 
"column7"));
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.forward(), 
ForwardIndexConfig.DISABLED,
-        _forwardIndexDisabledColumns);
-    // The segment generation code in SegmentColumnarIndexCreator will throw
-    // exception if start and end time in time column are not in acceptable
-    // range. For this test, we first need to fix the input avro data
-    // to have the time column values in allowed range. Until then, the check
-    // is explicitly disabled
-    segmentGeneratorConfig.setSkipTimeValueCheck(true);
-
-    // Build the index segment.
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setInputFilePath(avroFile);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setSkipTimeValueCheck(true);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig);
+    driver.init(generatorConfig);
     driver.build();
-  }
 
-  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
-      throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(_invertedIndexColumns));
-    indexLoadingConfig.setForwardIndexDisabledColumns(new 
HashSet<>(_forwardIndexDisabledColumns));
-    indexLoadingConfig.setReadMode(ReadMode.heap);
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, segmentName),
-        indexLoadingConfig);
-
-    Map<String, ColumnMetadata> columnMetadataMap1 = 
immutableSegment.getSegmentMetadata().getColumnMetadataMap();
-    columnMetadataMap1.forEach((column, metadata) -> {
+    ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME),
+        new IndexLoadingConfig(tableConfig, schema));
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata metadata = entry.getValue();
       if (column.equals("column6") || column.equals("column7")) {
         assertTrue(metadata.hasDictionary());
         assertFalse(metadata.isSingleValue());
-        assertNull(immutableSegment.getForwardIndex(column));
+        assertNull(segment.getForwardIndex(column));
       } else {
-        assertNotNull(immutableSegment.getForwardIndex(column));
+        assertNotNull(segment.getForwardIndex(column));
       }
-    });
+    }
 
-    return immutableSegment;
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
   }
 
-  @AfterMethod
-  public void deleteAndDestroySegment() {
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
     FileUtils.deleteQuietly(INDEX_DIR);
-    _indexSegments.forEach((IndexSegment::destroy));
   }
 
   @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
index 5f32ac11c8..0a8efc9f1a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesWithReloadTest.java
@@ -21,13 +21,10 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -40,16 +37,11 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -80,22 +72,25 @@ import static org.testng.Assert.*;
  * </ul>
  */
 public class ForwardIndexDisabledMultiValueQueriesWithReloadTest extends 
BaseQueriesTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
ForwardIndexDisabledMultiValueQueriesWithReloadTest.class.getSimpleName());
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-mv.avro";
-  private static final String SEGMENT_NAME_1 = 
"testTable_1756015688_1756015688";
-  private static final String SEGMENT_NAME_2 = 
"testTable_1756015689_1756015689";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
-      "ForwardIndexDisabledMultiValueQueriesWithReloadTest");
-
-  // Build the segment schema.
-  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("testTable")
-      .addMetric("column1", FieldSpec.DataType.INT)
-      .addMetric("column2", 
FieldSpec.DataType.INT).addSingleValueDimension("column3", 
FieldSpec.DataType.STRING)
-      .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
-      .addMultiValueDimension("column6", FieldSpec.DataType.INT)
-      .addMultiValueDimension("column7", FieldSpec.DataType.INT)
-      .addSingleValueDimension("column8", 
FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
-      .addMetric("column10", FieldSpec.DataType.INT)
-      .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"daysSinceEpoch"), null).build();
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  //@formatter:off
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addMetric("column1", DataType.INT)
+      .addMetric("column2", DataType.INT)
+      .addSingleValueDimension("column3", DataType.STRING)
+      .addSingleValueDimension("column5", DataType.STRING)
+      .addMultiValueDimension("column6", DataType.INT)
+      .addMultiValueDimension("column7", DataType.INT)
+      .addSingleValueDimension("column8", DataType.INT)
+      .addMetric("column9", DataType.INT)
+      .addMetric("column10", DataType.INT)
+      .addDateTime("daysSinceEpoch", DataType.INT, "EPOCH|DAYS", "1:DAYS")
+      .build();
 
   // Hard-coded query filter.
   protected static final String FILTER = " WHERE column1 > 100000000"
@@ -103,103 +98,69 @@ public class 
ForwardIndexDisabledMultiValueQueriesWithReloadTest extends BaseQue
       + " AND column3 <> 'w'"
       + " AND (column6 < 500000 OR column7 NOT IN (225, 407))"
       + " AND daysSinceEpoch = 1756015683";
+  //@formatter:on
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments.
   private List<IndexSegment> _indexSegments;
 
-  private TableConfig _tableConfig;
-  private List<String> _invertedIndexColumns;
-  private List<String> _forwardIndexDisabledColumns;
-  private List<String> _noDictionaryColumns;
-
   @BeforeMethod
-  public void buildSegment()
+  public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    // Get resource file path.
+    TableConfig tableConfig =
+        createTableConfig(List.of("column5", "column7"), List.of("column3", 
"column6", "column8", "column9"),
+            List.of("column6"));
+
     URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
     assertNotNull(resource);
-    String filePath = resource.getFile();
-
-    createSegment(filePath, SEGMENT_NAME_1);
-    createSegment(filePath, SEGMENT_NAME_2);
-
-    ImmutableSegment immutableSegment1 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment2 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+    String avroFile = resource.getFile();
 
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-  }
-
-  private void createSegment(String filePath, String segmentName)
-      throws Exception {
-    // Create field configs for the no forward index columns
-    List<FieldConfig> fieldConfigList = new ArrayList<>();
-    fieldConfigList.add(new FieldConfig("column6", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
-        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
-    // Build table config based on segment 1 as it contains both columns under 
no forward index
-    _noDictionaryColumns = new ArrayList<>(Arrays.asList("column5", 
"column7"));
-    _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
-        
.setTimeColumnName("daysSinceEpoch").setNoDictionaryColumns(_noDictionaryColumns)
-        .setFieldConfigList(fieldConfigList).build();
-
-    // Create the segment generator config.
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, SCHEMA);
-    segmentGeneratorConfig.setInputFilePath(filePath);
-    segmentGeneratorConfig.setTableName("testTable");
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    _invertedIndexColumns = Arrays.asList("column3", "column6", "column8", 
"column9");
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), 
IndexConfig.ENABLED, _invertedIndexColumns);
-    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6"));
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.forward(), 
ForwardIndexConfig.DISABLED,
-        _forwardIndexDisabledColumns);
-    segmentGeneratorConfig.setRawIndexCreationColumns(_noDictionaryColumns);
-    // The segment generation code in SegmentColumnarIndexCreator will throw
-    // exception if start and end time in time column are not in acceptable
-    // range. For this test, we first need to fix the input avro data
-    // to have the time column values in allowed range. Until then, the check
-    // is explicitly disabled
-    segmentGeneratorConfig.setSkipTimeValueCheck(true);
-
-    // Build the index segment.
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, SCHEMA);
+    generatorConfig.setInputFilePath(avroFile);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setSkipTimeValueCheck(true);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig);
+    driver.init(generatorConfig);
     driver.build();
-  }
 
-  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
-      throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(_invertedIndexColumns));
-    indexLoadingConfig.setForwardIndexDisabledColumns(new 
HashSet<>(_forwardIndexDisabledColumns));
-    indexLoadingConfig.setNoDictionaryColumns(new 
HashSet<>(_noDictionaryColumns));
-    indexLoadingConfig.setReadMode(ReadMode.heap);
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, segmentName),
-        indexLoadingConfig);
-
-    Map<String, ColumnMetadata> columnMetadataMap1 = 
immutableSegment.getSegmentMetadata().getColumnMetadataMap();
-    columnMetadataMap1.forEach((column, metadata) -> {
+    ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME),
+        new IndexLoadingConfig(tableConfig, SCHEMA));
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata metadata = entry.getValue();
       if (column.equals("column6")) {
         assertTrue(metadata.hasDictionary());
         assertFalse(metadata.isSingleValue());
-        assertNull(immutableSegment.getForwardIndex(column));
+        assertNull(segment.getForwardIndex(column));
       } else {
-        assertNotNull(immutableSegment.getForwardIndex(column));
+        assertNotNull(segment.getForwardIndex(column));
       }
-    });
+    }
 
-    return immutableSegment;
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+  }
+
+  private TableConfig createTableConfig(List<String> noDictionaryColumns, 
List<String> invertedIndexColumns,
+      List<String> forwardIndexDisabledColumns) {
+    List<FieldConfig> fieldConfigs = new 
ArrayList<>(forwardIndexDisabledColumns.size());
+    for (String column : forwardIndexDisabledColumns) {
+      fieldConfigs.add(new FieldConfig(column, 
FieldConfig.EncodingType.DICTIONARY, List.of(), null,
+          Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true")));
+    }
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName("daysSinceEpoch")
+        
.setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
+        
.setCreateInvertedIndexDuringSegmentGeneration(true).setFieldConfigList(fieldConfigs).build();
   }
 
   @AfterMethod
-  public void deleteAndDestroySegment() {
+  public void tearDown() {
+    _indexSegment.destroy();
     FileUtils.deleteQuietly(INDEX_DIR);
-    _indexSegments.forEach((IndexSegment::destroy));
   }
 
   @Override
@@ -409,7 +370,8 @@ public class 
ForwardIndexDisabledMultiValueQueriesWithReloadTest extends BaseQue
     // Validate that the result row size before disabling the forward index 
matches the result row size after
     // re-enabling the forward index
     assertEquals(resultRowsAfterReenabling.size(), 
resultRowsBeforeDisabling.size());
-    for (int i = 0; i < resultRowsAfterReenabling.size(); i++) {
+    // Validate the first 10 rows
+    for (int i = 0; i < 10; i++) {
       Object[] resultRow = resultRowsAfterReenabling.get(i);
       assertEquals(resultRow.length, 1);
       int[] rowValuesAfterReenabling = (int[]) resultRow[0];
@@ -745,69 +707,45 @@ public class 
ForwardIndexDisabledMultiValueQueriesWithReloadTest extends BaseQue
 
   private void disableForwardIndexForSomeColumns()
       throws Exception {
-    // Now disable forward index for column7 in the index loading config.
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    Set<String> invertedIndexEnabledColumns = new 
HashSet<>(_invertedIndexColumns);
-    invertedIndexEnabledColumns.add("column7");
-    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
-    Set<String> forwardIndexDisabledColumns = new 
HashSet<>(_forwardIndexDisabledColumns);
-    forwardIndexDisabledColumns.add("column7");
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.removeNoDictionaryColumns("column7");
-    indexLoadingConfig.setReadMode(ReadMode.heap);
+    // Now disable forward index for column7 in the table config
+    TableConfig tableConfig =
+        createTableConfig(List.of("column5"), List.of("column3", "column6", 
"column7", "column8", "column9"),
+            List.of("column6", "column7"));
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
     // Reload the segments to pick up the new configs
-    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-
-    assertNull(immutableSegment1.getForwardIndex("column7"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column7"));
-    assertNotNull(immutableSegment1.getDictionary("column7"));
-
-    assertNull(immutableSegment2.getForwardIndex("column7"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column7"));
-    assertNotNull(immutableSegment2.getDictionary("column7"));
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+
+    assertNull(segment.getForwardIndex("column7"));
+    assertNotNull(segment.getInvertedIndex("column7"));
+    assertNotNull(segment.getDictionary("column7"));
   }
 
   private void reenableForwardIndexForSomeColumns()
       throws Exception {
-    // Now re-enable forward index for column7 in the index loading config.
+    // Now re-enable forward index for column7 in the table config
     // Also re-enable forward index for column6
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    Set<String> invertedIndexEnabledColumns = new 
HashSet<>(_invertedIndexColumns);
-    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
-    Set<String> forwardIndexDisabledColumns = new 
HashSet<>(_forwardIndexDisabledColumns);
-    forwardIndexDisabledColumns.remove("column6");
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.addNoDictionaryColumns("column7");
-    indexLoadingConfig.setReadMode(ReadMode.heap);
+    TableConfig tableConfig =
+        createTableConfig(List.of("column5", "column7"), List.of("column3", 
"column6", "column8", "column9"),
+            List.of());
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
     // Reload the segments to pick up the new configs
-    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-
-    assertNotNull(immutableSegment1.getForwardIndex("column7"));
-    assertNull(immutableSegment1.getInvertedIndex("column7"));
-    assertNull(immutableSegment1.getDictionary("column7"));
-    assertNotNull(immutableSegment1.getForwardIndex("column6"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column6"));
-    assertNotNull(immutableSegment1.getDictionary("column6"));
-
-    assertNotNull(immutableSegment2.getForwardIndex("column7"));
-    assertNull(immutableSegment2.getInvertedIndex("column7"));
-    assertNull(immutableSegment2.getDictionary("column7"));
-    assertNotNull(immutableSegment2.getForwardIndex("column6"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column6"));
-    assertNotNull(immutableSegment2.getDictionary("column6"));
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+
+    assertNotNull(segment.getForwardIndex("column7"));
+    assertNull(segment.getInvertedIndex("column7"));
+    assertNull(segment.getDictionary("column7"));
+    assertNotNull(segment.getForwardIndex("column6"));
+    assertNotNull(segment.getInvertedIndex("column6"));
+    assertNotNull(segment.getDictionary("column6"));
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
index c37f14a449..a09f1dd863 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
@@ -21,13 +21,8 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -40,17 +35,11 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
-import org.apache.pinot.segment.spi.index.RangeIndexConfig;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -85,139 +74,99 @@ import static org.testng.Assert.assertTrue;
  * </ul>
  */
 public class ForwardIndexDisabledSingleValueQueriesTest extends 
BaseQueriesTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
ForwardIndexDisabledSingleValueQueriesTest.class.getSimpleName());
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-sv.avro";
-  private static final String SEGMENT_NAME_1 = "testTable_126164076_167572857";
-  private static final String SEGMENT_NAME_2 = "testTable_126164076_167572858";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
-      "ForwardIndexDisabledSingleValueQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
 
   private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
 
-  // Hard-coded query filter.
+  //@formatter:off
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addMetric("column1", DataType.INT)
+      .addMetric("column3", DataType.INT)
+      .addSingleValueDimension("column5", DataType.STRING)
+      .addSingleValueDimension("column6", DataType.INT)
+      .addSingleValueDimension("column7", DataType.INT)
+      .addSingleValueDimension("column9", DataType.INT)
+      .addSingleValueDimension("column11", DataType.STRING)
+      .addSingleValueDimension("column12", DataType.STRING)
+      .addMetric("column17", DataType.INT)
+      .addMetric("column18", DataType.INT)
+      .addDateTime("daysSinceEpoch", DataType.INT, "EPOCH|DAYS", "1:DAYS")
+      .build();
+
+  // Hard-coded query filter
   private static final String FILTER = " WHERE column1 > 100000000"
       + " AND column3 BETWEEN 20000000 AND 1000000000"
       + " AND column5 = 'gFuH'"
       + " AND (column6 < 500000000 OR column11 NOT IN ('t', 'P'))"
       + " AND daysSinceEpoch = 126164076";
-
-  // Build the segment schema.
-  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName("testTable")
-      .addMetric("column1", FieldSpec.DataType.INT)
-      .addMetric("column3", 
FieldSpec.DataType.INT).addSingleValueDimension("column5", 
FieldSpec.DataType.STRING)
-      .addSingleValueDimension("column6", FieldSpec.DataType.INT)
-      .addSingleValueDimension("column7", FieldSpec.DataType.INT)
-      .addSingleValueDimension("column9", FieldSpec.DataType.INT)
-      .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
-      .addSingleValueDimension("column12", 
FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
-      .addMetric("column18", FieldSpec.DataType.INT)
-      .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"daysSinceEpoch"), null).build();
+  //@formatter:on
 
   private IndexSegment _indexSegment;
-  // Contains 2 index segments, one with 2 columns with forward index 
disabled, and the other with just 1.
+  // Contains 2 identical index segments
   private List<IndexSegment> _indexSegments;
 
-  private TableConfig _tableConfig;
-  private List<String> _invertedIndexColumns;
-  private List<String> _forwardIndexDisabledColumns;
-  private List<String> _noDictionaryColumns;
-
   @BeforeMethod
-  public void buildAndLoadSegment()
+  public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    // Get resource file path.
+    TableConfig tableConfig =
+        createTableConfig(List.of("column9"), List.of("column6", "column7", 
"column11", "column17", "column18"),
+            List.of("column6"), List.of("column6", "column7"));
+
     URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
     assertNotNull(resource);
-    String filePath = resource.getFile();
-
-    createSegment(filePath, SEGMENT_NAME_1);
-    createSegment(filePath, SEGMENT_NAME_2);
-
-    ImmutableSegment immutableSegment1 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment2 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
-
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-  }
+    String avroFile = resource.getFile();
 
-  private void createSegment(String filePath, String segmentName)
-      throws Exception {
-    // Create field configs for the no forward index columns
-    _noDictionaryColumns = Arrays.asList("column9");
-    List<FieldConfig> fieldConfigList = new ArrayList<>();
-    fieldConfigList.add(new FieldConfig("column6", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
-        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
-    if (segmentName.equals(SEGMENT_NAME_1)) {
-      fieldConfigList.add(new FieldConfig("column7", 
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
-          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, 
Boolean.TRUE.toString())));
-
-      // Build table config based on segment 1 as it contains both columns 
under no forward index
-      _tableConfig =
-          new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
-              
.setFieldConfigList(fieldConfigList).setRangeIndexColumns(Arrays.asList("column6"))
-              .setNoDictionaryColumns(_noDictionaryColumns).build();
-    }
-
-    // Create the segment generator config.
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, SCHEMA);
-    segmentGeneratorConfig.setInputFilePath(filePath);
-    segmentGeneratorConfig.setTableName("testTable");
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    // The segment generation code in SegmentColumnarIndexCreator will throw
-    // exception if start and end time in time column are not in acceptable
-    // range. For this test, we first need to fix the input avro data
-    // to have the time column values in allowed range. Until then, the check
-    // is explicitly disabled
-    segmentGeneratorConfig.setSkipTimeValueCheck(true);
-    _invertedIndexColumns = Arrays.asList("column6", "column7", "column11", 
"column17", "column18");
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), 
IndexConfig.ENABLED, _invertedIndexColumns);
-    segmentGeneratorConfig.setRawIndexCreationColumns(_noDictionaryColumns);
-
-    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", 
"column7"));
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.forward(), 
ForwardIndexConfig.DISABLED,
-        _forwardIndexDisabledColumns);
-    RangeIndexConfig rangeIndexConfig = RangeIndexConfig.DEFAULT;
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.range(), 
rangeIndexConfig, "column6");
-
-    // Build the index segment.
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, SCHEMA);
+    generatorConfig.setInputFilePath(avroFile);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setSkipTimeValueCheck(true);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig);
+    driver.init(generatorConfig);
     driver.build();
-  }
 
-  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
-      throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(_invertedIndexColumns));
-    indexLoadingConfig.setForwardIndexDisabledColumns(new 
HashSet<>(_forwardIndexDisabledColumns));
-    indexLoadingConfig.setRangeIndexColumns(new 
HashSet<>(Arrays.asList("column6")));
-    indexLoadingConfig.setNoDictionaryColumns(new 
HashSet<>(_noDictionaryColumns));
-    indexLoadingConfig.setReadMode(ReadMode.heap);
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, segmentName),
-        indexLoadingConfig);
-
-    Map<String, ColumnMetadata> columnMetadataMap1 = 
immutableSegment.getSegmentMetadata().getColumnMetadataMap();
-    columnMetadataMap1.forEach((column, metadata) -> {
+    ImmutableSegment segment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), new 
IndexLoadingConfig(tableConfig, SCHEMA));
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata metadata = entry.getValue();
       if (column.equals("column6") || column.equals("column7")) {
         assertTrue(metadata.hasDictionary());
         assertTrue(metadata.isSingleValue());
-        assertNull(immutableSegment.getForwardIndex(column));
+        assertNull(segment.getForwardIndex(column));
       } else {
-        assertNotNull(immutableSegment.getForwardIndex(column));
+        assertNotNull(segment.getForwardIndex(column));
       }
-    });
+    }
+
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+  }
 
-    return immutableSegment;
+  private TableConfig createTableConfig(List<String> noDictionaryColumns, 
List<String> invertedIndexColumns,
+      List<String> rangeIndexColumns, List<String> 
forwardIndexDisabledColumns) {
+    List<FieldConfig> fieldConfigs = new 
ArrayList<>(forwardIndexDisabledColumns.size());
+    for (String column : forwardIndexDisabledColumns) {
+      fieldConfigs.add(new FieldConfig(column, 
FieldConfig.EncodingType.DICTIONARY, List.of(), null,
+          Map.of(FieldConfig.FORWARD_INDEX_DISABLED, "true")));
+    }
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName("daysSinceEpoch")
+        
.setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
+        
.setCreateInvertedIndexDuringSegmentGeneration(true).setRangeIndexColumns(rangeIndexColumns)
+        .setFieldConfigList(fieldConfigs).build();
   }
 
   @AfterMethod
-  public void deleteAndDestroySegment() {
+  public void tearDown() {
+    _indexSegment.destroy();
     FileUtils.deleteQuietly(INDEX_DIR);
-    _indexSegments.forEach((IndexSegment::destroy));
   }
 
   @Override
@@ -1933,40 +1882,24 @@ public class ForwardIndexDisabledSingleValueQueriesTest 
extends BaseQueriesTest
       throws Exception {
     // Now disable forward index for column9 and column11 in the index loading 
config, while enabling inverted index
     // and range index for column9. column11 already has inverted index 
enabled.
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    Set<String> invertedIndexEnabledColumns = new 
HashSet<>(_invertedIndexColumns);
-    invertedIndexEnabledColumns.add("column9");
-    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
-    Set<String> forwardIndexDisabledColumns = new 
HashSet<>(_forwardIndexDisabledColumns);
-    forwardIndexDisabledColumns.add("column9");
-    forwardIndexDisabledColumns.add("column11");
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.setRangeIndexColumns(new 
HashSet<>(Arrays.asList("column6", "column9")));
-    indexLoadingConfig.removeNoDictionaryColumns("column9");
-    indexLoadingConfig.setReadMode(ReadMode.heap);
+    TableConfig tableConfig =
+        createTableConfig(List.of(), List.of("column6", "column7", "column9", 
"column11", "column17", "column18"),
+            List.of("column6", "column9"), List.of("column6", "column7", 
"column9", "column11"));
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
     // Reload the segments to pick up the new configs
-    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-
-    assertNull(immutableSegment1.getForwardIndex("column9"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column9"));
-    assertNotNull(immutableSegment1.getDictionary("column9"));
-    assertNull(immutableSegment1.getForwardIndex("column11"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column11"));
-    assertNotNull(immutableSegment1.getDictionary("column11"));
-
-    assertNull(immutableSegment2.getForwardIndex("column9"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column9"));
-    assertNotNull(immutableSegment2.getDictionary("column9"));
-    assertNull(immutableSegment2.getForwardIndex("column11"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column11"));
-    assertNotNull(immutableSegment2.getDictionary("column11"));
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+
+    assertNull(segment.getForwardIndex("column9"));
+    assertNotNull(segment.getInvertedIndex("column9"));
+    assertNotNull(segment.getDictionary("column9"));
+    assertNull(segment.getForwardIndex("column11"));
+    assertNotNull(segment.getInvertedIndex("column11"));
+    assertNotNull(segment.getDictionary("column11"));
   }
 
   private void reenableForwardIndexForSomeColumns()
@@ -1974,44 +1907,26 @@ public class ForwardIndexDisabledSingleValueQueriesTest 
extends BaseQueriesTest
     // Now re-enable forward index for column9 and column11 in the index 
loading config, while disabling inverted index
     // and range index for column9. column11 already had inverted index 
enabled so leave it as is.
     // Also re-enable forward index for column6
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    Set<String> invertedIndexEnabledColumns = new 
HashSet<>(_invertedIndexColumns);
-    invertedIndexEnabledColumns.remove("column9");
-    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
-    Set<String> forwardIndexDisabledColumns = new 
HashSet<>(_forwardIndexDisabledColumns);
-    forwardIndexDisabledColumns.remove("column6");
-    
indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-    indexLoadingConfig.setRangeIndexColumns(new 
HashSet<>(Collections.singletonList("column6")));
-    indexLoadingConfig.addNoDictionaryColumns("column9");
-    indexLoadingConfig.setReadMode(ReadMode.heap);
+    TableConfig tableConfig =
+        createTableConfig(List.of("column9"), List.of("column6", "column7", 
"column11", "column17", "column18"),
+            List.of("column6"), List.of("column7"));
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
     // Reload the segments to pick up the new configs
-    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-
-    assertNotNull(immutableSegment1.getForwardIndex("column9"));
-    assertNull(immutableSegment1.getInvertedIndex("column9"));
-    assertNull(immutableSegment1.getDictionary("column9"));
-    assertNotNull(immutableSegment1.getForwardIndex("column11"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column11"));
-    assertNotNull(immutableSegment1.getDictionary("column11"));
-    assertNotNull(immutableSegment1.getForwardIndex("column6"));
-    assertNotNull(immutableSegment1.getInvertedIndex("column6"));
-    assertNotNull(immutableSegment1.getDictionary("column6"));
-
-    assertNotNull(immutableSegment2.getForwardIndex("column9"));
-    assertNull(immutableSegment2.getInvertedIndex("column9"));
-    assertNull(immutableSegment2.getDictionary("column9"));
-    assertNotNull(immutableSegment2.getForwardIndex("column11"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column11"));
-    assertNotNull(immutableSegment2.getDictionary("column11"));
-    assertNotNull(immutableSegment2.getForwardIndex("column6"));
-    assertNotNull(immutableSegment2.getInvertedIndex("column6"));
-    assertNotNull(immutableSegment2.getDictionary("column6"));
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+
+    assertNotNull(segment.getForwardIndex("column9"));
+    assertNull(segment.getInvertedIndex("column9"));
+    assertNull(segment.getDictionary("column9"));
+    assertNotNull(segment.getForwardIndex("column11"));
+    assertNotNull(segment.getInvertedIndex("column11"));
+    assertNotNull(segment.getDictionary("column11"));
+    assertNotNull(segment.getForwardIndex("column6"));
+    assertNotNull(segment.getInvertedIndex("column6"));
+    assertNotNull(segment.getDictionary("column6"));
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
index 147158dca7..c19b20f031 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -21,13 +21,8 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -40,16 +35,11 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.RangeIndexConfig;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -78,159 +68,99 @@ import static org.testng.Assert.*;
  * </ul>
  */
 public class ForwardIndexHandlerReloadQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
ForwardIndexHandlerReloadQueriesTest.class.getSimpleName());
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-mv.avro";
-  private static final String SEGMENT_NAME_1 = 
"testTable_1756015690_1756015690";
-  private static final String SEGMENT_NAME_2 = 
"testTable_1756015691_1756015691";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"ForwardIndexHandlerReloadQueriesTest");
-
-  // Build the segment schema.
-  private static final Schema SCHEMA =
-      new 
Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", 
FieldSpec.DataType.INT)
-          .addMetric("column2", 
FieldSpec.DataType.INT).addSingleValueDimension("column3", 
FieldSpec.DataType.STRING)
-          .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
-          .addMultiValueDimension("column6", FieldSpec.DataType.INT)
-          .addMultiValueDimension("column7", FieldSpec.DataType.INT)
-          .addSingleValueDimension("column8", 
FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
-          .addMetric("column10", FieldSpec.DataType.INT)
-          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
-
-  private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
-  // Hard-coded query filter.
-  protected static final String FILTER =
-      " WHERE column1 > 100000000" + " AND column2 BETWEEN 20000000 AND 
1000000000" + " AND column3 <> 'w'"
-          + " AND (column6 < 500000 OR column7 NOT IN (225, 407))" + " AND 
daysSinceEpoch = 1756015683";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  //@formatter:off
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addMetric("column1", DataType.INT)
+      .addMetric("column2", DataType.INT)
+      .addSingleValueDimension("column3", DataType.STRING)
+      .addSingleValueDimension("column5", DataType.STRING)
+      .addMultiValueDimension("column6", DataType.INT)
+      .addMultiValueDimension("column7", DataType.INT)
+      .addSingleValueDimension("column8", DataType.INT)
+      .addMetric("column9", DataType.INT)
+      .addMetric("column10", DataType.INT)
+      .addDateTime("daysSinceEpoch", DataType.INT, "EPOCH|DAYS", "1:DAYS")
+      .build();
+
+  // Hard-coded query filter
+  protected static final String FILTER = " WHERE column1 > 100000000"
+      + " AND column2 BETWEEN 20000000 AND 1000000000"
+      + " AND column3 <> 'w'"
+      + " AND (column6 < 500000 OR column7 NOT IN (225, 407))"
+      + " AND daysSinceEpoch = 1756015683";
+  //@formatter:on
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments.
   private List<IndexSegment> _indexSegments;
 
-  private TableConfig _tableConfig;
-  private List<String> _invertedIndexColumns;
-  private List<String> _noDictionaryColumns;
-  private List<String> _rangeIndexColumns;
-
   @BeforeMethod
-  public void buildSegment()
+  public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
 
-    // Get resource file path.
+    List<String> noDictionaryColumns = List.of("column1", "column2", 
"column3", "column5", "column7", "column10");
+    List<String> invertedIndexColumns = List.of("column8", "column9");
+    List<FieldConfig> fieldConfigs = new 
ArrayList<>(noDictionaryColumns.size());
+    for (String column : noDictionaryColumns) {
+      fieldConfigs.add(
+          new FieldConfig(column, FieldConfig.EncodingType.RAW, List.of(), 
FieldConfig.CompressionCodec.SNAPPY, null));
+    }
+    TableConfig tableConfig = createTableConfig(noDictionaryColumns, 
invertedIndexColumns, List.of(), fieldConfigs);
+
     URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
     assertNotNull(resource);
-    String filePath = resource.getFile();
-
-    createSegment(filePath, SEGMENT_NAME_1);
-    createSegment(filePath, SEGMENT_NAME_2);
-
-    ImmutableSegment immutableSegment1 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment2 = 
loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
-
-    // immutableSegment1 checks
-    assertNotNull(immutableSegment1.getForwardIndex("column1"));
-    assertNull(immutableSegment1.getDictionary("column1"));
-    assertNotNull(immutableSegment1.getForwardIndex("column2"));
-    assertNull(immutableSegment1.getDictionary("column2"));
-    assertNotNull(immutableSegment1.getForwardIndex("column3"));
-    assertNull(immutableSegment1.getDictionary("column3"));
-    assertNotNull(immutableSegment1.getForwardIndex("column6"));
-    assertNotNull(immutableSegment1.getDictionary("column6"));
-    assertNotNull(immutableSegment1.getForwardIndex("column7"));
-    assertNull(immutableSegment1.getDictionary("column7"));
-    assertNotNull(immutableSegment1.getForwardIndex("column9"));
-    assertNotNull(immutableSegment1.getDictionary("column9"));
-    assertNotNull(immutableSegment1.getForwardIndex("column10"));
-    assertNull(immutableSegment1.getDictionary("column10"));
-
-    // immutableSegment2 checks
-    assertNotNull(immutableSegment2.getForwardIndex("column1"));
-    assertNull(immutableSegment2.getDictionary("column1"));
-    assertNotNull(immutableSegment2.getForwardIndex("column2"));
-    assertNull(immutableSegment2.getDictionary("column2"));
-    assertNotNull(immutableSegment2.getForwardIndex("column3"));
-    assertNull(immutableSegment2.getDictionary("column3"));
-    assertNotNull(immutableSegment1.getForwardIndex("column6"));
-    assertNotNull(immutableSegment1.getDictionary("column6"));
-    assertNotNull(immutableSegment2.getForwardIndex("column7"));
-    assertNull(immutableSegment2.getDictionary("column7"));
-    assertNotNull(immutableSegment1.getForwardIndex("column9"));
-    assertNotNull(immutableSegment1.getDictionary("column9"));
-    assertNotNull(immutableSegment2.getForwardIndex("column10"));
-    assertNull(immutableSegment2.getDictionary("column10"));
-
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-  }
+    String avroFile = resource.getFile();
 
-  private void createSegment(String filePath, String segmentName)
-      throws Exception {
-    _rangeIndexColumns = new ArrayList<>(Arrays.asList("column10", "column9"));
-
-    _noDictionaryColumns =
-        new ArrayList<>(Arrays.asList("column1", "column2", "column3", 
"column5", "column7", "column10"));
-    List<FieldConfig> fieldConfigs = new ArrayList<>();
-    for (String column : _noDictionaryColumns) {
-      fieldConfigs.add(new FieldConfig(column, FieldConfig.EncodingType.RAW, 
Collections.emptyList(),
-          FieldConfig.CompressionCodec.SNAPPY, null));
-    }
-
-    _tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(_noDictionaryColumns).setTableName("testTable")
-            
.setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigs).build();
-
-    // Create the segment generator config.
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, SCHEMA);
-    segmentGeneratorConfig.setInputFilePath(filePath);
-    segmentGeneratorConfig.setTableName("testTable");
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    _invertedIndexColumns = Arrays.asList("column8", "column9");
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), 
IndexConfig.ENABLED, _invertedIndexColumns);
-    segmentGeneratorConfig.setRawIndexCreationColumns(_noDictionaryColumns);
-    RangeIndexConfig config = RangeIndexConfig.DEFAULT;
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.range(), config, 
_rangeIndexColumns);
-    // The segment generation code in SegmentColumnarIndexCreator will throw
-    // exception if start and end time in time column are not in acceptable
-    // range. For this test, we first need to fix the input avro data
-    // to have the time column values in allowed range. Until then, the check
-    // is explicitly disabled
-    segmentGeneratorConfig.setSkipTimeValueCheck(true);
-
-    // Build the index segment.
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, SCHEMA);
+    generatorConfig.setInputFilePath(avroFile);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setSkipTimeValueCheck(true);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig);
+    driver.init(generatorConfig);
     driver.build();
-  }
 
-  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
-      throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(_invertedIndexColumns));
-    indexLoadingConfig.setNoDictionaryColumns(new 
HashSet<>(_noDictionaryColumns));
-    indexLoadingConfig.setReadMode(ReadMode.heap);
-
-    ImmutableSegment immutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
indexLoadingConfig);
-
-    Map<String, ColumnMetadata> columnMetadataMap1 = 
immutableSegment.getSegmentMetadata().getColumnMetadataMap();
-    columnMetadataMap1.forEach((column, metadata) -> {
-      if (_invertedIndexColumns.contains(column)) {
-        assertTrue(metadata.hasDictionary());
-        assertNotNull(immutableSegment.getInvertedIndex(column));
-        assertNotNull(immutableSegment.getForwardIndex(column));
-      } else if (_noDictionaryColumns.contains(column)) {
+    ImmutableSegment segment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), new 
IndexLoadingConfig(tableConfig, SCHEMA));
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata metadata = entry.getValue();
+      assertNotNull(segment.getForwardIndex(column));
+      if (noDictionaryColumns.contains(column)) {
         assertFalse(metadata.hasDictionary());
-        assertNotNull(immutableSegment.getForwardIndex(column));
+        assertNull(segment.getDictionary(column));
+      } else {
+        assertTrue(metadata.hasDictionary());
+        assertNotNull(segment.getDictionary(column));
       }
-    });
+      if (invertedIndexColumns.contains(column)) {
+        assertNotNull(segment.getInvertedIndex(column));
+      }
+    }
+
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+  }
 
-    return immutableSegment;
+  private TableConfig createTableConfig(List<String> noDictionaryColumns, 
List<String> invertedIndexColumns,
+      List<String> rangeIndexColumns, List<FieldConfig> fieldConfigs) {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName("daysSinceEpoch")
+        
.setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
+        
.setRangeIndexColumns(rangeIndexColumns).setFieldConfigList(fieldConfigs).build();
   }
 
   @AfterMethod
-  public void deleteAndDestroySegment() {
+  public void tearDown() {
+    _indexSegment.destroy();
     FileUtils.deleteQuietly(INDEX_DIR);
-    _indexSegments.forEach((IndexSegment::destroy));
   }
 
   @Override
@@ -256,8 +186,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
             + "column1 > 100000000 AND column2 BETWEEN 20000000 AND 1000000000 
AND column3 <> 'w' AND (column6 < "
             + "500000 OR column7 NOT IN (225, 407)) AND daysSinceEpoch = 
1756015683 ORDER BY column1";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -281,8 +210,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // Run the same query again.
     brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -312,8 +240,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
         "SELECT DISTINCT column1, column2, column3, column6, column7, column9, 
column10 FROM testTable ORDER BY "
             + "column1 LIMIT 10";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -337,8 +264,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     changePropertiesAndReloadSegment();
 
     brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -379,9 +305,10 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
     assertNotNull(brokerResponseNative.getExceptions());
     assertEquals(brokerResponseNative.getExceptions().size(), 0);
-    assertEquals(resultTable.getDataSchema(), new DataSchema(new 
String[]{"column1", "column7", "column9"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT,
-            DataSchema.ColumnDataType.INT}));
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "column7", "column9"}, new 
DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT
+        }));
     List<Object[]> resultRows1 = resultTable.getRows();
     int previousVal = -1;
     for (Object[] resultRow : resultRows1) {
@@ -403,9 +330,10 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
     assertNotNull(brokerResponseNative.getExceptions());
     assertEquals(brokerResponseNative.getExceptions().size(), 0);
-    assertEquals(resultTable.getDataSchema(), new DataSchema(new 
String[]{"column1", "column7", "column9"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT,
-            DataSchema.ColumnDataType.INT}));
+    assertEquals(resultTable.getDataSchema(),
+        new DataSchema(new String[]{"column1", "column7", "column9"}, new 
DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT
+        }));
     List<Object[]> resultRows2 = resultTable.getRows();
     previousVal = -1;
     for (Object[] resultRow : resultRows2) {
@@ -424,8 +352,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
         "SELECT MAX(column1), MIN(column1), MAX(column2), MIN(column2), 
MAXMV(column6), MINMV(column6), MAXMV"
             + "(column7), MINMV(column7), MAX(column9), MIN(column9), 
MAX(column10), MIN(column10) FROM testTable";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -448,13 +375,10 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     }));
     List<Object[]> beforeResultRows = resultTable.getRows();
 
-
     changePropertiesAndReloadSegment();
 
-
     brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -486,8 +410,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     // TEST1 - Before Reload: Test for column7.
     String query1 = "SELECT MAX(ARRAYLENGTH(column7)) from testTable LIMIT 10";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query1);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -505,8 +428,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     // TEST2 - Before Reload: Test for column6.
     String query2 = "SELECT MAX(ARRAYLENGTH(column6)) from testTable LIMIT 10";
     brokerResponseNative = getBrokerResponse(query2);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -525,8 +447,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // TEST1 - After Reload: Test for column7.
     brokerResponseNative = getBrokerResponse(query1);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -544,8 +465,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // TEST2 - After Reload: Test for column6.
     brokerResponseNative = getBrokerResponse(query2);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -569,8 +489,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     String query1 = "SET \"timeoutMs\" = 30000; SELECT column1, max(column1), 
sum(column10) from testTable WHERE "
         + "column7 = 2147483647 GROUP BY column1 ORDER BY column1";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query1);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -587,13 +506,11 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
         }));
     List<Object[]> beforeResultRows1 = resultTable.getRows();
 
-
     // TEST2 - Before Reload: Test where column6 is in filter.
     String query2 = "SELECT column1, max(column1), sum(column10) from 
testTable WHERE column6 = 1001 GROUP BY "
         + "column1 ORDER BY column1";
     brokerResponseNative = getBrokerResponse(query2);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -614,8 +531,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // TEST1 - After reload. Test where column7 is in filter.
     brokerResponseNative = getBrokerResponse(query1);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -636,8 +552,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
 
     // TEST2 - After Reload: Test where column6 is in filter.
     brokerResponseNative = getBrokerResponse(query2);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -662,8 +577,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
       throws Exception {
     String query = "select count(*) from testTable where column10 > 674022574 
and column9 < 674022574";
     BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     ResultTable resultTable1 = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -685,8 +599,7 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     changePropertiesAndReloadSegment();
 
     brokerResponseNative = getBrokerResponse(query);
-    assertTrue(brokerResponseNative.getExceptions() == null
-        || brokerResponseNative.getExceptions().size() == 0);
+    assertTrue(brokerResponseNative.getExceptions() == null || 
brokerResponseNative.getExceptions().size() == 0);
     resultTable1 = brokerResponseNative.getResultTable();
     assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
     assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
@@ -733,68 +646,43 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
    */
   private void changePropertiesAndReloadSegment()
       throws Exception {
-    List<FieldConfig> newFieldConfigs = new ArrayList<>();
-    newFieldConfigs.add(new FieldConfig("column1", 
FieldConfig.EncodingType.RAW, Collections.emptyList(),
-        FieldConfig.CompressionCodec.ZSTANDARD, null));
-    _tableConfig.setFieldConfigList(newFieldConfigs);
-
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
_tableConfig);
-    indexLoadingConfig.setTableConfig(_tableConfig);
-    Set<String> invertedIndexEnabledColumns = new 
HashSet<>(_invertedIndexColumns);
-    invertedIndexEnabledColumns.add("column2");
-    invertedIndexEnabledColumns.add("column7");
-    indexLoadingConfig.setInvertedIndexColumns(invertedIndexEnabledColumns);
-    indexLoadingConfig.removeInvertedIndexColumns("column9");
-    Set<String> noDictionaryColumns = new HashSet<>(_noDictionaryColumns);
-    indexLoadingConfig.setNoDictionaryColumns(noDictionaryColumns);
-    indexLoadingConfig.removeNoDictionaryColumns("column2");
-    indexLoadingConfig.removeNoDictionaryColumns("column3");
-    indexLoadingConfig.removeNoDictionaryColumns("column7");
-    indexLoadingConfig.removeNoDictionaryColumns("column10");
-    indexLoadingConfig.addNoDictionaryColumns("column6");
-    indexLoadingConfig.addNoDictionaryColumns("column9");
-    Set<String> rangeIndexColumns = new HashSet<>(_rangeIndexColumns);
-    indexLoadingConfig.setRangeIndexColumns(rangeIndexColumns);
-    indexLoadingConfig.setReadMode(ReadMode.heap);
+    List<String> noDictionaryColumns = List.of("column1", "column5", 
"column6", "column9");
+    List<String> invertedIndexColumns = List.of("column2", "column7", 
"column8");
+    List<String> rangeIndexColumns = List.of("column9", "column10");
+    List<FieldConfig> fieldConfigs = new 
ArrayList<>(noDictionaryColumns.size());
+    for (String column : noDictionaryColumns) {
+      FieldConfig.CompressionCodec compressionCodec = 
FieldConfig.CompressionCodec.SNAPPY;
+      if (column.equals("column1")) {
+        compressionCodec = FieldConfig.CompressionCodec.ZSTANDARD;
+      }
+      fieldConfigs.add(new FieldConfig(column, FieldConfig.EncodingType.RAW, 
List.of(), compressionCodec, null));
+    }
+    TableConfig tableConfig =
+        createTableConfig(noDictionaryColumns, invertedIndexColumns, 
rangeIndexColumns, fieldConfigs);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
 
     // Reload the segments to pick up the new configs
-    File indexDir = new File(INDEX_DIR, SEGMENT_NAME_1);
-    ImmutableSegment immutableSegment1 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    indexDir = new File(INDEX_DIR, SEGMENT_NAME_2);
-    ImmutableSegment immutableSegment2 = reloadSegment(indexDir, 
indexLoadingConfig, SCHEMA);
-    _indexSegment = immutableSegment1;
-    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
-
-    // immutableSegment1 checks
-    assertNotNull(immutableSegment1.getForwardIndex("column1"));
-    assertNull(immutableSegment1.getDictionary("column1"));
-    assertNotNull(immutableSegment1.getForwardIndex("column2"));
-    assertNotNull(immutableSegment1.getDictionary("column2"));
-    assertNotNull(immutableSegment1.getForwardIndex("column3"));
-    assertNotNull(immutableSegment1.getDictionary("column3"));
-    assertNotNull(immutableSegment1.getForwardIndex("column6"));
-    assertNull(immutableSegment1.getDictionary("column6"));
-    assertNotNull(immutableSegment1.getForwardIndex("column7"));
-    assertNotNull(immutableSegment1.getDictionary("column7"));
-    assertNotNull(immutableSegment1.getForwardIndex("column9"));
-    assertNull(immutableSegment1.getDictionary("column9"));
-    assertNotNull(immutableSegment1.getForwardIndex("column10"));
-    assertNotNull(immutableSegment1.getDictionary("column10"));
-
-    // immutableSegment2 checks
-    assertNotNull(immutableSegment2.getForwardIndex("column1"));
-    assertNull(immutableSegment2.getDictionary("column1"));
-    assertNotNull(immutableSegment2.getForwardIndex("column2"));
-    assertNotNull(immutableSegment2.getDictionary("column2"));
-    assertNotNull(immutableSegment2.getForwardIndex("column3"));
-    assertNotNull(immutableSegment2.getDictionary("column3"));
-    assertNotNull(immutableSegment2.getForwardIndex("column6"));
-    assertNull(immutableSegment2.getDictionary("column6"));
-    assertNotNull(immutableSegment2.getForwardIndex("column7"));
-    assertNotNull(immutableSegment2.getDictionary("column7"));
-    assertNotNull(immutableSegment1.getForwardIndex("column9"));
-    assertNull(immutableSegment1.getDictionary("column9"));
-    assertNotNull(immutableSegment2.getForwardIndex("column10"));
-    assertNotNull(immutableSegment2.getDictionary("column10"));
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment segment = reloadSegment(indexDir, indexLoadingConfig, 
SCHEMA);
+    _indexSegment.destroy();
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      ColumnMetadata metadata = entry.getValue();
+      assertNotNull(segment.getForwardIndex(column));
+      if (noDictionaryColumns.contains(column)) {
+        assertFalse(metadata.hasDictionary());
+        assertNull(segment.getDictionary(column));
+      } else {
+        assertTrue(metadata.hasDictionary());
+        assertNotNull(segment.getDictionary(column));
+      }
+      if (invertedIndexColumns.contains(column)) {
+        assertNotNull(segment.getInvertedIndex(column));
+      }
+    }
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/JsonDataTypeQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonDataTypeQueriesTest.java
index c23e8be405..5a0c91e872 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/JsonDataTypeQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/JsonDataTypeQueriesTest.java
@@ -21,9 +21,7 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
@@ -42,10 +40,9 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -62,12 +59,16 @@ public class JsonDataTypeQueriesTest extends 
BaseQueriesTest {
   private static final String INT_COLUMN = "intColumn";
   private static final String JSON_COLUMN = "jsonColumn";
   private static final String STRING_COLUMN = "stringColumn";
-  private static final Schema SCHEMA =
-      new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, 
FieldSpec.DataType.INT)
-          .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(STRING_COLUMN, 
FieldSpec.DataType.STRING).build();
+  //@formatter:off
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(JSON_COLUMN, DataType.JSON)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .build();
+  //@formatter:on
   private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setJsonIndexColumns(List.of(JSON_COLUMN))
+          .build();
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -140,27 +141,17 @@ public class JsonDataTypeQueriesTest extends 
BaseQueriesTest {
         "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": 
\"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
             + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
 
-    List<String> jsonIndexColumns = new ArrayList<>();
-    jsonIndexColumns.add("jsonColumn");
-    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
     segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-    indexLoadingConfig.setJsonIndexColumns(new 
HashSet<String>(jsonIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
-    ImmutableSegment immutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), 
indexLoadingConfig);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
   }
 
   /** Verify result column type of a simple select query against JSON column */
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
index a6f376c6cd..eae1311a69 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
@@ -26,11 +26,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
@@ -44,7 +42,6 @@ import 
org.apache.pinot.common.function.scalar.StringFunctions;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -53,10 +50,8 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -83,16 +78,20 @@ public class JsonIngestionFromAvroQueriesTest extends 
BaseQueriesTest {
   private static final String JSON_COLUMN_4 = "jsonColumn4"; // for testing 
BYTES
   private static final String JSON_COLUMN_5 = "jsonColumn5"; // for testing 
ARRAY of MAPS
   private static final String STRING_COLUMN = "stringColumn";
-  private static final org.apache.pinot.spi.data.Schema SCHEMA =
-      new 
org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN,
 FieldSpec.DataType.INT)
-          .addSingleValueDimension(JSON_COLUMN_1, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(JSON_COLUMN_4, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(JSON_COLUMN_5, FieldSpec.DataType.JSON)
-          .addSingleValueDimension(STRING_COLUMN, 
FieldSpec.DataType.STRING).build();
-  private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+  //@formatter:off
+  private static final org.apache.pinot.spi.data.Schema SCHEMA = new 
org.apache.pinot.spi.data.Schema.SchemaBuilder()
+      .setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(JSON_COLUMN_1, DataType.JSON)
+      .addSingleValueDimension(JSON_COLUMN_2, DataType.JSON)
+      .addSingleValueDimension(JSON_COLUMN_3, DataType.JSON)
+      .addSingleValueDimension(JSON_COLUMN_4, DataType.JSON)
+      .addSingleValueDimension(JSON_COLUMN_5, DataType.JSON)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .build();
+  //@formatter:on
+  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setJsonIndexColumns(List.of(JSON_COLUMN_1, JSON_COLUMN_2, 
JSON_COLUMN_3)).build();
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -250,21 +249,6 @@ public class JsonIngestionFromAvroQueriesTest extends 
BaseQueriesTest {
     }
   }
 
-  private static RecordReader createRecordReader()
-      throws IOException {
-    Set<String> set = new HashSet<>();
-    set.add(INT_COLUMN);
-    set.add(STRING_COLUMN);
-    set.add(JSON_COLUMN_1);
-    set.add(JSON_COLUMN_2);
-    set.add(JSON_COLUMN_3);
-    set.add(JSON_COLUMN_4);
-    set.add(JSON_COLUMN_5);
-    AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(AVRO_DATA_FILE, set, null);
-    return avroRecordReader;
-  }
-
   /** Create an AVRO file and then ingest it into Pinot while creating a 
JsonIndex. */
   @BeforeClass
   public void setUp()
@@ -272,27 +256,19 @@ public class JsonIngestionFromAvroQueriesTest extends 
BaseQueriesTest {
     FileUtils.deleteDirectory(INDEX_DIR);
     createInputFile();
 
-    List<String> jsonIndexColumns = Arrays.asList(JSON_COLUMN_1, 
JSON_COLUMN_2, JSON_COLUMN_3);
-    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
     segmentGeneratorConfig.setInputFilePath(AVRO_DATA_FILE.getPath());
-
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, createRecordReader());
+    driver.init(segmentGeneratorConfig);
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-    indexLoadingConfig.setJsonIndexColumns(new 
HashSet<String>(jsonIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
-    ImmutableSegment immutableSegment =
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    ImmutableSegment segment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), 
indexLoadingConfig);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
   }
 
   /** Verify that we can query the JSON column that ingested ComplexType data 
from an AVRO file (see setUp). */
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonMalformedIndexTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMalformedIndexTest.java
index 6a9c0a28cd..c6513710a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonMalformedIndexTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonMalformedIndexTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -35,97 +34,90 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class JsonMalformedIndexTest extends BaseQueriesTest {
-    private static final String RAW_TABLE_NAME = "testTable";
-    private static final String SEGMENT_NAME = "testSegment";
-    private static final String STRING_COLUMN = "stringColumn";
-    private static final String JSON_COLUMN = "jsonColumn";
-    private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
-            .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
-            .addSingleValueDimension(JSON_COLUMN, 
FieldSpec.DataType.STRING).build();
-    private static final TableConfig TABLE_CONFIG =
-            new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-    private IndexSegment _indexSegment;
-    private List<IndexSegment> _indexSegments;
-    private final List<GenericRow> _records = new ArrayList<>();
-
-    @BeforeClass
-    public void setUp()
-            throws Exception {
-        _records.add(createRecord("ludwik von drake",
-                "{\"name\": {\"first\": \"ludwik\", \"last\": \"von drake\"}, 
\"id\": 181, "
-                        + "\"data\": [\"l\", \"b\", \"c\", \"d\"]"));
-    }
 
-    protected void checkResult(String query, Object[][] expectedResults) {
-        BrokerResponseNative brokerResponse = 
getBrokerResponseForOptimizedQuery(query, TABLE_CONFIG, SCHEMA);
-        QueriesTestUtils.testInterSegmentsResult(brokerResponse, 
Arrays.asList(expectedResults));
-    }
+public class JsonMalformedIndexTest extends BaseQueriesTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String JSON_COLUMN = "jsonColumn";
+  //@formatter:off
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN, DataType.STRING)
+      .build();
+  //@formatter:on
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setJsonIndexColumns(List.of(JSON_COLUMN))
+          .build();
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+  private final List<GenericRow> _records = new ArrayList<>();
 
-    File indexDir() {
-        return new File(FileUtils.getTempDirectory(), 
getClass().getSimpleName());
-    }
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _records.add(createRecord("ludwik von drake",
+        "{\"name\": {\"first\": \"ludwik\", \"last\": \"von drake\"}, \"id\": 
181, "
+            + "\"data\": [\"l\", \"b\", \"c\", \"d\"]"));
+  }
 
-    GenericRow createRecord(String stringValue, String jsonValue) {
-        GenericRow record = new GenericRow();
-        record.putValue(STRING_COLUMN, stringValue);
-        record.putValue(JSON_COLUMN, jsonValue);
-        return record;
-    }
+  protected void checkResult(String query, Object[][] expectedResults) {
+    BrokerResponseNative brokerResponse = 
getBrokerResponseForOptimizedQuery(query, TABLE_CONFIG, SCHEMA);
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 
Arrays.asList(expectedResults));
+  }
 
-    @Test(expectedExceptions = ColumnJsonParserException.class,
-          expectedExceptionsMessageRegExp = "Column: jsonColumn.*")
-    public void testJsonIndexBuild()
-            throws Exception {
-        File indexDir = indexDir();
-        FileUtils.deleteDirectory(indexDir);
+  File indexDir() {
+    return new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
+  }
 
-        List<String> jsonIndexColumns = new ArrayList<>();
-        jsonIndexColumns.add("jsonColumn");
-        TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
-        SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-        segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-        segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-        segmentGeneratorConfig.setOutDir(indexDir.getPath());
+  GenericRow createRecord(String stringValue, String jsonValue) {
+    GenericRow record = new GenericRow();
+    record.putValue(STRING_COLUMN, stringValue);
+    record.putValue(JSON_COLUMN, jsonValue);
+    return record;
+  }
 
-        SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-        driver.init(segmentGeneratorConfig, new 
GenericRowRecordReader(_records));
-        driver.build();
+  @Test(expectedExceptions = ColumnJsonParserException.class, 
expectedExceptionsMessageRegExp = "Column: jsonColumn.*")
+  public void testJsonIndexBuild()
+      throws Exception {
+    File indexDir = indexDir();
+    FileUtils.deleteDirectory(indexDir);
 
-        IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-        indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-        indexLoadingConfig.setJsonIndexColumns(new 
HashSet<>(jsonIndexColumns));
-        indexLoadingConfig.setReadMode(ReadMode.mmap);
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setOutDir(indexDir.getPath());
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(_records));
+    driver.build();
 
-        ImmutableSegment immutableSegment =
-                ImmutableSegmentLoader.load(new File(indexDir, SEGMENT_NAME), 
indexLoadingConfig);
-        _indexSegment = immutableSegment;
-        _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(new File(indexDir, 
SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
 
-        Object[][] expecteds1 = {{"von drake"}, {"von drake"}, {"von drake"}, 
{"von drake"}};
-        checkResult("SELECT jsonextractscalar(jsonColumn, '$.name.last', 
'STRING') FROM testTable", expecteds1);
-    }
+    Object[][] expected = {{"von drake"}, {"von drake"}, {"von drake"}, {"von 
drake"}};
+    checkResult("SELECT jsonextractscalar(jsonColumn, '$.name.last', 'STRING') 
FROM testTable", expected);
+  }
 
-    @Override
-    protected String getFilter() {
-        return "";
-    }
+  @Override
+  protected String getFilter() {
+    return "";
+  }
 
-    @Override
-    protected IndexSegment getIndexSegment() {
-        return _indexSegment;
-    }
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
 
-    @Override
-    protected List<IndexSegment> getIndexSegments() {
-        return _indexSegments;
-    }
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java
index bdbad4b803..9694445dd7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java
@@ -49,10 +49,9 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -76,25 +75,32 @@ public class JsonUnnestIngestionFromAvroQueriesTest extends 
BaseQueriesTest {
   private static final String JSON_COLUMN = "jsonColumn"; // for testing ARRAY 
of MAPS
   private static final String STRING_COLUMN = "stringColumn";
   private static final String EVENTTIME_JSON_COLUMN = "eventTimeColumn";
-  private static final org.apache.pinot.spi.data.Schema SCHEMA =
-      new org.apache.pinot.spi.data.Schema.SchemaBuilder()
-          .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
-          .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
-          .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
-          .addSingleValueDimension("jsonColumn.timestamp", 
FieldSpec.DataType.TIMESTAMP)
-          .addSingleValueDimension("jsonColumn.data", FieldSpec.DataType.JSON)
-          .addSingleValueDimension("jsonColumn.data.a", 
FieldSpec.DataType.STRING)
-          .addSingleValueDimension("jsonColumn.data.b", 
FieldSpec.DataType.STRING)
-          .addSingleValueDimension(EVENTTIME_JSON_COLUMN, 
FieldSpec.DataType.TIMESTAMP)
-          .addSingleValueDimension("eventTimeColumn_10m", 
FieldSpec.DataType.TIMESTAMP)
-          .build();
-  private static final TableConfig TABLE_CONFIG =
-      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(
-          new IngestionConfig(null, null, null, null,
-              List.of(new TransformConfig("eventTimeColumn", 
"eventTimeColumn.seconds * 1000"),
-                  new TransformConfig("eventTimeColumn_10m", 
"round(eventTimeColumn, 60000)")),
-              new ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null), 
null, null, null)
-      ).build();
+  //@formatter:off
+  private static final org.apache.pinot.spi.data.Schema SCHEMA = new 
org.apache.pinot.spi.data.Schema.SchemaBuilder()
+      .setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN, DataType.JSON)
+      .addSingleValueDimension("jsonColumn.timestamp", DataType.TIMESTAMP)
+      .addSingleValueDimension("jsonColumn.data", DataType.JSON)
+      .addSingleValueDimension("jsonColumn.data.a", DataType.STRING)
+      .addSingleValueDimension("jsonColumn.data.b", DataType.STRING)
+      .addSingleValueDimension(EVENTTIME_JSON_COLUMN, DataType.TIMESTAMP)
+      .addSingleValueDimension("eventTimeColumn_10m", DataType.TIMESTAMP)
+      .build();
+  //@formatter:on
+  private static final TableConfig TABLE_CONFIG;
+
+  static {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(
+        List.of(new TransformConfig("eventTimeColumn", 
"eventTimeColumn.seconds * 1000"),
+            new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 
60000)")));
+    ingestionConfig.setComplexTypeConfig(new 
ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null));
+    TABLE_CONFIG =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
+            .setJsonIndexColumns(List.of(JSON_COLUMN)).build();
+  }
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -294,27 +300,19 @@ public class JsonUnnestIngestionFromAvroQueriesTest 
extends BaseQueriesTest {
     FileUtils.deleteDirectory(INDEX_DIR);
     createInputFile();
 
-    List<String> jsonIndexColumns = Arrays.asList(JSON_COLUMN);
-    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
     segmentGeneratorConfig.setInputFilePath(AVRO_DATA_FILE.getPath());
-
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     driver.init(segmentGeneratorConfig, createRecordReader());
     driver.build();
 
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
-    ImmutableSegment immutableSegment =
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    ImmutableSegment segment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), 
indexLoadingConfig);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
   }
 
   @Test
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index b711a899c3..d4ad1e5e4d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -135,10 +135,14 @@ public class IndexLoadingConfig {
     this(instanceDataManagerConfig, tableConfig, null);
   }
 
+  @VisibleForTesting
   public IndexLoadingConfig(TableConfig tableConfig, @Nullable Schema schema) {
     extractFromTableConfigAndSchema(tableConfig, schema);
   }
 
+  /**
+   * NOTE: Can be used in production code when we want to load a segment as is 
without any modifications.
+   */
   public IndexLoadingConfig() {
   }
 
@@ -861,6 +865,7 @@ public class IndexLoadingConfig {
     return _realtimeAvgMultiValueCount;
   }
 
+  @Nullable
   public TableConfig getTableConfig() {
     return _tableConfig;
   }
@@ -870,12 +875,6 @@ public class IndexLoadingConfig {
     return _schema;
   }
 
-  @VisibleForTesting
-  public void setTableConfig(TableConfig tableConfig) {
-    _tableConfig = tableConfig;
-    _dirty = true;
-  }
-
   public String getSegmentDirectoryLoader() {
     return StringUtils.isNotBlank(_segmentDirectoryLoader) ? 
_segmentDirectoryLoader
         : SegmentDirectoryLoaderRegistry.DEFAULT_SEGMENT_DIRECTORY_LOADER_NAME;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index a02efd77d4..441c56f7a1 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -189,8 +189,6 @@ public class SegmentPreProcessorTest {
     _indexLoadingConfig.setInvertedIndexColumns(
         Sets.newHashSet(COLUMN1_NAME, COLUMN7_NAME, COLUMN13_NAME, 
NO_SUCH_COLUMN_NAME));
 
-    _indexLoadingConfig.setTableConfig(_tableConfig);
-
     ClassLoader classLoader = getClass().getClassLoader();
     URL resourceUrl = classLoader.getResource(AVRO_DATA);
     assertNotNull(resourceUrl);
@@ -231,13 +229,12 @@ public class SegmentPreProcessorTest {
   }
 
   private IndexLoadingConfig getDefaultIndexLoadingConfig() {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_tableConfig, null);
 
     // Set RAW columns. Otherwise, they will end up being converted to dict 
columns (default) during segment reload.
     indexLoadingConfig.setNoDictionaryColumns(
         Sets.newHashSet(EXISTING_STRING_COL_RAW, EXISTING_INT_COL_RAW_MV, 
EXISTING_INT_COL_RAW));
 
-    indexLoadingConfig.setTableConfig(_tableConfig);
     return indexLoadingConfig;
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 6f67ad6c67..bc7c21504c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -82,13 +82,13 @@ public class FieldConfig extends BaseJsonConfig {
   private final TimestampConfig _timestampConfig;
 
   @Deprecated
-  public FieldConfig(String name, EncodingType encodingType, IndexType 
indexType, CompressionCodec compressionCodec,
-      Map<String, String> properties) {
+  public FieldConfig(String name, EncodingType encodingType, @Nullable 
IndexType indexType,
+      @Nullable CompressionCodec compressionCodec, @Nullable Map<String, 
String> properties) {
     this(name, encodingType, indexType, null, compressionCodec, null, null, 
properties, null);
   }
 
-  public FieldConfig(String name, EncodingType encodingType, List<IndexType> 
indexTypes,
-      CompressionCodec compressionCodec, Map<String, String> properties) {
+  public FieldConfig(String name, EncodingType encodingType, @Nullable 
List<IndexType> indexTypes,
+      @Nullable CompressionCodec compressionCodec, @Nullable Map<String, 
String> properties) {
     this(name, encodingType, null, indexTypes, compressionCodec, null, null, 
properties, null);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to