This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-spark-preprocessing
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 3bdd50cee7c76e5d03146bca7ae668db0365a4cf
Author: Jack Li(Analytics Engineering) <[email protected]>
AuthorDate: Thu Jul 22 16:05:03 2021 -0700

    Support data preprocessing in Spark framework
---
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 166 +-------------------
 .../hadoop/job/mappers/SegmentCreationMapper.java  |   2 +-
 .../HadoopAvroDataPreprocessingHelper.java         |  67 ++++++++
 ...per.java => HadoopDataPreprocessingHelper.java} | 155 ++++++-------------
 ...a => HadoopDataPreprocessingHelperFactory.java} |  14 +-
 .../preprocess/HadoopJobPreparer.java}             |  21 +--
 .../HadoopOrcDataPreprocessingHelper.java          |  66 ++++++++
 .../v0_deprecated/pinot-ingestion-common/pom.xml   |  15 ++
 .../ingestion/jobs/SegmentPreprocessingJob.java    | 172 ++++++++++++++++++++-
 .../preprocess/AvroDataPreprocessingHelper.java    |  45 ++----
 .../preprocess/DataPreprocessingHelper.java        | 118 ++++++++++++++
 .../preprocess/DataPreprocessingHelperFactory.java |   4 +-
 .../preprocess/OrcDataPreprocessingHelper.java     |  46 ++----
 .../preprocess/SampleTimeColumnExtractable.java}   |  20 +--
 .../mappers/AvroDataPreprocessingMapper.java       |   6 +-
 .../mappers/OrcDataPreprocessingMapper.java        |   8 +-
 .../mappers/SegmentPreprocessingMapper.java        |   4 +-
 .../AvroDataPreprocessingPartitioner.java          |   4 +-
 .../partitioners/GenericPartitioner.java           |   8 +-
 .../OrcDataPreprocessingPartitioner.java           |   6 +-
 .../partitioners/PartitionFunctionFactory.java     |   2 +-
 .../reducers/AvroDataPreprocessingReducer.java     |   4 +-
 .../reducers/OrcDataPreprocessingReducer.java      |   4 +-
 .../ingestion/utils}/DataPreprocessingUtils.java   |   2 +-
 .../ingestion/utils}/InternalConfigConstants.java  |   2 +-
 .../ingestion}/utils/preprocess/DataFileUtils.java |   2 +-
 .../ingestion}/utils/preprocess/HadoopUtils.java   |   2 +-
 .../ingestion}/utils/preprocess/OrcUtils.java      |   2 +-
 .../utils/preprocess/TextComparator.java           |   2 +-
 .../v0_deprecated/pinot-spark/pom.xml              |  28 +++-
 .../spark/jobs/SparkSegmentPreprocessingJob.java   | 101 ++++++++++++
 .../SparkAvroDataPreprocessingHelper.java}         |  24 +--
 .../SparkDataPreprocessingComparator.java          |  48 ++++++
 .../preprocess/SparkDataPreprocessingHelper.java   | 153 ++++++++++++++++++
 .../SparkDataPreprocessingHelperFactory.java}      |  16 +-
 .../preprocess/SparkDataPreprocessingJobKey.java}  |  28 ++--
 .../SparkDataPreprocessingPartitioner.java         |  56 +++++++
 .../SparkOrcDataPreprocessingHelper.java}          |  24 +--
 .../org/apache/pinot/spark/utils}/HadoopUtils.java |   2 +-
 39 files changed, 976 insertions(+), 473 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 81a9902..e17acd2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -18,33 +18,19 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
+import 
org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,21 +45,6 @@ import org.slf4j.LoggerFactory;
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
 
-  private String _partitionColumn;
-  private int _numPartitions;
-  private String _partitionFunction;
-
-  private String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private org.apache.pinot.spi.data.Schema _pinotTableSchema;
-
-  private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
-
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -96,8 +67,8 @@ public class HadoopSegmentPreprocessingJob extends 
SegmentPreprocessingJob {
     // Cleans up preprocessed output dir if exists
     cleanUpPreprocessedOutputs(_preprocessedOutputDir);
 
-    DataPreprocessingHelper dataPreprocessingHelper =
-        
DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir,
 _preprocessedOutputDir);
+    HadoopDataPreprocessingHelper dataPreprocessingHelper =
+        
HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir,
 _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, 
_numPartitions, _partitionFunction,
             _sortingColumn, _sortingColumnType, _numOutputFiles, 
_maxNumRecordsPerFile);
@@ -126,124 +97,6 @@ public class HadoopSegmentPreprocessingJob extends 
SegmentPreprocessingJob {
     LOGGER.info("Finished pre-processing job in {}ms", 
(System.currentTimeMillis() - startTime));
   }
 
-  private void fetchPreProcessingOperations() {
-    _preprocessingOperations = new HashSet<>();
-    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
-    if (customConfig != null) {
-      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
-      if (customConfigMap != null && !customConfigMap.isEmpty()) {
-        String preprocessingOperationsString =
-            
customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
-        DataPreprocessingUtils.getOperations(_preprocessingOperations, 
preprocessingOperationsString);
-      }
-    }
-  }
-
-  private void fetchPartitioningConfig() {
-    // Fetch partition info from table config.
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION))
 {
-      LOGGER.info("Partitioning is disabled.");
-      return;
-    }
-    SegmentPartitionConfig segmentPartitionConfig = 
_tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (segmentPartitionConfig != null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-      Preconditions
-          .checkArgument(columnPartitionMap.size() <= 1, "There should be at 
most 1 partition setting in the table.");
-      if (columnPartitionMap.size() == 1) {
-        _partitionColumn = columnPartitionMap.keySet().iterator().next();
-        _numPartitions = 
segmentPartitionConfig.getNumPartitions(_partitionColumn);
-        _partitionFunction = 
segmentPartitionConfig.getFunctionName(_partitionColumn);
-      }
-    } else {
-      LOGGER.info("Segment partition config is null for table: {}", 
_tableConfig.getTableName());
-    }
-  }
-
-  private void fetchSortingConfig() {
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
-      LOGGER.info("Sorting is disabled.");
-      return;
-    }
-    // Fetch sorting info from table config first.
-    List<String> sortingColumns = new ArrayList<>();
-    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
-    if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
-      for (FieldConfig fieldConfig : fieldConfigs) {
-        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
-          sortingColumns.add(fieldConfig.getName());
-        }
-      }
-    }
-    if (!sortingColumns.isEmpty()) {
-      Preconditions.checkArgument(sortingColumns.size() == 1, "There should be 
at most 1 sorted column in the table.");
-      _sortingColumn = sortingColumns.get(0);
-      return;
-    }
-
-    // There is no sorted column specified in field configs, try to find 
sorted column from indexing config.
-    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
-    List<String> sortedColumns = indexingConfig.getSortedColumn();
-    if (sortedColumns != null) {
-      Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be 
at most 1 sorted column in the table.");
-      if (sortedColumns.size() == 1) {
-        _sortingColumn = sortedColumns.get(0);
-        FieldSpec fieldSpec = 
_pinotTableSchema.getFieldSpecFor(_sortingColumn);
-        Preconditions.checkState(fieldSpec != null, "Failed to find sorting 
column: {} in the schema", _sortingColumn);
-        Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on 
multi-value column: %s", _sortingColumn);
-        _sortingColumnType = fieldSpec.getDataType();
-        Preconditions
-            .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort 
on %s column: %s", _sortingColumnType,
-                _sortingColumn);
-        LOGGER.info("Sorting the data with column: {} of type: {}", 
_sortingColumn, _sortingColumnType);
-      }
-    }
-  }
-
-  private void fetchResizingConfig() {
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
-      LOGGER.info("Resizing is disabled.");
-      return;
-    }
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      _numOutputFiles = 0;
-      return;
-    }
-    Map<String, String> customConfigsMap = 
tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && 
customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS))
 {
-      _numOutputFiles = 
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
-      Preconditions.checkState(_numOutputFiles > 0, String
-          .format("The value of %s should be positive! Current value: %s",
-              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, 
_numOutputFiles));
-    } else {
-      _numOutputFiles = 0;
-    }
-
-    if (customConfigsMap != null) {
-      int maxNumRecords;
-      if 
(customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE))
 {
-        LOGGER.warn("The config: {} from custom config is deprecated. Use {} 
instead.",
-            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
-            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
-        maxNumRecords = 
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
-      } else if 
(customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE))
 {
-        maxNumRecords =
-            
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
-      } else {
-        return;
-      }
-      // TODO: add a in-built maximum value for this config to avoid having 
too many small files.
-      // E.g. if the config is set to 1 which is smaller than this in-built 
value, the job should be abort from generating too many small files.
-      Preconditions.checkArgument(maxNumRecords > 0,
-          "The value of " + 
InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
-              + " should be positive. Current value: " + maxNumRecords);
-      LOGGER.info("Setting {} to {}", 
InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
-      _maxNumRecordsPerFile = maxNumRecords;
-    }
-  }
-
   @Override
   protected Schema getSchema()
       throws IOException {
@@ -265,15 +118,6 @@ public class HadoopSegmentPreprocessingJob extends 
SegmentPreprocessingJob {
   protected void addAdditionalJobProperties(Job job) {
   }
 
-  private void setTableConfigAndSchema()
-      throws IOException {
-    _tableConfig = getTableConfig();
-    _pinotTableSchema = getSchema();
-
-    Preconditions.checkState(_tableConfig != null, "Table config cannot be 
null.");
-    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be 
null");
-  }
-
   /**
    * Cleans up outputs in preprocessed output directory.
    */
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 6927d5d..01b5064 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..adaef88
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import 
org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopAvroDataPreprocessingHelper extends 
HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class);
+
+  public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Schema avroSchema = (Schema) 
getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+    validateConfigsAgainstSchema(avroSchema);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+    job.setReducerClass(AvroDataPreprocessingReducer.class);
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, 
avroSchema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroJob.setInputKeySchema(job, avroSchema);
+    AvroJob.setMapOutputValueSchema(job, avroSchema);
+    AvroJob.setOutputKeySchema(job, avroSchema);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
similarity index 51%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
index a505d09..5444738 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.hadoop.job.preprocess;
 
 import java.io.IOException;
-import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
@@ -31,67 +30,30 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
-import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.utils.preprocess.TextComparator;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public abstract class DataPreprocessingHelper {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataPreprocessingHelper.class);
+public abstract class HadoopDataPreprocessingHelper implements 
HadoopJobPreparer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class);
 
-  String _partitionColumn;
-  int _numPartitions;
-  String _partitionFunction;
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
 
-  String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private Schema _pinotTableSchema;
-
-  List<Path> _inputDataPaths;
-  Path _sampleRawDataPath;
-  Path _outputPath;
-
-  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
-    _inputDataPaths = inputDataPaths;
-    _sampleRawDataPath = inputDataPaths.get(0);
-    _outputPath = outputPath;
-  }
-
-  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, 
String partitionColumn, int numPartitions,
-      String partitionFunction, String sortingColumn, FieldSpec.DataType 
sortingColumnType, int numOutputFiles,
-      int maxNumRecordsPerFile) {
-    _tableConfig = tableConfig;
-    _pinotTableSchema = tableSchema;
-    _partitionColumn = partitionColumn;
-    _numPartitions = numPartitions;
-    _partitionFunction = partitionFunction;
-
-    _sortingColumn = sortingColumn;
-    _sortingColumnType = sortingColumnType;
-
-    _numOutputFiles = numOutputFiles;
-    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+  public HadoopDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
   }
 
   public Job setUpJob()
@@ -100,21 +62,21 @@ public abstract class DataPreprocessingHelper {
     Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
     Configuration jobConf = job.getConfiguration();
     // Input and output paths.
-    int numInputPaths = _inputDataPaths.size();
+    int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size();
     jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
-    setValidationConfigs(job, _sampleRawDataPath);
-    for (Path inputFile : _inputDataPaths) {
+    _dataPreprocessingHelper.setValidationConfigs(job, 
_dataPreprocessingHelper._sampleRawDataPath);
+    for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) {
       FileInputFormat.addInputPath(job, inputFile);
     }
     setHadoopJobConfigs(job);
 
     // Sorting column
-    if (_sortingColumn != null) {
-      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, 
_sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, 
_sortingColumnType.name());
+    if (_dataPreprocessingHelper._sortingColumn != null) {
+      LOGGER.info("Adding sorting column: {} to job config", 
_dataPreprocessingHelper._sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, 
_dataPreprocessingHelper._sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, 
_dataPreprocessingHelper._sortingColumnType.name());
 
-      switch (_sortingColumnType) {
+      switch (_dataPreprocessingHelper._sortingColumnType) {
         case INT:
           job.setMapOutputKeyClass(IntWritable.class);
           break;
@@ -140,27 +102,27 @@ public abstract class DataPreprocessingHelper {
 
     // Partition column
     int numReduceTasks = 0;
-    if (_partitionColumn != null) {
-      numReduceTasks = _numPartitions;
+    if (_dataPreprocessingHelper._partitionColumn != null) {
+      numReduceTasks = _dataPreprocessingHelper._numPartitions;
       jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
       job.setPartitionerClass(GenericPartitioner.class);
-      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, 
_partitionColumn);
-      if (_partitionFunction != null) {
-        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, 
_partitionFunction);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, 
_dataPreprocessingHelper._partitionColumn);
+      if (_dataPreprocessingHelper._partitionFunction != null) {
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, 
_dataPreprocessingHelper._partitionFunction);
       }
       jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, 
numReduceTasks);
-      job.setPartitionerClass(getPartitioner());
+      job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner());
     } else {
-      if (_numOutputFiles > 0) {
-        numReduceTasks = _numOutputFiles;
+      if (_dataPreprocessingHelper._numOutputFiles > 0) {
+        numReduceTasks = _dataPreprocessingHelper._numOutputFiles;
       } else {
         // default number of input paths
-        numReduceTasks = _inputDataPaths.size();
+        numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size();
       }
     }
     // Maximum number of records per output file
-    jobConf
-        .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 
Integer.toString(_maxNumRecordsPerFile));
+    jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE,
+        Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile));
     // Number of reducers
     LOGGER.info("Number of reduce tasks for pre-processing job: {}", 
numReduceTasks);
     job.setNumReduceTasks(numReduceTasks);
@@ -170,50 +132,10 @@ public abstract class DataPreprocessingHelper {
     return job;
   }
 
-  abstract Class<? extends Partitioner> getPartitioner();
-
-  abstract void setUpMapperReducerConfigs(Job job)
-      throws IOException;
-
-  abstract String getSampleTimeColumnValue(String timeColumnName)
-      throws IOException;
-
-  private void setValidationConfigs(Job job, Path path)
-      throws IOException {
-    SegmentsValidationAndRetentionConfig validationConfig = 
_tableConfig.getValidationConfig();
-
-    // TODO: Serialize and deserialize validation config by creating toJson 
and fromJson
-    // If the use case is an append use case, check that one time unit is 
contained in one file. If there is more than one,
-    // the job should be disabled, as we should not resize for these use 
cases. Therefore, setting the time column name
-    // and value
-    if 
(IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND"))
 {
-      job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
-      String timeColumnName = validationConfig.getTimeColumnName();
-      job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, 
timeColumnName);
-      if (timeColumnName != null) {
-        DateTimeFieldSpec dateTimeFieldSpec = 
_pinotTableSchema.getSpecForTimeColumn(timeColumnName);
-        if (dateTimeFieldSpec != null) {
-          DateTimeFormatSpec formatSpec = new 
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          
job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, 
formatSpec.getColumnUnit().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, 
formatSpec.getTimeFormat().toString());
-          
job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, 
formatSpec.getSDFPattern());
-        }
-      }
-      
job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
-          
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
-
-      String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
-      if (sampleTimeColumnValue != null) {
-        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, 
sampleTimeColumnValue);
-      }
-    }
-  }
-
   private void setHadoopJobConfigs(Job job) {
     job.setJarByClass(HadoopSegmentPreprocessingJob.class);
     job.setJobName(getClass().getName());
-    FileOutputFormat.setOutputPath(job, _outputPath);
+    FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath);
     job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
     // Turn this on to always firstly use class paths that user specifies.
     job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, 
"true");
@@ -225,4 +147,21 @@ public abstract class DataPreprocessingHelper {
       job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, 
hadoopTokenFileLocation);
     }
   }
+
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return _dataPreprocessingHelper.getSchema(inputPathDir);
+  }
+
+  public void validateConfigsAgainstSchema(Object schema) {
+    _dataPreprocessingHelper.validateConfigsAgainstSchema(schema);
+  }
+
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, 
String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType 
sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _dataPreprocessingHelper
+        .registerConfigs(tableConfig, tableSchema, partitionColumn, 
numPartitions, partitionFunction, sortingColumn,
+            sortingColumnType, numOutputFiles, maxNumRecordsPerFile);
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
similarity index 73%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
index 2e91773..faae46d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
@@ -22,15 +22,17 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DataPreprocessingHelperFactory {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+public class HadoopDataPreprocessingHelperFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class);
 
-  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path 
inputPaths, Path outputPath)
+  public static HadoopDataPreprocessingHelper 
generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
       throws IOException {
     final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, 
DataFileUtils.AVRO_FILE_EXTENSION);
     final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, 
DataFileUtils.ORC_FILE_EXTENSION);
@@ -46,10 +48,10 @@ public class DataPreprocessingHelperFactory {
 
     if (numAvroFiles > 0) {
       LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, 
inputPaths);
-      return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+      return new HadoopAvroDataPreprocessingHelper(new 
AvroDataPreprocessingHelper(avroFiles, outputPath));
     } else {
       LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, 
inputPaths);
-      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+      return new HadoopOrcDataPreprocessingHelper(new 
OrcDataPreprocessingHelper(orcFiles, outputPath));
     }
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
similarity index 59%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
index 0596259..c4834cf 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
@@ -16,26 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.hadoop.job.preprocess;
 
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Job;
 
 
-public class HadoopUtils {
-  private HadoopUtils() {
-  }
+public interface HadoopJobPreparer {
 
-  public static final Configuration DEFAULT_CONFIGURATION;
-  public static final FileSystem DEFAULT_FILE_SYSTEM;
-
-  static {
-    DEFAULT_CONFIGURATION = new Configuration();
-    try {
-      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get the default file system", 
e);
-    }
-  }
+  void setUpMapperReducerConfigs(Job job) throws IOException;
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..0d9ee78
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import 
org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ORC flavor of {@link HadoopDataPreprocessingHelper}: configures a Hadoop {@link Job} with the ORC input/output
+ * formats, the ORC preprocessing mapper and reducer, and the ORC schema read from the sample input path.
+ */
+public class HadoopOrcDataPreprocessingHelper extends HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class);
+
+  public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  /**
+   * Reads the ORC schema from the wrapped helper's sample raw data path, validates the registered configs against
+   * it, and then wires the map side and the reduce side of the job.
+   *
+   * @param job Hadoop job to configure
+   * @throws IOException if reading the schema fails
+   */
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Object schema = getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    String schemaString = schema.toString();
+    LOGGER.info("Orc schema is: {}", schemaString);
+    validateConfigsAgainstSchema(schema);
+
+    configureMapSide(job, schemaString);
+    configureReduceSide(job, schemaString);
+  }
+
+  /** Sets the input format, mapper, map output value class and the shuffle value schema. */
+  private static void configureMapSide(Job job, String schemaString) {
+    job.setInputFormatClass(OrcInputFormat.class);
+    job.setMapperClass(OrcDataPreprocessingMapper.class);
+    job.setMapOutputValueClass(OrcValue.class);
+    Configuration conf = job.getConfiguration();
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, schemaString);
+  }
+
+  /** Sets the reducer, output format, output key/value classes and the output schema. */
+  private static void configureReduceSide(Job job, String schemaString) {
+    job.setReducerClass(OrcDataPreprocessingReducer.class);
+    // LazyOutputFormat only creates an output file once a record is written, which avoids empty files.
+    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcStruct.class);
+    Configuration conf = job.getConfiguration();
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, schemaString);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 284b698..13205c5 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -117,5 +117,20 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+<!--      <exclusions>-->
+<!--        <exclusion>-->
+<!--          <groupId>org.eclipse.jetty</groupId>-->
+<!--          <artifactId>jetty-util</artifactId>-->
+<!--        </exclusion>-->
+<!--      </exclusions>-->
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 2e3b023..fe51e00 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -21,11 +21,25 @@ package org.apache.pinot.ingestion.jobs;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
-  private static final Logger _logger = 
LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreprocessingJob.class);
   protected final Path _schemaFile;
   protected final Path _inputSegmentDir;
   protected final Path _preprocessedOutputDir;
@@ -46,6 +60,21 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
   protected final Path _pathToDependencyJar;
   protected boolean _enablePreprocessing;
 
+  protected String _partitionColumn;
+  protected int _numPartitions;
+  protected String _partitionFunction;
+
+  protected String _sortingColumn;
+  protected FieldSpec.DataType _sortingColumnType;
+
+  protected int _numOutputFiles;
+  protected int _maxNumRecordsPerFile;
+
+  protected TableConfig _tableConfig;
+  protected org.apache.pinot.spi.data.Schema _pinotTableSchema;
+
+  protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
   public SegmentPreprocessingJob(final Properties properties) {
     super(properties);
 
@@ -59,13 +88,13 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
     _pathToDependencyJar = 
getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
     _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
 
-    
_logger.info("*********************************************************************");
-    _logger.info("enable.preprocessing: {}", _enablePreprocessing);
-    _logger.info("path.to.input: {}", _inputSegmentDir);
-    _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
-    _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
-    _logger.info("push.locations: {}", _pushLocations);
-    
_logger.info("*********************************************************************");
+    
LOGGER.info("*********************************************************************");
+    LOGGER.info("enable.preprocessing: {}", _enablePreprocessing);
+    LOGGER.info("path.to.input: {}", _inputSegmentDir);
+    LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+    LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar);
+    LOGGER.info("push.locations: {}", _pushLocations);
+    
LOGGER.info("*********************************************************************");
   }
 
   protected abstract void run()
@@ -90,4 +119,131 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
     // TODO: support orc format in the future.
     return fileName.endsWith(".avro");
   }
+
+  /**
+   * Loads the table config and the Pinot table schema via {@code getTableConfig()} and {@code getSchema()} and stores
+   * them in {@code _tableConfig} and {@code _pinotTableSchema}.
+   *
+   * @throws IOException if loading the table config or the schema fails
+   * @throws IllegalStateException if either the table config or the schema is null
+   */
+  protected void setTableConfigAndSchema()
+      throws IOException {
+    _tableConfig = getTableConfig();
+    _pinotTableSchema = getSchema();
+
+    // Both values are fetched before either is checked.
+    Preconditions.checkState(_tableConfig != null, "Table config cannot be 
null.");
+    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be 
null");
+  }
+
+  /**
+   * Resets {@code _preprocessingOperations} to an empty set and fills it from the table's custom config.
+   *
+   * <p>The set stays empty when the table has no custom config or the custom config map is null or empty. Otherwise
+   * the value stored under {@code InternalConfigConstants.PREPROCESS_OPERATIONS} (empty string when absent) is handed
+   * to {@code DataPreprocessingUtils.getOperations} together with the set.
+   */
+  protected void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      return;
+    }
+    Map<String, String> customConfigs = tableCustomConfig.getCustomConfigs();
+    if (customConfigs == null || customConfigs.isEmpty()) {
+      return;
+    }
+    String operations = customConfigs.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+    DataPreprocessingUtils.getOperations(_preprocessingOperations, operations);
+  }
+
+  /**
+   * Reads the partition column, number of partitions and partition function from the table's segment partition
+   * config when the PARTITION operation is enabled.
+   *
+   * <p>Nothing is set when partitioning is disabled, when the segment partition config is null, or when the column
+   * partition map is empty.
+   *
+   * @throws IllegalArgumentException if more than one partition column is configured
+   */
+  protected void fetchPartitioningConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
+    // Partition info lives in the indexing config of the table config.
+    SegmentPartitionConfig partitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (partitionConfig == null) {
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+      return;
+    }
+    Map<String, ColumnPartitionConfig> partitionMap = partitionConfig.getColumnPartitionMap();
+    int numPartitionColumns = partitionMap.size();
+    Preconditions
+        .checkArgument(numPartitionColumns <= 1, "There should be at most 1 partition setting in the table.");
+    if (numPartitionColumns == 0) {
+      return;
+    }
+    String column = partitionMap.keySet().iterator().next();
+    _partitionColumn = column;
+    _numPartitions = partitionConfig.getNumPartitions(column);
+    _partitionFunction = partitionConfig.getFunctionName(column);
+  }
+
+  /**
+   * Resolves the sorting column and its data type when the SORT operation is enabled.
+   *
+   * <p>A column whose field config has index type SORTED takes precedence; otherwise the sorted column of the
+   * indexing config is used. In both cases the column is looked up in the Pinot table schema and validated, so
+   * {@code _sortingColumnType} is always populated together with {@code _sortingColumn}. Nothing is set when sorting
+   * is disabled or no sorted column is configured.
+   *
+   * @throws IllegalArgumentException if more than one sorted column is configured
+   * @throws IllegalStateException if the sorting column is missing from the schema, is a multi-value column, or has
+   *     a data type that cannot be used as a sorted column
+   */
+  protected void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
+    // Fetch sorting info from the field configs of the table config first.
+    List<String> sortingColumns = new ArrayList<>();
+    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+    if (fieldConfigs != null) {
+      for (FieldConfig fieldConfig : fieldConfigs) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+          sortingColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+    if (!sortingColumns.isEmpty()) {
+      Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
+      _sortingColumn = sortingColumns.get(0);
+    } else {
+      // There is no sorted column specified in field configs, try to find sorted column from indexing config.
+      IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+      List<String> sortedColumns = indexingConfig.getSortedColumn();
+      if (sortedColumns == null || sortedColumns.isEmpty()) {
+        return;
+      }
+      Preconditions.checkArgument(sortedColumns.size() == 1, "There should be at most 1 sorted column in the table.");
+      _sortingColumn = sortedColumns.get(0);
+    }
+
+    // Resolve and validate the column type regardless of which config the sorting column came from.
+    FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+    // Guava Preconditions only substitutes %s placeholders, not SLF4J-style {}.
+    Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+    Preconditions
+        .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+    _sortingColumnType = fieldSpec.getDataType();
+    Preconditions
+        .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+            _sortingColumn);
+    LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
+  }
+
+  /**
+   * Reads the number of output files and the maximum number of records per file from the table's custom config when
+   * the RESIZE operation is enabled.
+   *
+   * <p>{@code _numOutputFiles} is taken from {@code InternalConfigConstants.PREPROCESSING_NUM_REDUCERS} and is set to
+   * 0 when the custom config, its map, or that key is absent. {@code _maxNumRecordsPerFile} is taken from the
+   * deprecated {@code PARTITION_MAX_RECORDS_PER_FILE} key if present, otherwise from
+   * {@code PREPROCESSING_MAX_NUM_RECORDS_PER_FILE}; it is left untouched when neither key is present.
+   *
+   * @throws IllegalStateException if the configured number of reducers is not positive
+   * @throws IllegalArgumentException if the configured maximum number of records per file is not positive
+   * @throws NumberFormatException if either configured value cannot be parsed as an integer
+   */
+  protected void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    Map<String, String> customConfigs = tableCustomConfig != null ? tableCustomConfig.getCustomConfigs() : null;
+    if (customConfigs == null) {
+      // Without any custom configs there is nothing else to read.
+      _numOutputFiles = 0;
+      return;
+    }
+
+    if (customConfigs.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = Integer.parseInt(customConfigs.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
+      Preconditions.checkState(_numOutputFiles > 0, String
+          .format("The value of %s should be positive! Current value: %s",
+              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
+    } else {
+      _numOutputFiles = 0;
+    }
+
+    // The deprecated key wins over the new one when both are present.
+    String maxNumRecordsKey;
+    if (customConfigs.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+      LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+          InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+          InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+      maxNumRecordsKey = InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE;
+    } else if (customConfigs.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+      maxNumRecordsKey = InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE;
+    } else {
+      return;
+    }
+    int maxNumRecords = Integer.parseInt(customConfigs.get(maxNumRecordsKey));
+    // TODO: add a built-in bound for this config to avoid having too many small files.
+    // E.g. if the config is set to 1, which is smaller than that bound, the job should abort instead of generating
+    // too many small files.
+    Preconditions.checkArgument(maxNumRecords > 0,
+        "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
+            + " should be positive. Current value: " + maxNumRecords);
+    LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+    _maxNumRecordsPerFile = maxNumRecords;
+  }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
similarity index 74%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
index 9e5f5f2..3be4768 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -26,23 +26,13 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
-import 
org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import 
org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,30 +51,19 @@ public class AvroDataPreprocessingHelper extends 
DataPreprocessingHelper {
   }
 
   @Override
-  public void setUpMapperReducerConfigs(Job job)
+  public Object getSchema(Path inputPathDir)
       throws IOException {
-    Schema avroSchema = getAvroSchema(_sampleRawDataPath);
-    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
-    validateConfigsAgainstSchema(avroSchema);
-
-    job.setInputFormatClass(AvroKeyInputFormat.class);
-    job.setMapperClass(AvroDataPreprocessingMapper.class);
-
-    job.setReducerClass(AvroDataPreprocessingReducer.class);
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, 
avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
+    return getAvroSchema(inputPathDir);
+  }
 
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
+  @Override
+  public void validateConfigsAgainstSchema(Object schema) {
+    Schema avroSchema = (Schema) schema;
+    validateConfigsAgainstSchema(avroSchema);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     String sampleTimeColumnValue;
     try (DataFileStream<GenericRecord> dataStreamReader = 
getAvroReader(_sampleRawDataPath)) {
@@ -99,7 +78,7 @@ public class AvroDataPreprocessingHelper extends 
DataPreprocessingHelper {
    * @return Input schema
    * @throws IOException exception when accessing to IO
    */
-  private Schema getAvroSchema(Path inputPathDir)
+  protected Schema getAvroSchema(Path inputPathDir)
       throws IOException {
     Schema avroSchema = null;
     for (FileStatus fileStatus : 
HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..cb0a738
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for data preprocessing helpers. It holds the table-level preprocessing settings and the input/output
+ * paths, and declares the hooks (partitioner, schema reading, schema validation) that concrete helpers implement.
+ */
+public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+  // Partitioning settings.
+  public String _partitionColumn;
+  public int _numPartitions;
+  public String _partitionFunction;
+
+  // Sorting settings.
+  public String _sortingColumn;
+  public FieldSpec.DataType _sortingColumnType;
+
+  // Resizing settings.
+  public int _numOutputFiles;
+  public int _maxNumRecordsPerFile;
+
+  public TableConfig _tableConfig;
+  public Schema _pinotTableSchema;
+
+  public List<Path> _inputDataPaths;
+  public Path _sampleRawDataPath;
+  public Path _outputPath;
+
+  /**
+   * Stores the input and output paths; the first input path is kept as the sample raw data path.
+   *
+   * @param inputDataPaths input data paths; the first element is read, so the list must not be empty
+   * @param outputPath output path of the preprocessing job
+   */
+  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    _inputDataPaths = inputDataPaths;
+    _outputPath = outputPath;
+    _sampleRawDataPath = inputDataPaths.get(0);
+  }
+
+  /**
+   * Stores the table config, table schema and the partitioning, sorting and resizing settings in the corresponding
+   * fields of this helper.
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _tableConfig = tableConfig;
+    _pinotTableSchema = tableSchema;
+
+    _partitionColumn = partitionColumn;
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+
+    _sortingColumn = sortingColumn;
+    _sortingColumnType = sortingColumnType;
+
+    _numOutputFiles = numOutputFiles;
+    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+  }
+
+  /** Returns the partitioner class to use for the input format handled by this helper. */
+  public abstract Class<? extends Partitioner> getPartitioner();
+
+  /** Reads the input schema for the given path; the concrete schema type is defined by the implementation. */
+  public abstract Object getSchema(Path inputPathDir)
+      throws IOException;
+
+  /** Validates the registered configs against a schema object previously obtained from {@link #getSchema(Path)}. */
+  public abstract void validateConfigsAgainstSchema(Object schema);
+
+  /**
+   * Writes the validation-related settings into the job configuration when the table's batch segment ingestion type
+   * is APPEND; does nothing to the job otherwise.
+   *
+   * @param job job whose configuration is populated
+   * @param path not read by this method
+   * @throws IOException if extracting the sample time column value fails
+   */
+  public void setValidationConfigs(Job job, Path path)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // If the use case is an append use case, check that one time unit is contained in one file. If there is more
+    // than one, the job should be disabled, as we should not resize for these use cases. Therefore, setting the time
+    // column name and value.
+    if (!IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
+      return;
+    }
+
+    job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
+    String timeColumn = validationConfig.getTimeColumnName();
+    // NOTE(review): the time column name is written before the null check below - confirm whether
+    // Configuration#set accepts a null value here.
+    job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumn);
+    if (timeColumn != null) {
+      setTimeFormatConfigs(job, timeColumn);
+    }
+    job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+        IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+    String sampleValue = getSampleTimeColumnValue(timeColumn);
+    if (sampleValue != null) {
+      job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleValue);
+    }
+  }
+
+  /** Writes the time unit, time format and SDF pattern of the time column, if the schema has a spec for it. */
+  private void setTimeFormatConfigs(Job job, String timeColumnName) {
+    DateTimeFieldSpec timeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+    if (timeFieldSpec == null) {
+      return;
+    }
+    DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(timeFieldSpec.getFormat());
+    job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+    job.getConfiguration()
+        .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+    job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
similarity index 95%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
index 2e91773..1cb07e3 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
similarity index 82%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
index aec0bb0..f7f287b 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -30,23 +29,13 @@ import 
org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcValue;
-import org.apache.orc.mapreduce.OrcInputFormat;
-import org.apache.orc.mapreduce.OrcOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
-import 
org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import 
org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.utils.StringUtils;
 import org.slf4j.Logger;
@@ -61,33 +50,24 @@ public class OrcDataPreprocessingHelper extends 
DataPreprocessingHelper {
   }
 
   @Override
-  Class<? extends Partitioner> getPartitioner() {
+  public Class<? extends Partitioner> getPartitioner() {
     return OrcDataPreprocessingPartitioner.class;
   }
 
   @Override
-  void setUpMapperReducerConfigs(Job job) {
-    TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath);
-    String orcSchemaString = orcSchema.toString();
-    LOGGER.info("Orc schema is: {}", orcSchemaString);
-    validateConfigsAgainstSchema(orcSchema);
-
-    job.setInputFormatClass(OrcInputFormat.class);
-    job.setMapperClass(OrcDataPreprocessingMapper.class);
-    job.setMapOutputValueClass(OrcValue.class);
-    Configuration jobConf = job.getConfiguration();
-    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return getOrcSchema(inputPathDir);
+  }
 
-    job.setReducerClass(OrcDataPreprocessingReducer.class);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(OrcStruct.class);
-    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+  @Override
+  public void validateConfigsAgainstSchema(Object schema) {
+    TypeDescription orcSchema = (TypeDescription) schema;
+    validateConfigsAgainstSchema(orcSchema);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     try (Reader reader = OrcFile
         .createReader(_sampleRawDataPath, 
OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
similarity index 59%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
index 0596259..12efa99 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
@@ -16,26 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 
-public class HadoopUtils {
-  private HadoopUtils() {
-  }
+public interface SampleTimeColumnExtractable {
 
-  public static final Configuration DEFAULT_CONFIGURATION;
-  public static final FileSystem DEFAULT_FILE_SYSTEM;
-
-  static {
-    DEFAULT_CONFIGURATION = new Configuration();
-    try {
-      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get the default file system", 
e);
-    }
-  }
+  String getSampleTimeColumnValue(String timeColumnName) throws IOException;
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
index 6278e8e..759977d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
similarity index 93%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
index d7d0694..d140765 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
index 3d3fcec..2bc6739 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -30,8 +30,8 @@ import org.apache.avro.mapreduce.AvroJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
index 74799c7..d833964 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericRecord;
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
similarity index 88%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
index 7ca22cf..f9c1467 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
@@ -27,9 +27,9 @@ import 
org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.NUM_PARTITIONS_CONFIG;
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_COLUMN_CONFIG;
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_FUNCTION_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.NUM_PARTITIONS_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_COLUMN_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_FUNCTION_CONFIG;
 
 
 public class GenericPartitioner<T> extends Partitioner<T, 
AvroValue<GenericRecord>> implements Configurable {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
index bef2cef..d9b509d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
index 0ba57db..1826bdc 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
index 5fcbf10..4f236a4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.avro.generic.GenericRecord;
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
index a3387a2..10d3f10 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
index 4cc6f75..8450179 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils;
 
 import java.util.Set;
 import org.apache.hadoop.io.DoubleWritable;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
index 3701db2..e85a3e8 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job;
+package org.apache.pinot.ingestion.utils;
 
 /**
  * Internal-only constants for Hadoop MapReduce jobs. These constants are 
propagated across different segment creation
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
similarity index 97%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
index 58e1c1d..a6e4ca7 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
similarity index 96%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
index 0596259..e4cdf5e 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
index dcfc3b5..09eb218 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.io.BooleanWritable;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
index 65f1222..05a437d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
index 13db9ea..e302399 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
@@ -106,6 +106,10 @@
           <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-common</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.esotericsoftware</groupId>
+          <artifactId>kryo-shaded</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -282,6 +286,10 @@
       </exclusions>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+    </dependency>
 
     <!--Test-->
     <dependency>
@@ -379,6 +387,14 @@
           <groupId>commons-pool</groupId>
           <artifactId>commons-pool</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-mapred</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
       </exclusions>
       <scope>test</scope>
     </dependency>
@@ -463,8 +479,16 @@
           <groupId>org.apache.orc</groupId>
           <artifactId>orc-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.orc</groupId>
+          <artifactId>orc-mapreduce</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop</artifactId>
+        </exclusion>
       </exclusions>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -480,7 +504,7 @@
           <artifactId>objenesis</artifactId>
         </exclusion>
       </exclusions>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
new file mode 100644
index 0000000..39ebb58
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper;
+import 
org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory;
+import org.apache.pinot.spark.utils.HadoopUtils;
+import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spark flavour of the segment preprocessing job. It partitions, sorts and resizes the raw input files
+ * (Avro or ORC), so the files it writes are partitioned, sorted and resized.
+ *
+ * Required job property:
+ * * enable.preprocessing: false by default. Enables preprocessing job.
+ */
+public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class);
+
+  public SparkSegmentPreprocessingJob(Properties properties) {
+    super(properties);
+  }
+
+  /**
+   * Runs the preprocessing pipeline when it is enabled; otherwise logs and returns without doing anything.
+   */
+  @Override
+  protected void run()
+      throws Exception {
+    if (!_enablePreprocessing) {
+      LOGGER.info("Pre-processing job is disabled.");
+      return;
+    }
+    LOGGER.info("Starting {}", getClass().getSimpleName());
+
+    // Resolve table config/schema and every preprocessing-related setting.
+    setTableConfigAndSchema();
+    fetchPreProcessingOperations();
+    fetchPartitioningConfig();
+    fetchSortingConfig();
+    fetchResizingConfig();
+
+    // Start from a clean output directory.
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+    SparkDataPreprocessingHelper helper =
+        SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+    helper.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+        _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile);
+
+    // Obtain the Spark context first so the dependency jars are registered before the session is built.
+    SparkContext context = SparkContext.getOrCreate();
+    addDepsJarToDistributedCache(JavaSparkContext.fromSparkContext(context));
+
+    String appName = SparkSegmentPreprocessingJob.class.getSimpleName();
+    SparkSession session = SparkSession.builder().appName(appName).getOrCreate();
+
+    helper.setUpAndExecuteJob(session);
+  }
+
+  /**
+   * Recursively deletes the preprocessed output directory if it already exists.
+   *
+   * @param preprocessedOutputDir directory to remove
+   * @throws IOException if the file system lookup or deletion fails
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
+      throws IOException {
+    if (!HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      return;
+    }
+    LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+    HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+  }
+
+  /**
+   * Registers the dependency jars with the given Spark context; does nothing when no dependency jar path is set.
+   *
+   * @param sparkContext context that receives the jars
+   * @throws IOException if the jars cannot be accessed
+   */
+  protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+      throws IOException {
+    if (_pathToDependencyJar == null) {
+      return;
+    }
+    PinotSparkJobPreparationHelper
+        .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, _pathToDependencyJar);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
similarity index 58%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
index 0596259..23f17e8 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
@@ -16,26 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
 
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
 
 
-public class HadoopUtils {
-  private HadoopUtils() {
+public class SparkAvroDataPreprocessingHelper extends 
SparkDataPreprocessingHelper {
+  public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
   }
 
-  public static final Configuration DEFAULT_CONFIGURATION;
-  public static final FileSystem DEFAULT_FILE_SYSTEM;
-
-  static {
-    DEFAULT_CONFIGURATION = new Configuration();
-    try {
-      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get the default file system", 
e);
-    }
+  @Override
+  public String getDataFormat() {
+    return "avro";
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
new file mode 100644
index 0000000..7a7a564
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import com.google.common.collect.Ordering;
+import java.io.Serializable;
+
+
+/**
+ * Orders {@link SparkDataPreprocessingJobKey}s by the value returned from
+ * {@link SparkDataPreprocessingJobKey#getSortedColumn()}.
+ *
+ * Null values sort before any non-null value and two nulls are equal, so the ordering is symmetric
+ * regardless of which side holds the null.
+ */
+public class SparkDataPreprocessingComparator extends Ordering<Object> implements Serializable {
+  /**
+   * Compares the sorted-column values of two job keys.
+   *
+   * @param left a {@link SparkDataPreprocessingJobKey}
+   * @param right a {@link SparkDataPreprocessingJobKey}
+   * @return negative, zero or positive if left sorts before, equal to or after right
+   * @throws RuntimeException if the left value is not an Integer, Long, Float, Double, Short or String
+   */
+  @Override
+  public int compare(Object left, Object right) {
+    Object value1 = ((SparkDataPreprocessingJobKey) left).getSortedColumn();
+    Object value2 = ((SparkDataPreprocessingJobKey) right).getSortedColumn();
+    if (value1 == null || value2 == null) {
+      // Nulls first. Handling both sides here avoids unboxing a null right-hand value below and keeps
+      // compare(a, b) consistent with compare(b, a).
+      if (value1 == value2) {
+        return 0;
+      }
+      return value1 == null ? -1 : 1;
+    }
+    if (value1 instanceof Integer) {
+      return Integer.compare((int) value1, (int) value2);
+    } else if (value1 instanceof Long) {
+      return Long.compare((long) value1, (long) value2);
+    } else if (value1 instanceof Float) {
+      return Float.compare((float) value1, (float) value2);
+    } else if (value1 instanceof Double) {
+      return Double.compare((double) value1, (double) value2);
+    } else if (value1 instanceof Short) {
+      return Short.compare((short) value1, (short) value2);
+    } else if (value1 instanceof String) {
+      return ((String) value1).compareTo((String) value2);
+    }
+    throw new RuntimeException("Unsupported Data type: " + value1.getClass().getName());
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
new file mode 100644
index 0000000..6b93f7e
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import 
org.apache.pinot.ingestion.preprocess.partitioners.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+
+/**
+ * Base class for the Spark preprocessing helpers. It wraps a {@link DataPreprocessingHelper} that holds the
+ * input paths, output path and partitioning/sorting settings, and runs the repartition-and-sort job on Spark.
+ * Subclasses supply the Spark data source format through {@link #getDataFormat()}.
+ */
+public abstract class SparkDataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelper.class);
+
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+  public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
+  }
+
+  /**
+   * Forwards all preprocessing settings to the wrapped {@link DataPreprocessingHelper}.
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _dataPreprocessingHelper
+        .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn,
+            sortingColumnType, numOutputFiles, maxNumRecordsPerFile);
+  }
+
+  /**
+   * Loads the input files into a data frame, keys every row by (partition id, sorting value), repartitions and
+   * sorts within partitions, and writes the rows to the output path as text.
+   *
+   * @param sparkSession session used to read the input data
+   */
+  public void setUpAndExecuteJob(SparkSession sparkSession) {
+    // Copy the column names into locals so the lambda below captures plain values instead of `this`;
+    // this class does not implement Serializable.
+    final String partitionColumn = _dataPreprocessingHelper._partitionColumn;
+    final String sortingColumn = _dataPreprocessingHelper._sortingColumn;
+
+    // Read data into data frame.
+    Dataset<Row> dataFrame =
+        sparkSession.read().format(getDataFormat()).load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths));
+    JavaRDD<Row> javaRDD = dataFrame.javaRDD();
+
+    // Find positions of partition column and sorting column if specified.
+    StructType schema = dataFrame.schema();
+    StructField[] fields = schema.fields();
+    int partitionColumnPosition = -1;
+    int sortingColumnPosition = -1;
+    PartitionFunction partitionFunction = null;
+    // Strictly less than: valid indices are 0 .. fields.length - 1.
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      if (partitionColumn != null && partitionColumn.equalsIgnoreCase(field.name())) {
+        partitionColumnPosition = i;
+        partitionFunction = PartitionFunctionFactory
+            .getPartitionFunction(_dataPreprocessingHelper._partitionFunction, _dataPreprocessingHelper._numPartitions);
+      }
+      if (sortingColumn != null && sortingColumn.equalsIgnoreCase(field.name())) {
+        sortingColumnPosition = i;
+      }
+    }
+
+    // Without a partition column in the data, fall back to the configured number of output files, or to the
+    // partition count of the input RDD when that is not set either.
+    int numPartitions;
+    if (partitionColumnPosition == -1) {
+      if (_dataPreprocessingHelper._numOutputFiles > 0) {
+        numPartitions = _dataPreprocessingHelper._numOutputFiles;
+      } else {
+        numPartitions = javaRDD.getNumPartitions();
+      }
+    } else {
+      numPartitions = _dataPreprocessingHelper._numPartitions;
+    }
+    final int finalNumPartitions = numPartitions;
+    final int finalPartitionColumnPosition = partitionColumnPosition;
+    final int finalSortingColumnPosition = sortingColumnPosition;
+    LOGGER.info("Number of partitions: {}", finalNumPartitions);
+    LOGGER.info("Position of partition column (if specified): {}", finalPartitionColumnPosition);
+    LOGGER.info("Position of sorting column (if specified): {}", finalSortingColumnPosition);
+    SparkDataPreprocessingPartitioner sparkPartitioner =
+        new SparkDataPreprocessingPartitioner(finalNumPartitions, partitionFunction);
+
+    // Convert to java pair rdd.
+    JavaPairRDD<Object, Row> pairRDD = javaRDD.mapToPair((PairFunction<Row, Object, Row>) row -> {
+      Object partitionColumnValue = null;
+      Object sortingColumnValue = null;
+
+      if (partitionColumn != null) {
+        partitionColumnValue = row.get(finalPartitionColumnPosition);
+      }
+      int partitionId = sparkPartitioner.generatePartitionId(partitionColumnValue);
+      if (sortingColumn != null) {
+        sortingColumnValue = row.get(finalSortingColumnPosition);
+      }
+      return new Tuple2<>(new SparkDataPreprocessingJobKey(partitionId, sortingColumnValue), row);
+    });
+
+    // Repartition and sort within partitions.
+    Comparator<Object> comparator = new SparkDataPreprocessingComparator();
+    JavaPairRDD<Object, Row> partitionedSortedPairRDD =
+        pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator);
+
+    // TODO: support preprocessing.max.num.records.per.file before writing back to storage
+    // Write to output path.
+    partitionedSortedPairRDD.values().saveAsTextFile(_dataPreprocessingHelper._outputPath.toString());
+  }
+
+  /**
+   * Converts the given paths to their string form and wraps them into a Scala Seq.
+   */
+  private Seq<String> convertPathsToStrings(List<Path> paths) {
+    List<String> stringList = new ArrayList<>();
+    for (Path path : paths) {
+      stringList.add(path.toString());
+    }
+    return toSeq(stringList);
+  }
+
+  /**
+   * Helper to wrap a Java collection into a Scala Seq
+   *
+   * @param collection java collection
+   * @param <T> collection item type
+   * @return Scala Seq of type T
+   */
+  public static <T> Seq<T> toSeq(Collection<T> collection) {
+    return JavaConverters.asScalaBufferConverter(new ArrayList<>(collection)).asScala().toSeq();
+  }
+
+  /**
+   * @return the Spark data source format name passed to {@code DataFrameReader.format}
+   */
+  public abstract String getDataFormat();
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
similarity index 71%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
index 2e91773..f902bbe 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
@@ -16,21 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DataPreprocessingHelperFactory {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+public class SparkDataPreprocessingHelperFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class);
 
-  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path 
inputPaths, Path outputPath)
+  public static SparkDataPreprocessingHelper 
generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
       throws IOException {
     final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, 
DataFileUtils.AVRO_FILE_EXTENSION);
     final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, 
DataFileUtils.ORC_FILE_EXTENSION);
@@ -46,10 +48,10 @@ public class DataPreprocessingHelperFactory {
 
     if (numAvroFiles > 0) {
       LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, 
inputPaths);
-      return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+      return new SparkAvroDataPreprocessingHelper(new 
AvroDataPreprocessingHelper(avroFiles, outputPath));
     } else {
       LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, 
inputPaths);
-      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+      return new SparkOrcDataPreprocessingHelper(new 
OrcDataPreprocessingHelper(orcFiles, outputPath));
     }
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
similarity index 58%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
index 0596259..d4e76a2 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java
@@ -16,26 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
 
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+public class SparkDataPreprocessingJobKey {
+  private final Object _partitionId;
+  private final Object _sortedColumn;
 
-
-public class HadoopUtils {
-  private HadoopUtils() {
+  public SparkDataPreprocessingJobKey(Object partitionId, Object sortedColumn) 
{
+    _partitionId = partitionId;
+    _sortedColumn = sortedColumn;
   }
 
-  public static final Configuration DEFAULT_CONFIGURATION;
-  public static final FileSystem DEFAULT_FILE_SYSTEM;
+  public Object getPartitionId() {
+    return _partitionId;
+  }
 
-  static {
-    DEFAULT_CONFIGURATION = new Configuration();
-    try {
-      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get the default file system", 
e);
-    }
+  public Object getSortedColumn() {
+    return _sortedColumn;
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..60a180a
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.spark.Partitioner;
+
+
+/**
+ * Spark partitioner for preprocessing. Partition ids are computed up front by
+ * {@link #generatePartitionId(Object)} and stored in a {@link SparkDataPreprocessingJobKey};
+ * {@link #getPartition(Object)} only reads the id back from the key.
+ */
+public class SparkDataPreprocessingPartitioner extends Partitioner {
+  private final int _numPartitions;
+  private final PartitionFunction _partitionFunction;
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
+  public SparkDataPreprocessingPartitioner(int numPartitions, PartitionFunction partitionFunction) {
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+  }
+
+  @Override
+  public int numPartitions() {
+    return _numPartitions;
+  }
+
+  /**
+   * Returns the partition id carried by the given {@link SparkDataPreprocessingJobKey}.
+   */
+  @Override
+  public int getPartition(Object key) {
+    SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key;
+    return (int) jobKey.getPartitionId();
+  }
+
+  /**
+   * Computes the partition id for a partition-column value.
+   *
+   * @param key partition-column value, or null when the row has none
+   * @return an id in [0, numPartitions) for null keys; otherwise whatever the partition function returns
+   */
+  public int generatePartitionId(Object key) {
+    if (key == null) {
+      // Need to distribute evenly for data with the default partition key value.
+      // We may want to partition and sort on a non-primary key.
+      // floorMod keeps the id non-negative after the counter wraps around; Math.abs(Integer.MIN_VALUE) is
+      // still negative and would yield an invalid partition id.
+      return Math.floorMod(_counter.getAndIncrement(), _numPartitions);
+    } else {
+      return _partitionFunction.getPartition(key);
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
similarity index 58%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
index 0596259..3f84aad 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
@@ -16,26 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.jobs.preprocess;
 
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
 
 
-public class HadoopUtils {
-  private HadoopUtils() {
+public class SparkOrcDataPreprocessingHelper extends 
SparkDataPreprocessingHelper {
+  public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
   }
 
-  public static final Configuration DEFAULT_CONFIGURATION;
-  public static final FileSystem DEFAULT_FILE_SYSTEM;
-
-  static {
-    DEFAULT_CONFIGURATION = new Configuration();
-    try {
-      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to get the default file system", 
e);
-    }
+  @Override
+  public String getDataFormat() {
+    return "orc";
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
index 0596259..8ee89cf 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.utils;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to