This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c9b63  Produce GenericRow file in segment processing mapper (#7013)
a1c9b63 is described below

commit a1c9b631381a25ddd6d3164d6a9ce337c3939b9f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jun 4 14:45:44 2021 -0700

    Produce GenericRow file in segment processing mapper (#7013)
    
    Produce GenericRow file instead of Avro file in segment processing mapper:
    - To preserve the null value info from the source file
    - Apply default record transformation to solve the NPE described in #6902
    - GenericRow file can be randomly accessed, so it can be used directly as
      the record reader
    
    Add `GenericRowFileManager` to wrap the file reader/writer and related
    arguments to simplify the handling.
    Also modify the `SegmentMapperTest` to ensure the null value info can be
    preserved.
---
 .../processing/collector/ConcatCollector.java      |  66 ++---
 .../processing/framework/SegmentMapper.java        | 179 ++++++++------
 .../processing/framework/SegmentMapperConfig.java  |  18 +-
 .../framework/SegmentProcessorFramework.java       |  75 +++---
 .../processing/framework/SegmentReducer.java       |  49 ++--
 .../genericrow/GenericRowFileManager.java          | 109 +++++++++
 .../genericrow/GenericRowFileReader.java           |   3 +-
 .../processing/utils/SegmentProcessingUtils.java   |  84 +++++++
 .../{collector => utils}/SortOrderComparator.java  |   2 +-
 .../processing/framework/SegmentMapperTest.java    | 265 ++++++++++++---------
 .../processing/framework/SegmentReducerTest.java   |  79 +++---
 11 files changed, 569 insertions(+), 360 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
index d7cc173..263a0ec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
@@ -22,16 +22,17 @@ import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.Arrays;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
 import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
 import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
+import org.apache.pinot.core.segment.processing.utils.SortOrderComparator;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
@@ -40,15 +41,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  * A Collector implementation for collecting and concatenating all incoming 
rows.
  */
 public class ConcatCollector implements Collector {
-  private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
-  private static final String RECORD_DATA_FILE_NAME = "record.data";
-
-  private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
   private final int _numSortColumns;
   private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
-  private final File _recordOffsetFile;
-  private final File _recordDataFile;
+  private final GenericRowFileManager _recordFileManager;
 
   private GenericRowFileWriter _recordFileWriter;
   private GenericRowFileReader _recordFileReader;
@@ -56,44 +52,28 @@ public class ConcatCollector implements Collector {
 
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
     List<String> sortOrder = collectorConfig.getSortOrder();
+    List<FieldSpec> fieldSpecs;
     if (CollectionUtils.isNotEmpty(sortOrder)) {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
       _numSortColumns = sortOrder.size();
-      DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
-      for (int i = 0; i < _numSortColumns; i++) {
-        String sortColumn = sortOrder.get(i);
-        FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
-        Preconditions.checkArgument(fieldSpec != null, "Failed to find sort 
column: %s", sortColumn);
-        Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot 
sort on MV column: %s", sortColumn);
-        sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
-        _fieldSpecs.add(fieldSpec);
-      }
-      _sortOrderComparator = new SortOrderComparator(_numSortColumns, 
sortColumnStoredTypes);
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn() && 
!sortOrder.contains(fieldSpec.getName())) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
+      _sortOrderComparator = 
SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
     } else {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
       _numSortColumns = 0;
       _sortOrderComparator = null;
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn()) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
     }
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), 
String.format("concat_collector_%d", System.currentTimeMillis()));
     Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s 
for %s with config: %s",
         _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), 
collectorConfig);
-    _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
-    _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
 
+    // TODO: Pass 'includeNullFields' from the config
+    _recordFileManager = new GenericRowFileManager(_workingDir, fieldSpecs, 
true);
     try {
-      reset();
+      _recordFileWriter = _recordFileManager.getFileWriter();
     } catch (IOException e) {
-      throw new RuntimeException("Caught exception while resetting the 
collector", e);
+      throw new RuntimeException("Caught exception while creating the file 
writer", e);
     }
   }
 
@@ -107,8 +87,8 @@ public class ConcatCollector implements Collector {
   @Override
   public Iterator<GenericRow> iterator()
       throws IOException {
-    _recordFileWriter.close();
-    _recordFileReader = new GenericRowFileReader(_recordOffsetFile, 
_recordDataFile, _fieldSpecs, true);
+    _recordFileManager.closeFileWriter();
+    _recordFileReader = _recordFileManager.getFileReader();
 
     // TODO: A lot of this code can be made common across Collectors, once 
{@link RollupCollector} is also converted to off heap implementation
     if (_numSortColumns != 0) {
@@ -156,15 +136,8 @@ public class ConcatCollector implements Collector {
   @Override
   public void reset()
       throws IOException {
-    if (_recordFileWriter != null) {
-      _recordFileWriter.close();
-    }
-    if (_recordFileReader != null) {
-      _recordFileReader.close();
-    }
-    FileUtils.cleanDirectory(_workingDir);
-    // TODO: Pass 'includeNullFields' from the config
-    _recordFileWriter = new GenericRowFileWriter(_recordOffsetFile, 
_recordDataFile, _fieldSpecs, true);
+    _recordFileManager.cleanUp();
+    _recordFileWriter = _recordFileManager.getFileWriter();
     _numDocs = 0;
   }
 
@@ -172,12 +145,7 @@ public class ConcatCollector implements Collector {
   public void close()
       throws IOException {
     try {
-      if (_recordFileWriter != null) {
-        _recordFileWriter.close();
-      }
-      if (_recordFileReader != null) {
-        _recordFileReader.close();
-      }
+      _recordFileManager.cleanUp();
     } finally {
       FileUtils.deleteQuietly(_workingDir);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
index 7abe6ad..c16479c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
@@ -20,133 +20,156 @@ package 
org.apache.pinot.core.segment.processing.framework;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.processing.filter.RecordFilter;
 import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
 import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
 import 
org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
-import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
+import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.segment.spi.partition.Partitioner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * Mapper phase of the SegmentProcessorFramework.
- * Reads the input segment and creates partitioned avro data files
+ * Reads the input records and creates partitioned generic row files.
  * Performs:
  * - record filtering
  * - column transformations
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an 
extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
-  private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> 
_partitionToDataFileWriterMap = new HashMap<>();
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
+  private final Partitioner[] _partitioners;
+  private final String[] _partitionsBuffer;
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap 
= new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig 
mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig 
mapperConfig, File mapperOutputDir) {
+    _recordReaders = recordReaders;
     _mapperOutputDir = mapperOutputDir;
 
-    _mapperId = mapperId;
-    _avroSchema = 
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    TableConfig tableConfig = mapperConfig.getTableConfig();
+    Schema schema = mapperConfig.getSchema();
+    List<String> sortOrder = tableConfig.getIndexingConfig().getSortedColumn();
+    if (CollectionUtils.isNotEmpty(sortOrder)) {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
+    } else {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
+    }
+    _includeNullFields = 
tableConfig.getIndexingConfig().isNullHandlingEnabled();
+    _defaultRecordTransformer = 
CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     _recordFilter = 
RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
     _recordTransformer = 
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
-    for (PartitionerConfig partitionerConfig : 
mapperConfig.getPartitionerConfigs()) {
-      _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
+    _dataTypeTransformer = new DataTypeTransformer(schema);
+    List<PartitionerConfig> partitionerConfigs = 
mapperConfig.getPartitionerConfigs();
+    int numPartitioners = partitionerConfigs.size();
+    _partitioners = new Partitioner[numPartitioners];
+    _partitionsBuffer = new String[numPartitioners];
+    for (int i = 0; i < numPartitioners; i++) {
+      _partitioners[i] = 
PartitionerFactory.getPartitioner(partitionerConfigs.get(i));
     }
-    _numPartitioners = _partitioners.size();
     LOGGER.info(
-        "Initialized mapper with id: {}, input segment: {}, output dir: {}, 
recordTransformer: {}, recordFilter: {}, partitioners: {}",
-        _mapperId, _inputSegment, _mapperOutputDir, 
_recordTransformer.getClass(), _recordFilter.getClass(),
-        _partitioners.stream().map(p -> 
p.getClass().toString()).collect(Collectors.joining(",")));
+        "Initialized mapper with {} record readers, output dir: {}, 
recordTransformer: {}, recordFilter: {}, partitioners: {}",
+        _recordReaders.size(), _mapperOutputDir, 
_recordTransformer.getClass(), _recordFilter.getClass(),
+        Arrays.stream(_partitioners).map(p -> 
p.getClass().toString()).collect(Collectors.joining(",")));
   }
 
   /**
-   * Reads the input segment and generates partitioned avro data files into 
the mapper output directory
-   * Records for each partition are put into a directory of its own withing 
the mapper output directory, identified by the partition name
+   * Reads the input records and generates partitioned generic row files into 
the mapper output directory.
+   * Records for each partition are put into a directory of the partition name 
within the mapper output directory.
    */
-  public void map()
+  public Map<String, GenericRowFileManager> map()
       throws Exception {
-
-    PinotSegmentRecordReader segmentRecordReader = new 
PinotSegmentRecordReader(_inputSegment);
-    GenericRow reusableRow = new GenericRow();
-    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
-    String[] partitions = new String[_numPartitioners];
-
-    while (segmentRecordReader.hasNext()) {
-      reusableRow = segmentRecordReader.next(reusableRow);
-
-      // Record filtering
-      if (_recordFilter.filter(reusableRow)) {
-        continue;
-      }
-
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
-      // Partitioning
-      int p = 0;
-      for (Partitioner partitioner : _partitioners) {
-        partitions[p++] = partitioner.getPartition(reusableRow);
-      }
-      String partition = StringUtil.join("_", partitions);
-
-      // Create writer for the partition, if not exists
-      DataFileWriter<GenericData.Record> recordWriter = 
_partitionToDataFileWriterMap.get(partition);
-      if (recordWriter == null) {
-        File partDir = new File(_mapperOutputDir, partition);
-        if (!partDir.exists()) {
-          Files.createDirectory(Paths.get(partDir.getAbsolutePath()));
+    GenericRow reuse = new GenericRow();
+    for (RecordReader recordReader : _recordReaders) {
+      while (recordReader.hasNext()) {
+        reuse = recordReader.next(reuse);
+
+        // TODO: Add ComplexTypeTransformer here. Currently it is not 
idempotent so cannot add it
+
+        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+          //noinspection unchecked
+          for (GenericRow row : (Collection<GenericRow>) 
reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            GenericRow transformedRow = 
_defaultRecordTransformer.transform(row);
+            if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+                .filter(transformedRow)) {
+              writeRecord(transformedRow);
+            }
+          }
+        } else {
+          GenericRow transformedRow = 
_defaultRecordTransformer.transform(reuse);
+          if (transformedRow != null && 
IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+              .filter(transformedRow)) {
+            writeRecord(transformedRow);
+          }
         }
-        recordWriter = new DataFileWriter<>(new 
GenericDatumWriter<>(_avroSchema));
-        recordWriter.create(_avroSchema, new File(partDir, 
createMapperOutputFileName(_mapperId)));
-        _partitionToDataFileWriterMap.put(partition, recordWriter);
-      }
 
-      // Write record to avro file for its partition
-      SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(reusableRow, 
reusableRecord);
-      recordWriter.append(reusableRecord);
+        reuse.clear();
+      }
+    }
 
-      reusableRow.clear();
+    for (GenericRowFileManager fileManager : 
_partitionToFileManagerMap.values()) {
+      fileManager.closeFileWriter();
     }
+
+    return _partitionToFileManagerMap;
   }
 
-  /**
-   * Cleanup the mapper state
-   */
-  public void cleanup()
+  private void writeRecord(GenericRow row)
       throws IOException {
-    for (DataFileWriter<GenericData.Record> recordDataFileWriter : 
_partitionToDataFileWriterMap.values()) {
-      recordDataFileWriter.close();
+    // Record transformation
+    row = 
_dataTypeTransformer.transform(_recordTransformer.transformRecord(row));
+
+    // Partitioning
+    int numPartitioners = _partitioners.length;
+    for (int i = 0; i < numPartitioners; i++) {
+      _partitionsBuffer[i] = _partitioners[i].getPartition(row);
+    }
+    String partition = StringUtil.join("_", _partitionsBuffer);
+
+    // Create writer for the partition if not exists
+    GenericRowFileManager fileManager = 
_partitionToFileManagerMap.get(partition);
+    if (fileManager == null) {
+      File partitionOutputDir = new File(_mapperOutputDir, partition);
+      FileUtils.forceMkdir(partitionOutputDir);
+      fileManager = new GenericRowFileManager(partitionOutputDir, _fieldSpecs, 
_includeNullFields);
+      _partitionToFileManagerMap.put(partition, fileManager);
     }
-  }
 
-  public static String createMapperOutputFileName(String mapperId) {
-    return "mapper_" + mapperId + ".avro";
+    fileManager.getFileWriter().write(row);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
index e6360a6..96cf2e5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import 
org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -29,25 +30,30 @@ import org.apache.pinot.spi.data.Schema;
  * Config for the mapper phase of SegmentProcessorFramework
  */
 public class SegmentMapperConfig {
-
-  private final Schema _pinotSchema;
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
   private final RecordTransformerConfig _recordTransformerConfig;
   private final RecordFilterConfig _recordFilterConfig;
   private final List<PartitionerConfig> _partitionerConfigs;
 
-  public SegmentMapperConfig(Schema pinotSchema, RecordTransformerConfig 
recordTransformerConfig,
+  public SegmentMapperConfig(TableConfig tableConfig, Schema schema, 
RecordTransformerConfig recordTransformerConfig,
       RecordFilterConfig recordFilterConfig, List<PartitionerConfig> 
partitionerConfigs) {
-    _pinotSchema = pinotSchema;
+    _tableConfig = tableConfig;
+    _schema = schema;
     _recordTransformerConfig = recordTransformerConfig;
     _recordFilterConfig = recordFilterConfig;
     _partitionerConfigs = partitionerConfigs;
   }
 
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
   /**
    * The Pinot schema
    */
-  public Schema getPinotSchema() {
-    return _pinotSchema;
+  public Schema getSchema() {
+    return _schema;
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 903f065..34c280a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -20,14 +20,20 @@ package org.apache.pinot.core.segment.processing.framework;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,15 +49,14 @@ import org.slf4j.LoggerFactory;
  * (eg task which merges segments, tasks which aligns segments per time 
boundaries etc)
  */
 public class SegmentProcessorFramework {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentProcessorFramework.class);
 
   private final File _inputSegmentsDir;
   private final File _outputSegmentsDir;
   private final SegmentProcessorConfig _segmentProcessorConfig;
 
-  private final Schema _pinotSchema;
   private final TableConfig _tableConfig;
+  private final Schema _schema;
 
   private final File _baseDir;
   private final File _mapperInputDir;
@@ -80,8 +85,8 @@ public class SegmentProcessorFramework {
         "Must provide existing empty output directory: %s", 
_outputSegmentsDir.getAbsolutePath());
 
     _segmentProcessorConfig = segmentProcessorConfig;
-    _pinotSchema = segmentProcessorConfig.getSchema();
     _tableConfig = segmentProcessorConfig.getTableConfig();
+    _schema = segmentProcessorConfig.getSchema();
 
     _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" + 
System.currentTimeMillis());
     FileUtils.deleteQuietly(_baseDir);
@@ -105,58 +110,58 @@ public class SegmentProcessorFramework {
    */
   public void processSegments()
       throws Exception {
-
     // Check for input segments
     File[] segmentFiles = _inputSegmentsDir.listFiles();
-    if (segmentFiles.length == 0) {
-      throw new IllegalStateException("No segments found in input dir: " + 
_inputSegmentsDir.getAbsolutePath()
-          + ". Exiting SegmentProcessorFramework.");
-    }
+    Preconditions
+        .checkState(segmentFiles != null && segmentFiles.length > 0, "Failed 
to find segments under input dir: %s",
+            _inputSegmentsDir.getAbsolutePath());
 
     // Mapper phase.
     LOGGER.info("Beginning mapper phase. Processing segments: {}", 
Arrays.toString(_inputSegmentsDir.list()));
-    for (File segment : segmentFiles) {
-
-      String fileName = segment.getName();
-      File mapperInput = segment;
+    SegmentMapperConfig mapperConfig =
+        new SegmentMapperConfig(_tableConfig, _schema, 
_segmentProcessorConfig.getRecordTransformerConfig(),
+            _segmentProcessorConfig.getRecordFilterConfig(), 
_segmentProcessorConfig.getPartitionerConfigs());
+    List<RecordReader> recordReaders = new ArrayList<>(segmentFiles.length);
+    for (File indexDir : segmentFiles) {
+      String fileName = indexDir.getName();
 
       // Untar the segments if needed
-      if (!segment.isDirectory()) {
+      if (!indexDir.isDirectory()) {
         if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
-          mapperInput = TarGzCompressionUtils.untar(segment, 
_mapperInputDir).get(0);
+          indexDir = TarGzCompressionUtils.untar(indexDir, 
_mapperInputDir).get(0);
         } else {
-          throw new IllegalStateException("Unsupported segment format: " + 
segment.getAbsolutePath());
+          throw new IllegalStateException("Unsupported segment format: " + 
indexDir.getAbsolutePath());
         }
       }
 
-      // Set mapperId as the name of the segment
-      SegmentMapperConfig mapperConfig =
-          new SegmentMapperConfig(_pinotSchema, 
_segmentProcessorConfig.getRecordTransformerConfig(),
-              _segmentProcessorConfig.getRecordFilterConfig(), 
_segmentProcessorConfig.getPartitionerConfigs());
-      SegmentMapper mapper = new SegmentMapper(mapperInput.getName(), 
mapperInput, mapperConfig, _mapperOutputDir);
-      mapper.map();
-      mapper.cleanup();
+      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+      // NOTE: Do not fill null field with default value to be consistent with 
other record readers
+      recordReader.init(indexDir, null, null, true);
+      recordReaders.add(recordReader);
+    }
+    SegmentMapper mapper = new SegmentMapper(recordReaders, mapperConfig, 
_mapperOutputDir);
+    Map<String, GenericRowFileManager> partitionToFileManagerMap = 
mapper.map();
+    for (RecordReader recordReader : recordReaders) {
+      recordReader.close();
     }
 
     // Check for mapper output files
-    File[] mapperOutputFiles = _mapperOutputDir.listFiles();
-    if (mapperOutputFiles.length == 0) {
-      throw new IllegalStateException("No files found in mapper output 
directory: " + _mapperOutputDir.getAbsolutePath()
-          + ". Exiting SegmentProcessorFramework.");
-    }
+    int numPartitions = partitionToFileManagerMap.size();
+    Preconditions.checkState(numPartitions > 0, "No partition generated from 
mapper phase");
 
     // Reducer phase.
-    LOGGER.info("Beginning reducer phase. Processing files: {}", 
Arrays.toString(_mapperOutputDir.list()));
-    // Mapper output directory has 1 directory per partition, named after the 
partition. Each directory contains 1 or more avro files.
-    for (File partDir : mapperOutputFiles) {
-
+    LOGGER.info("Beginning reducer phase on partitions: {}", 
partitionToFileManagerMap.keySet());
+    for (Map.Entry<String, GenericRowFileManager> entry : 
partitionToFileManagerMap.entrySet()) {
+      String partition = entry.getKey();
+      GenericRowFileManager fileManager = entry.getValue();
       // Set partition as reducerId
       SegmentReducerConfig reducerConfig =
-          new SegmentReducerConfig(_pinotSchema, 
_segmentProcessorConfig.getCollectorConfig(),
+          new SegmentReducerConfig(_schema, 
_segmentProcessorConfig.getCollectorConfig(),
               
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
-      SegmentReducer reducer = new SegmentReducer(partDir.getName(), partDir, 
reducerConfig, _reducerOutputDir);
+      SegmentReducer reducer = new SegmentReducer(partition, fileManager, 
reducerConfig, _reducerOutputDir);
       reducer.reduce();
       reducer.cleanup();
+      fileManager.cleanUp();
     }
 
     // Check for reducer output files
@@ -172,12 +177,12 @@ public class SegmentProcessorFramework {
     // Reducer output directory will have 1 or more avro files
     int segmentNum = 0;
     for (File resultFile : reducerOutputFiles) {
-      SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, _pinotSchema);
+      SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, _schema);
       segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
       segmentGeneratorConfig.setOutDir(_outputSegmentsDir.getAbsolutePath());
       segmentGeneratorConfig.setInputFilePath(resultFile.getAbsolutePath());
       segmentGeneratorConfig.setFormat(FileFormat.AVRO);
-      segmentGeneratorConfig.setSequenceId(segmentNum ++);
+      segmentGeneratorConfig.setSequenceId(segmentNum++);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
       driver.init(segmentGeneratorConfig);
       driver.build();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
index 4bcd373..26a2be7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
@@ -26,11 +26,11 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.pinot.core.segment.processing.collector.Collector;
 import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
 import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,27 +48,27 @@ public class SegmentReducer {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentReducer.class);
 
-  private final File _reducerInputDir;
+  private final String _reducerId;
+  private final GenericRowFileManager _fileManager;
   private final File _reducerOutputDir;
 
-  private final String _reducerId;
   private final Schema _pinotSchema;
   private final org.apache.avro.Schema _avroSchema;
   private final Collector _collector;
   private final int _numRecordsPerPart;
 
-  public SegmentReducer(String reducerId, File reducerInputDir, 
SegmentReducerConfig reducerConfig,
+  public SegmentReducer(String reducerId, GenericRowFileManager fileManager, 
SegmentReducerConfig reducerConfig,
       File reducerOutputDir) {
-    _reducerInputDir = reducerInputDir;
+    _reducerId = reducerId;
+    _fileManager = fileManager;
     _reducerOutputDir = reducerOutputDir;
 
-    _reducerId = reducerId;
     _pinotSchema = reducerConfig.getPinotSchema();
     _avroSchema = 
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
     _collector = 
CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
     _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
-    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: 
{}, collector: {}, numRecordsPerPart: {}",
-        _reducerId, _reducerInputDir, _reducerOutputDir, 
_collector.getClass(), _numRecordsPerPart);
+    LOGGER.info("Initialized reducer with id: {}, output dir: {}, collector: 
{}, numRecordsPerPart: {}", _reducerId,
+        _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
   }
 
   /**
@@ -77,27 +77,22 @@ public class SegmentReducer {
    */
   public void reduce()
       throws Exception {
-
     int part = 0;
-    for (File inputFile : _reducerInputDir.listFiles()) {
-
-      RecordReader avroRecordReader = RecordReaderFactory
-          
.getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader",
 inputFile,
-              _pinotSchema.getColumnNames(), null);
-
-      while (avroRecordReader.hasNext()) {
-        GenericRow next = avroRecordReader.next();
-
-        // Aggregations
-        _collector.collect(next);
-
-        // Reached max records per part file. Flush
-        if (_collector.size() == _numRecordsPerPart) {
-          flushRecords(_collector, createReducerOutputFileName(_reducerId, 
part++));
-          _collector.reset();
-        }
+    GenericRowFileReader fileReader = _fileManager.getFileReader();
+    int numRows = fileReader.getNumRows();
+    for (int i = 0; i < numRows; i++) {
+      GenericRow next = fileReader.read(i, new GenericRow());
+
+      // Aggregations
+      _collector.collect(next);
+
+      // Reached max records per part file. Flush
+      if (_collector.size() == _numRecordsPerPart) {
+        flushRecords(_collector, createReducerOutputFileName(_reducerId, 
part++));
+        _collector.reset();
       }
     }
+    _fileManager.closeFileReader();
     if (_collector.size() > 0) {
       flushRecords(_collector, createReducerOutputFileName(_reducerId, part));
       _collector.reset();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
new file mode 100644
index 0000000..9fd9401
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Manager for a generic row file, which is stored as a pair of files (record offset file and record data file)
+ * under a single directory.
+ * <p>Wraps the {@link GenericRowFileWriter} and the {@link GenericRowFileReader} together with the arguments needed
+ * to construct them. The writer and the reader are created lazily on first request and cached until they are
+ * explicitly closed.
+ * <p>No synchronization is performed in this class.
+ */
+public class GenericRowFileManager {
+  public static final String OFFSET_FILE_NAME = "record.offset";
+  public static final String DATA_FILE_NAME = "record.data";
+
+  private final File _offsetFile;
+  private final File _dataFile;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // Lazily created; reset to null once closed so that a fresh instance is created on the next request
+  private GenericRowFileWriter _fileWriter;
+  private GenericRowFileReader _fileReader;
+
+  /**
+   * @param outputDir Directory holding the offset file ({@link #OFFSET_FILE_NAME}) and the data file
+   *                  ({@link #DATA_FILE_NAME})
+   * @param fieldSpecs Field specs handed to the file writer and the file reader
+   * @param includeNullFields Flag handed to the file writer and the file reader
+   */
+  public GenericRowFileManager(File outputDir, List<FieldSpec> fieldSpecs, boolean includeNullFields) {
+    _offsetFile = new File(outputDir, OFFSET_FILE_NAME);
+    _dataFile = new File(outputDir, DATA_FILE_NAME);
+    _fieldSpecs = fieldSpecs;
+    _includeNullFields = includeNullFields;
+  }
+
+  /**
+   * Returns the file writer. Creates one if not exists.
+   *
+   * @throws IllegalStateException if a new writer has to be created but the offset file or the data file already
+   *                               exists
+   * @throws IOException if the writer cannot be created
+   */
+  public GenericRowFileWriter getFileWriter()
+      throws IOException {
+    if (_fileWriter == null) {
+      Preconditions.checkState(!_offsetFile.exists(), "Record offset file: %s already exists", _offsetFile);
+      Preconditions.checkState(!_dataFile.exists(), "Record data file: %s already exists", _dataFile);
+      _fileWriter = new GenericRowFileWriter(_offsetFile, _dataFile, _fieldSpecs, _includeNullFields);
+    }
+    return _fileWriter;
+  }
+
+  /**
+   * Closes the file writer. No-op if there is no open writer.
+   */
+  public void closeFileWriter()
+      throws IOException {
+    if (_fileWriter != null) {
+      _fileWriter.close();
+      _fileWriter = null;
+    }
+  }
+
+  /**
+   * Returns the file reader. Creates one if not exists.
+   *
+   * @throws IllegalStateException if a new reader has to be created but the offset file or the data file does not
+   *                               exist
+   * @throws IOException if the reader cannot be created
+   */
+  public GenericRowFileReader getFileReader()
+      throws IOException {
+    if (_fileReader == null) {
+      Preconditions.checkState(_offsetFile.exists(), "Record offset file: %s does not exist", _offsetFile);
+      Preconditions.checkState(_dataFile.exists(), "Record data file: %s does not exist", _dataFile);
+      _fileReader = new GenericRowFileReader(_offsetFile, _dataFile, _fieldSpecs, _includeNullFields);
+    }
+    return _fileReader;
+  }
+
+  /**
+   * Closes the file reader. No-op if there is no open reader.
+   */
+  public void closeFileReader()
+      throws IOException {
+    if (_fileReader != null) {
+      _fileReader.close();
+      _fileReader = null;
+    }
+  }
+
+  /**
+   * Cleans up the files: closes the writer and the reader (if open) and deletes the offset file and the data file.
+   * <p>The reader is closed and both files are deleted even when closing the writer (or the reader) fails, so that a
+   * failed close does not leave an open file handle or the files behind. The close failure is still propagated to the
+   * caller after the clean-up is done.
+   */
+  public void cleanUp()
+      throws IOException {
+    try {
+      closeFileWriter();
+    } finally {
+      try {
+        closeFileReader();
+      } finally {
+        // Always remove the files, regardless of whether the writer/reader could be closed cleanly
+        FileUtils.deleteQuietly(_offsetFile);
+        FileUtils.deleteQuietly(_dataFile);
+      }
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
index 0e1ba1b..13102aa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
@@ -60,8 +60,7 @@ public class GenericRowFileReader implements Closeable {
    */
   public GenericRow read(int rowId, GenericRow reuse) {
     long offset = _offsetBuffer.getLong((long) rowId << 3); // rowId * 
Long.BYTES
-    _deserializer.deserialize(offset, reuse);
-    return reuse;
+    return _deserializer.deserialize(offset, reuse);
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
new file mode 100644
index 0000000..698a105
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Static helpers for the segment processing framework: ordering of schema field specs and construction of the sort
+ * order comparator.
+ */
+public final class SegmentProcessingUtils {
+  private SegmentProcessingUtils() {
+  }
+
+  /**
+   * Returns the field specs (physical only) with the names sorted in alphabetical order.
+   */
+  public static List<FieldSpec> getFieldSpecs(Schema schema) {
+    // Start from a copy of all field specs, then drop the virtual columns
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(schema.getAllFieldSpecs());
+    physicalFieldSpecs.removeIf(FieldSpec::isVirtualColumn);
+    physicalFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+    return physicalFieldSpecs;
+  }
+
+  /**
+   * Returns the field specs (physical only) with sorted column in the front, followed by other columns sorted in
+   * alphabetical order.
+   *
+   * @throws IllegalArgumentException if a sort column is not in the schema, is multi-valued, or is a virtual column
+   */
+  public static List<FieldSpec> getFieldSpecs(Schema schema, List<String> sortOrder) {
+    // Sort columns go first, in the order given by the caller
+    List<FieldSpec> orderedFieldSpecs = new ArrayList<>();
+    for (String sortColumn : sortOrder) {
+      FieldSpec sortFieldSpec = schema.getFieldSpecFor(sortColumn);
+      Preconditions.checkArgument(sortFieldSpec != null, "Failed to find sort column: %s", sortColumn);
+      Preconditions.checkArgument(sortFieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
+      Preconditions.checkArgument(!sortFieldSpec.isVirtualColumn(), "Cannot sort on virtual column: %s", sortColumn);
+      orderedFieldSpecs.add(sortFieldSpec);
+    }
+
+    // Remaining physical columns follow, ordered by name
+    List<FieldSpec> remainingFieldSpecs = new ArrayList<>(schema.getAllFieldSpecs());
+    remainingFieldSpecs.removeIf(spec -> spec.isVirtualColumn() || sortOrder.contains(spec.getName()));
+    remainingFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+    orderedFieldSpecs.addAll(remainingFieldSpecs);
+
+    return orderedFieldSpecs;
+  }
+
+  /**
+   * Returns the value comparator based on the sort order. The stored types of the first {@code numSortColumns} field
+   * specs are handed to the comparator.
+   */
+  public static SortOrderComparator getSortOrderComparator(List<FieldSpec> fieldSpecs, int numSortColumns) {
+    DataType[] storedTypes = new DataType[numSortColumns];
+    for (int columnId = 0; columnId < numSortColumns; columnId++) {
+      FieldSpec sortFieldSpec = fieldSpecs.get(columnId);
+      storedTypes[columnId] = sortFieldSpec.getDataType().getStoredType();
+    }
+    return new SortOrderComparator(numSortColumns, storedTypes);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
similarity index 97%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
index 34b63b6..5652957 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.utils;
 
 import java.util.Comparator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 6e01815..682cd83 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.processing.framework;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,12 +31,14 @@ import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
 import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
 import 
org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -50,6 +54,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -57,39 +62,42 @@ import static org.testng.Assert.assertTrue;
  * Tests for {@link SegmentMapper}
  */
 public class SegmentMapperTest {
-
-  private File _baseDir;
-  private File _inputSegment;
-  private Schema _pinotSchema;
-  private final List<Object[]> _rawData = Lists
-      .newArrayList(new Object[]{"abc", 1000, 1597719600000L}, new 
Object[]{"pqr", 2000, 1597773600000L},
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"SegmentMapperTest");
+
+  private final TableConfig _tableConfig =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue")
+          .setNullHandlingEnabled(true).build();
+  private final Schema _schema = new 
Schema.SchemaBuilder().setSchemaName("myTable")
+      .addSingleValueDimension("campaign", FieldSpec.DataType.STRING, 
"xyz").addMetric("clicks", FieldSpec.DataType.INT)
+      .addDateTime("timeValue", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+  private final List<Object[]> _rawData = Arrays
+      .asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr", 
2000, 1597773600000L},
           new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc", 4000, 
1597795200000L},
           new Object[]{"abc", 3000, 1597802400000L}, new Object[]{"pqr", 1000, 
1597838400000L},
-          new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{"pqr", 1000, 
1597878000000L},
-          new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz", 6000, 
1597892400000L});
+          new Object[]{null, 4000, 1597856400000L}, new Object[]{"pqr", 1000, 
1597878000000L},
+          new Object[]{"abc", 7000, 1597881600000L}, new Object[]{null, 6000, 
1597892400000L});
+
+  private File _indexDir;
 
   @BeforeClass
-  public void before()
+  public void setUp()
       throws Exception {
-    _baseDir = new File(FileUtils.getTempDirectory(), "segment_mapper_test_" + 
System.currentTimeMillis());
-    FileUtils.deleteQuietly(_baseDir);
-    assertTrue(_baseDir.mkdirs());
+    FileUtils.deleteQuietly(TEMP_DIR);
+    assertTrue(TEMP_DIR.mkdirs());
 
     // Segment directory
-    File inputSegmentDir = new File(_baseDir, "input_segment");
+    File inputSegmentDir = new File(TEMP_DIR, "input_segment");
     assertTrue(inputSegmentDir.mkdirs());
 
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue").build();
-    _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
-        .addSingleValueDimension("campaign", 
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
-        .addDateTime("timeValue", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-
     // Create test data
     List<GenericRow> inputRows = new ArrayList<>();
     for (Object[] rawRow : _rawData) {
       GenericRow row = new GenericRow();
-      row.putValue("campaign", rawRow[0]);
+      if (rawRow[0] != null) {
+        row.putValue("campaign", rawRow[0]);
+      } else {
+        row.putDefaultNullValue("campaign", "xyz");
+      }
       row.putValue("clicks", rawRow[1]);
       row.putValue("timeValue", rawRow[2]);
       inputRows.add(row);
@@ -97,57 +105,67 @@ public class SegmentMapperTest {
 
     // Create test segment
     RecordReader recordReader = new GenericRowRecordReader(inputRows);
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, _pinotSchema);
-    segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, _schema);
     segmentGeneratorConfig.setOutDir(inputSegmentDir.getAbsolutePath());
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     driver.init(segmentGeneratorConfig, recordReader);
     driver.build();
 
-    assertEquals(inputSegmentDir.listFiles().length, 1);
-    _inputSegment = inputSegmentDir.listFiles()[0];
+    File[] segmentFiles = inputSegmentDir.listFiles();
+    assertTrue(segmentFiles != null && segmentFiles.length == 1);
+    _indexDir = segmentFiles[0];
   }
 
   @Test(dataProvider = "segmentMapperConfigProvider")
-  public void segmentMapperTest(String mapperId, SegmentMapperConfig 
segmentMapperConfig,
-      Map<String, List<Object[]>> partitionToRecords)
+  public void segmentMapperTest(SegmentMapperConfig segmentMapperConfig, 
Map<String, List<Object[]>> partitionToRecords)
       throws Exception {
-
-    File mapperOutputDir = new File(_baseDir, "mapper_output");
+    File mapperOutputDir = new File(TEMP_DIR, "mapper_output");
     FileUtils.deleteQuietly(mapperOutputDir);
     assertTrue(mapperOutputDir.mkdirs());
-    SegmentMapper segmentMapper = new SegmentMapper(mapperId, _inputSegment, 
segmentMapperConfig, mapperOutputDir);
-    segmentMapper.map();
-    segmentMapper.cleanup();
-
-    File[] partitionDirs = mapperOutputDir.listFiles();
-    // as many directories in output as num partitions created + passed filter
-    assertEquals(partitionDirs.length, partitionToRecords.size());
-    for (File partitionDir : partitionDirs) {
-      String partition = partitionDir.getName();
-      // directory named after every partition
-      assertTrue(partitionToRecords.containsKey(partition));
-      // each partition directory has as many files as mapper
-      File[] avroFiles = partitionDir.listFiles();
-      assertEquals(avroFiles.length, 1);
-      assertEquals(avroFiles[0].getName(), 
SegmentMapper.createMapperOutputFileName(mapperId));
-
-      RecordReader avroRecordReader = new AvroRecordReader();
-      avroRecordReader.init(avroFiles[0], _pinotSchema.getColumnNames(), null);
-      int numRecords = 0;
+
+    PinotSegmentRecordReader segmentRecordReader = new 
PinotSegmentRecordReader();
+    segmentRecordReader.init(_indexDir, null, null, true);
+    SegmentMapper segmentMapper =
+        new SegmentMapper(Collections.singletonList(segmentRecordReader), 
segmentMapperConfig, mapperOutputDir);
+    Map<String, GenericRowFileManager> partitionToFileManagerMap = 
segmentMapper.map();
+    segmentRecordReader.close();
+
+    assertEquals(partitionToFileManagerMap.size(), partitionToRecords.size());
+    for (Map.Entry<String, GenericRowFileManager> entry : 
partitionToFileManagerMap.entrySet()) {
+      // Directory named after every partition
+      String partition = entry.getKey();
+      File partitionDir = new File(mapperOutputDir, partition);
+      assertTrue(partitionDir.isDirectory());
+
+      // Each partition directory should contain 2 files (offset & data)
+      String[] fileNames = partitionDir.list();
+      assertNotNull(fileNames);
+      assertEquals(fileNames.length, 2);
+      Arrays.sort(fileNames);
+      assertEquals(fileNames[0], GenericRowFileManager.DATA_FILE_NAME);
+      assertEquals(fileNames[1], GenericRowFileManager.OFFSET_FILE_NAME);
+
+      GenericRowFileManager fileManager = entry.getValue();
+      GenericRowFileReader fileReader = fileManager.getFileReader();
+      int numRows = fileReader.getNumRows();
       List<Object[]> expectedRecords = partitionToRecords.get(partition);
-      GenericRow next = new GenericRow();
-      while (avroRecordReader.hasNext()) {
-        avroRecordReader.next(next);
-        assertEquals(next.getValue("campaign"), 
expectedRecords.get(numRecords)[0]);
-        assertEquals(next.getValue("clicks"), 
expectedRecords.get(numRecords)[1]);
-        assertEquals(next.getValue("timeValue"), 
expectedRecords.get(numRecords)[2]);
-        numRecords++;
+      assertEquals(numRows, expectedRecords.size());
+      GenericRow reuse = new GenericRow();
+      for (int i = 0; i < numRows; i++) {
+        reuse = fileReader.read(i, reuse);
+        Object[] expectedValues = expectedRecords.get(i);
+        assertEquals(reuse.getValue("campaign"), expectedValues[0]);
+        assertEquals(reuse.getValue("clicks"), expectedValues[1]);
+        assertEquals(reuse.getValue("timeValue"), expectedValues[2]);
+        // Default null value
+        if (expectedValues[0].equals("xyz")) {
+          assertEquals(reuse.getNullValueFields(), 
Collections.singleton("campaign"));
+        } else {
+          assertEquals(reuse.getNullValueFields(), Collections.emptySet());
+        }
       }
-      assertEquals(numRecords, expectedRecords.size());
+      fileManager.cleanUp();
     }
-
-    FileUtils.deleteQuietly(mapperOutputDir);
   }
 
   /**
@@ -155,139 +173,152 @@ public class SegmentMapperTest {
    */
   @DataProvider(name = "segmentMapperConfigProvider")
   public Object[][] segmentMapperConfigProvider() {
-    String mapperId = "aMapperId";
-    List<Object[]> outputData = new ArrayList<>();
-    _rawData.forEach(r -> outputData.add(new Object[]{r[0], r[1], r[2]}));
+    List<Object[]> outputData = Arrays
+        .asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr", 
2000, 1597773600000L},
+            new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc", 
4000, 1597795200000L},
+            new Object[]{"abc", 3000, 1597802400000L}, new Object[]{"pqr", 
1000, 1597838400000L},
+            new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{"pqr", 
1000, 1597878000000L},
+            new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz", 
6000, 1597892400000L});
 
     List<Object[]> inputs = new ArrayList<>();
 
     // default configs
-    SegmentMapperConfig config1 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(new 
PartitionerConfig.Builder().build()));
+    SegmentMapperConfig config1 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(new 
PartitionerConfig.Builder().build()));
     Map<String, List<Object[]>> expectedRecords1 = new HashMap<>();
     expectedRecords1.put("0", outputData);
-    inputs.add(new Object[]{mapperId, config1, expectedRecords1});
+    inputs.add(new Object[]{config1, expectedRecords1});
 
     // round robin partitioner
-    SegmentMapperConfig config12 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
-            .setNumPartitions(3).build()));
+    SegmentMapperConfig config12 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
+                .setNumPartitions(3).build()));
     Map<String, List<Object[]>> expectedRecords12 = new HashMap<>();
     IntStream.range(0, 3).forEach(i -> 
expectedRecords12.put(String.valueOf(i), new ArrayList<>()));
     for (int i = 0; i < outputData.size(); i++) {
       expectedRecords12.get(String.valueOf(i % 3)).add(outputData.get(i));
     }
-    inputs.add(new Object[]{mapperId, config12, expectedRecords12});
+    inputs.add(new Object[]{config12, expectedRecords12});
 
     // partition by timeValue
-    SegmentMapperConfig config2 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
-            .setColumnName("timeValue").build()));
+    SegmentMapperConfig config2 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+                .setColumnName("timeValue").build()));
     Map<String, List<Object[]>> expectedRecords2 =
         outputData.stream().collect(Collectors.groupingBy(r -> 
String.valueOf(r[2]), Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config2, expectedRecords2});
+    inputs.add(new Object[]{config2, expectedRecords2});
 
     // partition by campaign
-    SegmentMapperConfig config3 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
-            .setColumnName("campaign").build()));
+    SegmentMapperConfig config3 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+                .setColumnName("campaign").build()));
     Map<String, List<Object[]>> expectedRecords3 =
         outputData.stream().collect(Collectors.groupingBy(r -> 
String.valueOf(r[0]), Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config3, expectedRecords3});
+    inputs.add(new Object[]{config3, expectedRecords3});
 
     // transform function partition
-    SegmentMapperConfig config4 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
-            .setTransformFunction("toEpochDays(timeValue)").build()));
+    SegmentMapperConfig config4 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
+                .setTransformFunction("toEpochDays(timeValue)").build()));
     Map<String, List<Object[]>> expectedRecords4 = outputData.stream()
         .collect(Collectors.groupingBy(r -> String.valueOf(((long) r[2]) / 
86400000), Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config4, expectedRecords4});
+    inputs.add(new Object[]{config4, expectedRecords4});
 
     // partition by column and then table column partition config
-    SegmentMapperConfig config41 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new RecordFilterConfig.Builder().build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
-            .setColumnName("campaign").build(),
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
-            .setColumnName("clicks").setColumnPartitionConfig(new 
ColumnPartitionConfig("Modulo", 3)).build()));
+    SegmentMapperConfig config41 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new RecordFilterConfig.Builder().build(), Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+                .setColumnName("campaign").build(), new 
PartitionerConfig.Builder()
+                
.setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG).setColumnName("clicks")
+                .setColumnPartitionConfig(new ColumnPartitionConfig("Modulo", 
3)).build()));
     Map<String, List<Object[]>> expectedRecords41 = new HashMap<>();
     for (Object[] record : outputData) {
       String partition = record[0] + "_" + (int) record[1] % 3;
       List<Object[]> objects = expectedRecords41.computeIfAbsent(partition, k 
-> new ArrayList<>());
       objects.add(record);
     }
-    inputs.add(new Object[]{mapperId, config41, expectedRecords41});
+    inputs.add(new Object[]{config41, expectedRecords41});
 
     // filter function which filters out nothing
-    SegmentMapperConfig config5 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({campaign == \"foo\"}, 
campaign)").build(),
-        Lists.newArrayList(new PartitionerConfig.Builder().build()));
+    SegmentMapperConfig config5 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+                .setFilterFunction("Groovy({campaign == \"foo\"}, 
campaign)").build(),
+            Lists.newArrayList(new PartitionerConfig.Builder().build()));
     Map<String, List<Object[]>> expectedRecords5 = new HashMap<>();
     expectedRecords5.put("0", outputData);
-    inputs.add(new Object[]{mapperId, config5, expectedRecords5});
+    inputs.add(new Object[]{config5, expectedRecords5});
 
     // filter function which filters out everything
-    SegmentMapperConfig config6 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue > 0}, timeValue)").build(),
-        Lists.newArrayList(new PartitionerConfig.Builder().build()));
+    SegmentMapperConfig config6 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+                .setFilterFunction("Groovy({timeValue > 0}, 
timeValue)").build(),
+            Lists.newArrayList(new PartitionerConfig.Builder().build()));
     Map<String, List<Object[]>> expectedRecords6 = new HashMap<>();
-    inputs.add(new Object[]{mapperId, config6, expectedRecords6});
+    inputs.add(new Object[]{config6, expectedRecords6});
 
     // filter function which filters out certain times
-    SegmentMapperConfig config7 = new SegmentMapperConfig(_pinotSchema, new 
RecordTransformerConfig.Builder().build(),
-        new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue < 1597795200000L || 
timeValue >= 1597881600000L}, timeValue)")
-            .build(), Lists.newArrayList(new 
PartitionerConfig.Builder().build()));
+    SegmentMapperConfig config7 =
+        new SegmentMapperConfig(_tableConfig, _schema, new 
RecordTransformerConfig.Builder().build(),
+            new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+                .setFilterFunction("Groovy({timeValue < 1597795200000L || 
timeValue >= 1597881600000L}, timeValue)")
+                .build(), Lists.newArrayList(new 
PartitionerConfig.Builder().build()));
     Map<String, List<Object[]>> expectedRecords7 =
         outputData.stream().filter(r -> ((long) r[2]) >= 1597795200000L && 
((long) r[2]) < 1597881600000L)
             .collect(Collectors.groupingBy(r -> "0", Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config7, expectedRecords7});
+    inputs.add(new Object[]{config7, expectedRecords7});
 
     // record transformation - round timeValue to nearest day
     Map<String, String> transformFunctionMap = new HashMap<>();
     transformFunctionMap.put("timeValue", "round(timeValue, 86400000)");
-    SegmentMapperConfig config9 = new SegmentMapperConfig(_pinotSchema,
+    SegmentMapperConfig config9 = new SegmentMapperConfig(_tableConfig, 
_schema,
         new 
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().build(), Lists.newArrayList(new 
PartitionerConfig.Builder().build()));
     List<Object[]> transformedData = new ArrayList<>();
     outputData.forEach(r -> transformedData.add(new Object[]{r[0], r[1], 
(((long) r[2]) / 86400000) * 86400000}));
     Map<String, List<Object[]>> expectedRecords9 = new HashMap<>();
     expectedRecords9.put("0", transformedData);
-    inputs.add(new Object[]{mapperId, config9, expectedRecords9});
+    inputs.add(new Object[]{config9, expectedRecords9});
 
     // record transformation - round timeValue to nearest day, partition on 
timeValue
-    SegmentMapperConfig config10 = new SegmentMapperConfig(_pinotSchema,
+    SegmentMapperConfig config10 = new SegmentMapperConfig(_tableConfig, 
_schema,
         new 
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().build(), Lists.newArrayList(
         new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
             .setColumnName("timeValue").build()));
     Map<String, List<Object[]>> expectedRecords10 =
         transformedData.stream().collect(Collectors.groupingBy(r -> 
String.valueOf(r[2]), Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config10, expectedRecords10});
+    inputs.add(new Object[]{config10, expectedRecords10});
 
     // record transformation - round timeValue to nearest day, partition on 
timeValue, filter out timeValues
-    SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
+    SegmentMapperConfig config11 = new SegmentMapperConfig(_tableConfig, 
_schema,
         new 
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new 
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue 
>= 1597881600000}, timeValue)").build(), Lists.newArrayList(
-        new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
-            .setColumnName("timeValue").build()));
+            .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue 
>= 1597881600000}, timeValue)").build(),
+        Lists.newArrayList(
+            new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+                .setColumnName("timeValue").build()));
     Map<String, List<Object[]>> expectedRecords11 =
         transformedData.stream().filter(r -> ((long) r[2]) == 1597795200000L)
             .collect(Collectors.groupingBy(r -> "1597795200000", 
Collectors.toList()));
-    inputs.add(new Object[]{mapperId, config11, expectedRecords11});
+    inputs.add(new Object[]{config11, expectedRecords11});
 
     return inputs.toArray(new Object[0][]);
   }
 
   @AfterClass
-  public void after() {
-    FileUtils.deleteQuietly(_baseDir);
+  public void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
index 26ae032..f393f5e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
@@ -23,20 +23,20 @@ import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
 import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
 import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -55,60 +55,47 @@ import static org.testng.Assert.assertTrue;
  * Tests for {@link SegmentReducer}
  */
 public class SegmentReducerTest {
-
-  private File _baseDir;
-  private File _partDir;
-  private Schema _pinotSchema;
-  private final List<Object[]> _rawData1597795200000L = Lists
-      .newArrayList(new Object[]{"abc", 4000, 1597795200000L}, new 
Object[]{"abc", 3000, 1597795200000L},
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"SegmentReducerTest");
+
+  private final File _mapperOutputDir = new File(TEMP_DIR, 
"mapper_output/1597795200000");
+  private final Schema _pinotSchema = new 
Schema.SchemaBuilder().setSchemaName("mySchema")
+      .addSingleValueDimension("campaign", 
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
+      .addDateTime("timeValue", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+  private final List<Object[]> _rawData1597795200000L = Arrays
+      .asList(new Object[]{"abc", 4000, 1597795200000L}, new Object[]{"abc", 
3000, 1597795200000L},
           new Object[]{"pqr", 1000, 1597795200000L}, new Object[]{"xyz", 4000, 
1597795200000L},
           new Object[]{"pqr", 1000, 1597795200000L});
 
+  private GenericRowFileManager _fileManager;
+
   @BeforeClass
-  public void before()
+  public void setUp()
       throws IOException {
-    _baseDir = new File(FileUtils.getTempDirectory(), "segment_reducer_test_" 
+ System.currentTimeMillis());
-    FileUtils.deleteQuietly(_baseDir);
-    assertTrue(_baseDir.mkdirs());
-
-    // mapper output directory/partition directory
-    _partDir = new File(_baseDir, "mapper_output/1597795200000");
-    assertTrue(_partDir.mkdirs());
-
-    _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
-        .addSingleValueDimension("campaign", 
FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
-        .addDateTime("timeValue", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-    org.apache.avro.Schema avroSchema = 
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
-
-    // create 2 avro files
-    DataFileWriter<GenericData.Record> recordWriter1 = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
-    recordWriter1.create(avroSchema, new File(_partDir, "map1.avro"));
-    DataFileWriter<GenericData.Record> recordWriter2 = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema));
-    recordWriter2.create(avroSchema, new File(_partDir, "map2.avro"));
+    FileUtils.deleteQuietly(TEMP_DIR);
+    assertTrue(_mapperOutputDir.mkdirs());
+
+    List<FieldSpec> fieldSpecs = 
SegmentProcessingUtils.getFieldSpecs(_pinotSchema);
+    _fileManager = new GenericRowFileManager(_mapperOutputDir, fieldSpecs, 
false);
+    GenericRowFileWriter fileWriter = _fileManager.getFileWriter();
+    GenericRow reuse = new GenericRow();
     for (int i = 0; i < 5; i++) {
-      GenericData.Record record = new GenericData.Record(avroSchema);
-      record.put("campaign", _rawData1597795200000L.get(i)[0]);
-      record.put("clicks", _rawData1597795200000L.get(i)[1]);
-      record.put("timeValue", _rawData1597795200000L.get(i)[2]);
-      if (i < 2) {
-        recordWriter1.append(record);
-      } else {
-        recordWriter2.append(record);
-      }
+      reuse.putValue("campaign", _rawData1597795200000L.get(i)[0]);
+      reuse.putValue("clicks", _rawData1597795200000L.get(i)[1]);
+      reuse.putValue("timeValue", _rawData1597795200000L.get(i)[2]);
+      fileWriter.write(reuse);
+      reuse.clear();
     }
-    recordWriter1.close();
-    recordWriter2.close();
+    _fileManager.closeFileWriter();
   }
 
   @Test(dataProvider = "segmentReducerDataProvider")
   public void segmentReducerTest(String reducerId, SegmentReducerConfig 
reducerConfig, Set<String> expectedFileNames,
       List<Object[]> expectedRecords, Comparator comparator)
       throws Exception {
-
-    File reducerOutputDir = new File(_baseDir, "reducer_output");
+    File reducerOutputDir = new File(TEMP_DIR, "reducer_output");
     FileUtils.deleteQuietly(reducerOutputDir);
     assertTrue(reducerOutputDir.mkdirs());
-    SegmentReducer segmentReducer = new SegmentReducer(reducerId, _partDir, 
reducerConfig, reducerOutputDir);
+    SegmentReducer segmentReducer = new SegmentReducer(reducerId, 
_fileManager, reducerConfig, reducerOutputDir);
     segmentReducer.reduce();
     segmentReducer.cleanup();
 
@@ -210,7 +197,9 @@ public class SegmentReducerTest {
   }
 
   @AfterClass
-  public void after() {
-    FileUtils.deleteQuietly(_baseDir);
+  public void tearDown()
+      throws IOException {
+    _fileManager.cleanUp();
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to