This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch support-spark-preprocessing in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 3bdd50cee7c76e5d03146bca7ae668db0365a4cf Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Thu Jul 22 16:05:03 2021 -0700 Support data preprocessing in Spark framework --- .../hadoop/job/HadoopSegmentPreprocessingJob.java | 166 +------------------- .../hadoop/job/mappers/SegmentCreationMapper.java | 2 +- .../HadoopAvroDataPreprocessingHelper.java | 67 ++++++++ ...per.java => HadoopDataPreprocessingHelper.java} | 155 ++++++------------- ...a => HadoopDataPreprocessingHelperFactory.java} | 14 +- .../preprocess/HadoopJobPreparer.java} | 21 +-- .../HadoopOrcDataPreprocessingHelper.java | 66 ++++++++ .../v0_deprecated/pinot-ingestion-common/pom.xml | 15 ++ .../ingestion/jobs/SegmentPreprocessingJob.java | 172 ++++++++++++++++++++- .../preprocess/AvroDataPreprocessingHelper.java | 45 ++---- .../preprocess/DataPreprocessingHelper.java | 118 ++++++++++++++ .../preprocess/DataPreprocessingHelperFactory.java | 4 +- .../preprocess/OrcDataPreprocessingHelper.java | 46 ++---- .../preprocess/SampleTimeColumnExtractable.java} | 20 +-- .../mappers/AvroDataPreprocessingMapper.java | 6 +- .../mappers/OrcDataPreprocessingMapper.java | 8 +- .../mappers/SegmentPreprocessingMapper.java | 4 +- .../AvroDataPreprocessingPartitioner.java | 4 +- .../partitioners/GenericPartitioner.java | 8 +- .../OrcDataPreprocessingPartitioner.java | 6 +- .../partitioners/PartitionFunctionFactory.java | 2 +- .../reducers/AvroDataPreprocessingReducer.java | 4 +- .../reducers/OrcDataPreprocessingReducer.java | 4 +- .../ingestion/utils}/DataPreprocessingUtils.java | 2 +- .../ingestion/utils}/InternalConfigConstants.java | 2 +- .../ingestion}/utils/preprocess/DataFileUtils.java | 2 +- .../ingestion}/utils/preprocess/HadoopUtils.java | 2 +- .../ingestion}/utils/preprocess/OrcUtils.java | 2 +- .../utils/preprocess/TextComparator.java | 2 +- .../v0_deprecated/pinot-spark/pom.xml | 28 +++- .../spark/jobs/SparkSegmentPreprocessingJob.java | 101 ++++++++++++ 
.../SparkAvroDataPreprocessingHelper.java} | 24 +-- .../SparkDataPreprocessingComparator.java | 48 ++++++ .../preprocess/SparkDataPreprocessingHelper.java | 153 ++++++++++++++++++ .../SparkDataPreprocessingHelperFactory.java} | 16 +- .../preprocess/SparkDataPreprocessingJobKey.java} | 28 ++-- .../SparkDataPreprocessingPartitioner.java | 56 +++++++ .../SparkOrcDataPreprocessingHelper.java} | 24 +-- .../org/apache/pinot/spark/utils}/HadoopUtils.java | 2 +- 39 files changed, 976 insertions(+), 473 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java index 81a9902..e17acd2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java @@ -18,33 +18,19 @@ */ package org.apache.pinot.hadoop.job; -import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory; import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper; -import 
org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; -import org.apache.pinot.spi.config.table.ColumnPartitionConfig; -import org.apache.pinot.spi.config.table.FieldConfig; -import org.apache.pinot.spi.config.table.IndexingConfig; -import org.apache.pinot.spi.config.table.SegmentPartitionConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableCustomConfig; -import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,21 +45,6 @@ import org.slf4j.LoggerFactory; public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); - private String _partitionColumn; - private int _numPartitions; - private String _partitionFunction; - - private String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - - private int _numOutputFiles; - private int _maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private org.apache.pinot.spi.data.Schema _pinotTableSchema; - - private Set<DataPreprocessingUtils.Operation> _preprocessingOperations; - public HadoopSegmentPreprocessingJob(final Properties properties) { super(properties); } @@ -96,8 +67,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { // Cleans up preprocessed output dir if exists cleanUpPreprocessedOutputs(_preprocessedOutputDir); - DataPreprocessingHelper dataPreprocessingHelper = - DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + 
HadoopDataPreprocessingHelper dataPreprocessingHelper = + HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); dataPreprocessingHelper .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile); @@ -126,124 +97,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); } - private void fetchPreProcessingOperations() { - _preprocessingOperations = new HashSet<>(); - TableCustomConfig customConfig = _tableConfig.getCustomConfig(); - if (customConfig != null) { - Map<String, String> customConfigMap = customConfig.getCustomConfigs(); - if (customConfigMap != null && !customConfigMap.isEmpty()) { - String preprocessingOperationsString = - customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); - DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); - } - } - } - - private void fetchPartitioningConfig() { - // Fetch partition info from table config. 
- if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { - LOGGER.info("Partitioning is disabled."); - return; - } - SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); - if (segmentPartitionConfig != null) { - Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); - Preconditions - .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); - if (columnPartitionMap.size() == 1) { - _partitionColumn = columnPartitionMap.keySet().iterator().next(); - _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); - _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); - } - } else { - LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); - } - } - - private void fetchSortingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { - LOGGER.info("Sorting is disabled."); - return; - } - // Fetch sorting info from table config first. - List<String> sortingColumns = new ArrayList<>(); - List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList(); - if (fieldConfigs != null && !fieldConfigs.isEmpty()) { - for (FieldConfig fieldConfig : fieldConfigs) { - if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { - sortingColumns.add(fieldConfig.getName()); - } - } - } - if (!sortingColumns.isEmpty()) { - Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table."); - _sortingColumn = sortingColumns.get(0); - return; - } - - // There is no sorted column specified in field configs, try to find sorted column from indexing config. 
- IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); - List<String> sortedColumns = indexingConfig.getSortedColumn(); - if (sortedColumns != null) { - Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); - if (sortedColumns.size() == 1) { - _sortingColumn = sortedColumns.get(0); - FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); - Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); - Preconditions - .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); - _sortingColumnType = fieldSpec.getDataType(); - Preconditions - .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, - _sortingColumn); - LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); - } - } - } - - private void fetchResizingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { - LOGGER.info("Resizing is disabled."); - return; - } - TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); - if (tableCustomConfig == null) { - _numOutputFiles = 0; - return; - } - Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { - _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); - Preconditions.checkState(_numOutputFiles > 0, String - .format("The value of %s should be positive! 
Current value: %s", - InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); - } else { - _numOutputFiles = 0; - } - - if (customConfigsMap != null) { - int maxNumRecords; - if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { - LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", - InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, - InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); - maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); - } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { - maxNumRecords = - Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); - } else { - return; - } - // TODO: add a in-built maximum value for this config to avoid having too many small files. - // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from generating too many small files. - Preconditions.checkArgument(maxNumRecords > 0, - "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE - + " should be positive. 
Current value: " + maxNumRecords); - LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); - _maxNumRecordsPerFile = maxNumRecords; - } - } - @Override protected Schema getSchema() throws IOException { @@ -265,15 +118,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { protected void addAdditionalJobProperties(Job job) { } - private void setTableConfigAndSchema() - throws IOException { - _tableConfig = getTableConfig(); - _pinotTableSchema = getSchema(); - - Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); - Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); - } - /** * Cleans up outputs in preprocessed output directory. */ diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 6927d5d..01b5064 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -35,9 +35,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.hadoop.job.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentCreationJob; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; import 
org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig; import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java new file mode 100644 index 0000000..adaef88 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.avro.mapreduce.AvroMultipleOutputs; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopAvroDataPreprocessingHelper extends HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class); + + public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Schema avroSchema = (Schema) getSchema(_dataPreprocessingHelper._sampleRawDataPath); + LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); + validateConfigsAgainstSchema(avroSchema); + + job.setInputFormatClass(AvroKeyInputFormat.class); + job.setMapperClass(AvroDataPreprocessingMapper.class); + + job.setReducerClass(AvroDataPreprocessingReducer.class); + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. 
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroJob.setInputKeySchema(job, avroSchema); + AvroJob.setMapOutputValueSchema(job, avroSchema); + AvroJob.setOutputKeySchema(job, avroSchema); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java similarity index 51% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java index a505d09..5444738 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java @@ -19,7 +19,6 @@ package org.apache.pinot.hadoop.job.preprocess; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; @@ -31,67 +30,30 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; -import org.apache.pinot.hadoop.utils.preprocess.TextComparator; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.utils.preprocess.TextComparator; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DataPreprocessingHelper { - private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); +public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class); - String _partitionColumn; - int _numPartitions; - String _partitionFunction; + protected DataPreprocessingHelper _dataPreprocessingHelper; - String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - - private int _numOutputFiles; - private int _maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private Schema _pinotTableSchema; - - List<Path> _inputDataPaths; - Path _sampleRawDataPath; - Path _outputPath; - - public DataPreprocessingHelper(List<Path> inputDataPaths, Path 
outputPath) { - _inputDataPaths = inputDataPaths; - _sampleRawDataPath = inputDataPaths.get(0); - _outputPath = outputPath; - } - - public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, - String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, - int maxNumRecordsPerFile) { - _tableConfig = tableConfig; - _pinotTableSchema = tableSchema; - _partitionColumn = partitionColumn; - _numPartitions = numPartitions; - _partitionFunction = partitionFunction; - - _sortingColumn = sortingColumn; - _sortingColumnType = sortingColumnType; - - _numOutputFiles = numOutputFiles; - _maxNumRecordsPerFile = maxNumRecordsPerFile; + public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + _dataPreprocessingHelper = dataPreprocessingHelper; } public Job setUpJob() @@ -100,21 +62,21 @@ public abstract class DataPreprocessingHelper { Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); Configuration jobConf = job.getConfiguration(); // Input and output paths. 
- int numInputPaths = _inputDataPaths.size(); + int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size(); jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); - setValidationConfigs(job, _sampleRawDataPath); - for (Path inputFile : _inputDataPaths) { + _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath); + for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) { FileInputFormat.addInputPath(job, inputFile); } setHadoopJobConfigs(job); // Sorting column - if (_sortingColumn != null) { - LOGGER.info("Adding sorting column: {} to job config", _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name()); + if (_dataPreprocessingHelper._sortingColumn != null) { + LOGGER.info("Adding sorting column: {} to job config", _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _dataPreprocessingHelper._sortingColumnType.name()); - switch (_sortingColumnType) { + switch (_dataPreprocessingHelper._sortingColumnType) { case INT: job.setMapOutputKeyClass(IntWritable.class); break; @@ -140,27 +102,27 @@ public abstract class DataPreprocessingHelper { // Partition column int numReduceTasks = 0; - if (_partitionColumn != null) { - numReduceTasks = _numPartitions; + if (_dataPreprocessingHelper._partitionColumn != null) { + numReduceTasks = _dataPreprocessingHelper._numPartitions; jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); job.setPartitionerClass(GenericPartitioner.class); - jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); - if (_partitionFunction != null) { - jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, 
_dataPreprocessingHelper._partitionColumn); + if (_dataPreprocessingHelper._partitionFunction != null) { + jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction); } jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); - job.setPartitionerClass(getPartitioner()); + job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner()); } else { - if (_numOutputFiles > 0) { - numReduceTasks = _numOutputFiles; + if (_dataPreprocessingHelper._numOutputFiles > 0) { + numReduceTasks = _dataPreprocessingHelper._numOutputFiles; } else { // default number of input paths - numReduceTasks = _inputDataPaths.size(); + numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size(); } } // Maximum number of records per output file - jobConf - .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile)); + jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, + Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile)); // Number of reducers LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); job.setNumReduceTasks(numReduceTasks); @@ -170,50 +132,10 @@ public abstract class DataPreprocessingHelper { return job; } - abstract Class<? extends Partitioner> getPartitioner(); - - abstract void setUpMapperReducerConfigs(Job job) - throws IOException; - - abstract String getSampleTimeColumnValue(String timeColumnName) - throws IOException; - - private void setValidationConfigs(Job job, Path path) - throws IOException { - SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); - - // TODO: Serialize and deserialize validation config by creating toJson and fromJson - // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, - // the job should be disabled, as we should not resize for these use cases. 
Therefore, setting the time column name - // and value - if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { - job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); - String timeColumnName = validationConfig.getTimeColumnName(); - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); - if (timeColumnName != null) { - DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); - if (dateTimeFieldSpec != null) { - DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); - } - } - job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, - IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); - - String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); - if (sampleTimeColumnValue != null) { - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); - } - } - } - private void setHadoopJobConfigs(Job job) { job.setJarByClass(HadoopSegmentPreprocessingJob.class); job.setJobName(getClass().getName()); - FileOutputFormat.setOutputPath(job, _outputPath); + FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath); job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); // Turn this on to always firstly use class paths that user specifies. 
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); @@ -225,4 +147,21 @@ public abstract class DataPreprocessingHelper { job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); } } + + public Object getSchema(Path inputPathDir) + throws IOException { + return _dataPreprocessingHelper.getSchema(inputPathDir); + } + + public void validateConfigsAgainstSchema(Object schema) { + _dataPreprocessingHelper.validateConfigsAgainstSchema(schema); + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn, + sortingColumnType, numOutputFiles, maxNumRecordsPerFile); + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java similarity index 73% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java index 2e91773..faae46d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java @@ -22,15 +22,17 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DataPreprocessingHelperFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class); +public class HadoopDataPreprocessingHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class); - public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) throws IOException { final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); @@ -46,10 +48,10 @@ public class DataPreprocessingHelperFactory { if (numAvroFiles > 0) { LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); - return new AvroDataPreprocessingHelper(avroFiles, outputPath); + return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); } else { LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); - return new OrcDataPreprocessingHelper(orcFiles, outputPath); + return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); } } } 
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java similarity index 59% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java index 0596259..c4834cf 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java @@ -16,26 +16,13 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.hadoop.job.preprocess; import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.Job; -public class HadoopUtils { - private HadoopUtils() { - } +public interface HadoopJobPreparer { - public static final Configuration DEFAULT_CONFIGURATION; - public static final FileSystem DEFAULT_FILE_SYSTEM; - - static { - DEFAULT_CONFIGURATION = new Configuration(); - try { - DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); - } catch (IOException e) { - throw new IllegalStateException("Failed to get the default file system", e); - } - } + void setUpMapperReducerConfigs(Job job) throws IOException; } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java new file mode 100644 index 0000000..0d9ee78 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.orc.OrcConf; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.orc.mapreduce.OrcInputFormat; +import org.apache.orc.mapreduce.OrcOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopOrcDataPreprocessingHelper extends HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class); + + public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath); + String orcSchemaString = orcSchema.toString(); + LOGGER.info("Orc schema is: {}", orcSchemaString); + validateConfigsAgainstSchema(orcSchema); + + job.setInputFormatClass(OrcInputFormat.class); + job.setMapperClass(OrcDataPreprocessingMapper.class); +
job.setMapOutputValueClass(OrcValue.class); + Configuration jobConf = job.getConfiguration(); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + + job.setReducerClass(OrcDataPreprocessingReducer.class); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcStruct.class); + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml index 284b698..13205c5 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml @@ -117,5 +117,14 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </dependency> </dependencies> </project> diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java index 2e3b023..fe51e00 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java +++
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -21,11 +21,25 @@ package org.apache.pinot.ingestion.jobs;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
-  private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
   protected final Path _schemaFile;
   protected final Path _inputSegmentDir;
   protected final Path _preprocessedOutputDir;
@@ -46,6 +60,21 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
   protected final Path _pathToDependencyJar;
   protected boolean _enablePreprocessing;
 
+  protected String _partitionColumn;
+  protected int _numPartitions;
+  protected String _partitionFunction;
+
+  protected String _sortingColumn;
+  protected FieldSpec.DataType _sortingColumnType;
+
+  protected int _numOutputFiles;
+  protected int _maxNumRecordsPerFile;
+
+  protected TableConfig _tableConfig;
+  protected org.apache.pinot.spi.data.Schema _pinotTableSchema;
+
+  protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
   public SegmentPreprocessingJob(final Properties properties) {
     super(properties);
 
@@ -59,13 +88,13 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
     _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
     _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
 
-    _logger.info("*********************************************************************");
-    _logger.info("enable.preprocessing: {}", _enablePreprocessing);
-    _logger.info("path.to.input: {}", _inputSegmentDir);
-    _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
-    _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
-    _logger.info("push.locations: {}", _pushLocations);
-    _logger.info("*********************************************************************");
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("enable.preprocessing: {}", _enablePreprocessing);
+    LOGGER.info("path.to.input: {}", _inputSegmentDir);
+    LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+    LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar);
+    LOGGER.info("push.locations: {}", _pushLocations);
+    LOGGER.info("*********************************************************************");
   }
 
   protected abstract void run()
@@ -90,4 +119,156 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
     // TODO: support orc format in the future.
     return fileName.endsWith(".avro");
   }
+
+  /**
+   * Loads the table config and the Pinot schema used by the preprocessing steps.
+   *
+   * @throws IOException if {@code getTableConfig()} or {@code getSchema()} fails
+   * @throws IllegalStateException if either the table config or the schema is null
+   */
+  protected void setTableConfigAndSchema()
+      throws IOException {
+    _tableConfig = getTableConfig();
+    _pinotTableSchema = getSchema();
+
+    Preconditions.checkState(_tableConfig != null, "Table config cannot be null.");
+    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
+  }
+
+  /**
+   * Parses the preprocessing operations listed under {@link InternalConfigConstants#PREPROCESS_OPERATIONS} in the
+   * table's custom config. The set is left empty when the table has no custom config.
+   */
+  protected void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    if (customConfig != null) {
+      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+      if (customConfigMap != null && !customConfigMap.isEmpty()) {
+        String preprocessingOperationsString =
+            customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+        DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
+      }
+    }
+  }
+
+  /**
+   * Reads the partition column, number of partitions and partition function from the table's segment partition
+   * config. Does nothing unless the PARTITION operation is enabled.
+   */
+  protected void fetchPartitioningConfig() {
+    // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
+    SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+      Preconditions
+          .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
+      if (columnPartitionMap.size() == 1) {
+        _partitionColumn = columnPartitionMap.keySet().iterator().next();
+        _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
+        _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+      }
+    } else {
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+    }
+  }
+
+  /**
+   * Resolves the sorting column and its data type. A column marked SORTED in the field config list takes precedence
+   * over the sorted column of the indexing config. Does nothing unless the SORT operation is enabled.
+   *
+   * @throws IllegalArgumentException if more than one sorted column is configured
+   * @throws IllegalStateException if the column is missing from the schema, is multi-valued or cannot be sorted on
+   */
+  protected void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
+    // Fetch sorting info from table config first.
+    List<String> sortingColumns = new ArrayList<>();
+    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+    if (fieldConfigs != null) {
+      for (FieldConfig fieldConfig : fieldConfigs) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+          sortingColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+    if (sortingColumns.isEmpty()) {
+      // There is no sorted column specified in field configs, try to find sorted column from indexing config.
+      IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+      List<String> sortedColumns = indexingConfig.getSortedColumn();
+      if (sortedColumns != null) {
+        sortingColumns.addAll(sortedColumns);
+      }
+    }
+    if (sortingColumns.isEmpty()) {
+      return;
+    }
+    Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
+    _sortingColumn = sortingColumns.get(0);
+
+    // Validate and resolve the type regardless of which config supplied the column, so that _sortingColumnType is
+    // never left null while _sortingColumn is set.
+    FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+    Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+    Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+    _sortingColumnType = fieldSpec.getDataType();
+    Preconditions.checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s",
+        _sortingColumnType, _sortingColumn);
+    LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
+  }
+
+  /**
+   * Reads the number of output files ({@link InternalConfigConstants#PREPROCESSING_NUM_REDUCERS}) and the maximum
+   * number of records per output file from the table's custom config. Does nothing unless the RESIZE operation is
+   * enabled.
+   */
+  protected void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      _numOutputFiles = 0;
+      return;
+    }
+    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
+      // Message-template overload: the message is only formatted when the check fails.
+      Preconditions.checkState(_numOutputFiles > 0, "The value of %s should be positive! Current value: %s",
+          InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles);
+    } else {
+      _numOutputFiles = 0;
+    }
+
+    if (customConfigsMap != null) {
+      int maxNumRecords;
+      if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+        LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+        maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+      } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+        maxNumRecords =
+            Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
+      } else {
+        return;
+      }
+      // TODO: add an in-built maximum value for this config to avoid having too many small files.
+      // E.g. if the config is set to 1, which is smaller than this in-built value, the job should abort instead of
+      // generating too many small files.
+      Preconditions.checkArgument(maxNumRecords > 0, "The value of %s should be positive. Current value: %s",
+          InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+      LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+      _maxNumRecordsPerFile = maxNumRecords;
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
similarity index 74%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
index 9e5f5f2..3be4768 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
*/ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; @@ -26,23 +26,13 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.avro.mapreduce.AvroMultipleOutputs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,30 +51,19 @@ public class AvroDataPreprocessingHelper extends DataPreprocessingHelper { } @Override - public void setUpMapperReducerConfigs(Job job) + public Object getSchema(Path inputPathDir) throws IOException { - Schema avroSchema = getAvroSchema(_sampleRawDataPath); - LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); - validateConfigsAgainstSchema(avroSchema); - - job.setInputFormatClass(AvroKeyInputFormat.class); -
job.setMapperClass(AvroDataPreprocessingMapper.class); - - job.setReducerClass(AvroDataPreprocessingReducer.class); - AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); - AvroMultipleOutputs.setCountersEnabled(job, true); - // Use LazyOutputFormat to avoid creating empty files. - LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); - job.setOutputKeyClass(AvroKey.class); - job.setOutputValueClass(NullWritable.class); + return getAvroSchema(inputPathDir); + } - AvroJob.setInputKeySchema(job, avroSchema); - AvroJob.setMapOutputValueSchema(job, avroSchema); - AvroJob.setOutputKeySchema(job, avroSchema); + @Override + public void validateConfigsAgainstSchema(Object schema) { + Schema avroSchema = (Schema) schema; + validateConfigsAgainstSchema(avroSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { String sampleTimeColumnValue; try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) { @@ -99,7 +78,7 @@ public class AvroDataPreprocessingHelper extends DataPreprocessingHelper { * @return Input schema * @throws IOException exception when accessing to IO */ - private Schema getAvroSchema(Path inputPathDir) + protected Schema getAvroSchema(Path inputPathDir) throws IOException { Schema avroSchema = null; for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java new file mode 100644 index 0000000..cb0a738 --- /dev/null +++
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+/**
+ * Base class for the format-specific (Avro, ORC) data preprocessing helpers. It holds the partitioning, sorting and
+ * resizing settings passed to {@link #registerConfigs} and gives format-specific access to the input schema and to a
+ * sample value of the time column.
+ */
+public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable {
+  public String _partitionColumn;
+  public int _numPartitions;
+  public String _partitionFunction;
+
+  public String _sortingColumn;
+  public FieldSpec.DataType _sortingColumnType;
+
+  public int _numOutputFiles;
+  public int _maxNumRecordsPerFile;
+
+  public TableConfig _tableConfig;
+  public Schema _pinotTableSchema;
+
+  public List<Path> _inputDataPaths;
+  public Path _sampleRawDataPath;
+  public Path _outputPath;
+
+  /**
+   * @param inputDataPaths raw input files; the first one is used as the sample file for schema and time column reads
+   * @param outputPath output location for the preprocessed data
+   */
+  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    _inputDataPaths = inputDataPaths;
+    _sampleRawDataPath = inputDataPaths.get(0);
+    _outputPath = outputPath;
+  }
+
+  /**
+   * Registers the table-level settings resolved by the preprocessing job (table config, schema, partitioning, sorting
+   * and resizing).
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _tableConfig = tableConfig;
+    _pinotTableSchema = tableSchema;
+    _partitionColumn = partitionColumn;
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+
+    _sortingColumn = sortingColumn;
+    _sortingColumnType = sortingColumnType;
+
+    _numOutputFiles = numOutputFiles;
+    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+  }
+
+  /** Returns the Hadoop partitioner class used for this input format. */
+  public abstract Class<? extends Partitioner> getPartitioner();
+
+  /**
+   * Reads the input schema in its format-specific representation (e.g. an Avro {@code Schema} or an ORC
+   * {@code TypeDescription}).
+   */
+  public abstract Object getSchema(Path inputPathDir)
+      throws IOException;
+
+  /** Validates the registered settings against a schema obtained from {@link #getSchema(Path)}. */
+  public abstract void validateConfigsAgainstSchema(Object schema);
+
+  /**
+   * For APPEND tables, propagates the push frequency and the time column settings into the job configuration. Does
+   * nothing for other ingestion types.
+   *
+   * @param job the job to configure
+   * @param path currently unused
+   * @throws IOException if reading the sample time column value fails
+   */
+  public void setValidationConfigs(Job job, Path path)
+      throws IOException {
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // If the use case is an append use case, check that one time unit is contained in one file. If there is more than
+    // one, the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column
+    // name and value.
+    if (!"APPEND".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig))) {
+      return;
+    }
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(InternalConfigConstants.IS_APPEND, "true");
+    setIfNotNull(jobConf, InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+        IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+    String timeColumnName = validationConfig.getTimeColumnName();
+    if (timeColumnName == null) {
+      // Configuration.set() rejects null values, and there is no time column to sample.
+      return;
+    }
+    jobConf.set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+    DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+    if (dateTimeFieldSpec != null) {
+      DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+      jobConf.set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+      jobConf.set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+      // NOTE(review): the SDF pattern is presumably null for non-SIMPLE_DATE_FORMAT columns, hence the guard.
+      setIfNotNull(jobConf, InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+    }
+    setIfNotNull(jobConf, InternalConfigConstants.TIME_COLUMN_VALUE, getSampleTimeColumnValue(timeColumnName));
+  }
+
+  /**
+   * Sets {@code key} to {@code value} unless the value is null: Hadoop's {@link Configuration#set(String, String)}
+   * rejects null values with an {@link IllegalArgumentException}.
+   */
+  private static void setIfNotNull(Configuration conf, String key, String value) {
+    if (value != null) {
+      conf.set(key, value);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
similarity index 95%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
copy to
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java index 2e91773..1cb07e3 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java similarity index 82% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java index aec0bb0..f7f287b 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java @@ -16,13 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -30,23 +29,13 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; -import org.apache.orc.mapred.OrcStruct; -import org.apache.orc.mapred.OrcValue; -import org.apache.orc.mapreduce.OrcInputFormat; -import org.apache.orc.mapreduce.OrcOutputFormat; -import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import 
org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.spi.utils.StringUtils; import org.slf4j.Logger; @@ -61,33 +50,24 @@ public class OrcDataPreprocessingHelper extends DataPreprocessingHelper { } @Override - Class<? extends Partitioner> getPartitioner() { + public Class<? extends Partitioner> getPartitioner() { return OrcDataPreprocessingPartitioner.class; } @Override - void setUpMapperReducerConfigs(Job job) { - TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath); - String orcSchemaString = orcSchema.toString(); - LOGGER.info("Orc schema is: {}", orcSchemaString); - validateConfigsAgainstSchema(orcSchema); - - job.setInputFormatClass(OrcInputFormat.class); - job.setMapperClass(OrcDataPreprocessingMapper.class); - job.setMapOutputValueClass(OrcValue.class); - Configuration jobConf = job.getConfiguration(); - OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + public Object getSchema(Path inputPathDir) + throws IOException { + return getOrcSchema(inputPathDir); + } - job.setReducerClass(OrcDataPreprocessingReducer.class); - // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(OrcStruct.class); - OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + @Override + public void validateConfigsAgainstSchema(Object schema) { + TypeDescription orcSchema = (TypeDescription) schema; + validateConfigsAgainstSchema(orcSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { try (Reader reader = OrcFile .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java similarity index 59% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java index 0596259..12efa99 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java @@ -16,26 +16,12 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.preprocess; import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -public class HadoopUtils { - private HadoopUtils() { - } +public interface SampleTimeColumnExtractable { - public static final Configuration DEFAULT_CONFIGURATION; - public static final FileSystem DEFAULT_FILE_SYSTEM; - - static { - DEFAULT_CONFIGURATION = new Configuration(); - try { - DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); - } catch (IOException e) { - throw new IllegalStateException("Failed to get the default file system", e); - } - } + String getSampleTimeColumnValue(String timeColumnName) throws IOException; } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java index 6278e8e..759977d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java similarity index 93% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java index d7d0694..d140765 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java index 3d3fcec..2bc6739 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -30,8 +30,8 @@ import org.apache.avro.mapreduce.AvroJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java index 74799c7..d833964 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import org.apache.avro.generic.GenericRecord; @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java similarity index 88% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java index 7ca22cf..f9c1467 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroValue; @@ -27,9 +27,9 @@ import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.NUM_PARTITIONS_CONFIG; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_COLUMN_CONFIG; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_FUNCTION_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.NUM_PARTITIONS_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_COLUMN_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_FUNCTION_CONFIG; public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java index bef2cef..d9b509d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import java.util.List; @@ -26,8 +26,8 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java index 0ba57db..1826bdc 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import java.util.HashMap; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java index 5fcbf10..4f236a4 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.avro.generic.GenericRecord; @@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java index a3387a2..10d3f10 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.commons.lang3.RandomStringUtils; @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java index 4cc6f75..8450179 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils; import java.util.Set; import org.apache.hadoop.io.DoubleWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java index 3701db2..e85a3e8 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job; +package org.apache.pinot.ingestion.utils; /** * Internal-only constants for Hadoop MapReduce jobs. 
These constants are propagated across different segment creation diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java index 58e1c1d..a6e4ca7 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java similarity index 96% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java index 0596259..e4cdf5e 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import java.io.IOException; import org.apache.hadoop.conf.Configuration; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java index dcfc3b5..09eb218 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.io.BooleanWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java index 65f1222..05a437d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml index 13db9ea..e302399 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml @@ -106,6 +106,10 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-common</artifactId> </exclusion> + <exclusion> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -282,6 +286,10 @@ </exclusions> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + </dependency> <!--Test--> <dependency> @@ -379,6 +387,14 @@ <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> </exclusions> <scope>test</scope> </dependency> @@ -463,8 +479,16 @@ <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </exclusion> </exclusions> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> @@ -480,7 +504,7 @@ <artifactId>objenesis</artifactId> </exclusion> </exclusions> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.derby</groupId> diff 
--git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java new file mode 100644 index 0000000..39ebb58 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spark.jobs; + +import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory; +import org.apache.pinot.spark.utils.HadoopUtils; +import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Spark job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format. + * Thus, the output files are partitioned, sorted, resized after this job. + * In order to run this job, the following configs need to be specified in job properties: + * * enable.preprocessing: false by default. Enables preprocessing job. 
+ */ +public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class); + + public SparkSegmentPreprocessingJob(Properties properties) { + super(properties); + } + + @Override + protected void run() + throws Exception { + if (!_enablePreprocessing) { + LOGGER.info("Pre-processing job is disabled."); + return; + } else { + LOGGER.info("Starting {}", getClass().getSimpleName()); + } + + setTableConfigAndSchema(); + fetchPreProcessingOperations(); + fetchPartitioningConfig(); + fetchSortingConfig(); + fetchResizingConfig(); + + // Delete any existing preprocessed output dir so the job starts from a clean slate. + cleanUpPreprocessedOutputs(_preprocessedOutputDir); + + SparkDataPreprocessingHelper dataPreprocessingHelper = + SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + dataPreprocessingHelper + .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, + _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile); + + // Set up and execute spark job. Build the session first so its app name applies to the SparkContext. + SparkSession sparkSession = + SparkSession.builder().appName(SparkSegmentPreprocessingJob.class.getSimpleName()).getOrCreate(); + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); + addDepsJarToDistributedCache(javaSparkContext); + + dataPreprocessingHelper.setUpAndExecuteJob(sparkSession); + } + + /** + * Recursively deletes the preprocessed output directory if it exists.
+ */ + public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir) + throws IOException { + if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) { + LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir); + HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true); + } + } + + protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext) + throws IOException { + if (_pathToDependencyJar != null) { + PinotSparkJobPreparationHelper + .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, + _pathToDependencyJar); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java similarity index 58% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java index 0596259..23f17e8 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java @@ -16,26 +16,18 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.jobs.preprocess; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; -public class HadoopUtils { - private HadoopUtils() { +public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); } - public static final Configuration DEFAULT_CONFIGURATION; - public static final FileSystem DEFAULT_FILE_SYSTEM; - - static { - DEFAULT_CONFIGURATION = new Configuration(); - try { - DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); - } catch (IOException e) { - throw new IllegalStateException("Failed to get the default file system", e); - } + @Override + public String getDataFormat() { // Spark data source name used to read the input files. + return "avro"; } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java new file mode 100644 index 0000000..7a7a564 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingComparator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import com.google.common.collect.Ordering; +import java.io.Serializable; + +/** Orders {@link SparkDataPreprocessingJobKey}s by their sorting-column value; nulls sort first. */ +public class SparkDataPreprocessingComparator extends Ordering<Object> implements Serializable { + @Override + public int compare(Object left, Object right) { + Object value1 = ((SparkDataPreprocessingJobKey) left).getSortedColumn(); + Object value2 = ((SparkDataPreprocessingJobKey) right).getSortedColumn(); + if (value1 == null || value2 == null) { + return Boolean.compare(value1 != null, value2 != null); // nulls first; two nulls are equal + } + if (value1 instanceof Integer) { + return Integer.compare((int) value1, (int) value2); + } else if (value1 instanceof Long) { + return Long.compare((long) value1, (long) value2); + } else if (value1 instanceof Float) { + return Float.compare((float) value1, (float) value2); + } else if (value1 instanceof Double) { + return Double.compare((double) value1, (double) value2); + } else if (value1 instanceof Short) { + return Short.compare((short) value1, (short) value2); + } else if (value1 instanceof String) { + return ((String) value1).compareTo((String) value2); + } + throw new RuntimeException("Unsupported Data type: " + value1.getClass().getName()); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java new file mode 100644 index 0000000..6b93f7e --- /dev/null +++
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.partitioners.PartitionFunctionFactory; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import 
scala.collection.JavaConverters; +import scala.collection.Seq; + +/** Partitions and sorts the input data with Spark; subclasses name the Spark data source format. */ +public abstract class SparkDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelper.class); + + protected DataPreprocessingHelper _dataPreprocessingHelper; + + public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + _dataPreprocessingHelper = dataPreprocessingHelper; + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn, + sortingColumnType, numOutputFiles, maxNumRecordsPerFile); + } + + public void setUpAndExecuteJob(SparkSession sparkSession) { + // Read data into data frame. + Dataset<Row> dataFrame = + sparkSession.read().format(getDataFormat()).load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths)); + JavaRDD<Row> javaRDD = dataFrame.javaRDD(); + + // Find positions of partition column and sorting column if specified (-1 when absent from the schema).
+ StructType schema = dataFrame.schema(); + StructField[] fields = schema.fields(); + int partitionColumnPosition = -1; + int sortingColumnPosition = -1; + PartitionFunction partitionFunction = null; + for (int i = 0; i < fields.length; i++) { + StructField field = fields[i]; + if (_dataPreprocessingHelper._partitionColumn != null && _dataPreprocessingHelper._partitionColumn + .equalsIgnoreCase(field.name())) { + partitionColumnPosition = i; + partitionFunction = PartitionFunctionFactory + .getPartitionFunction(_dataPreprocessingHelper._partitionFunction, _dataPreprocessingHelper._numPartitions); + } + if (_dataPreprocessingHelper._sortingColumn != null && _dataPreprocessingHelper._sortingColumn + .equalsIgnoreCase(field.name())) { + sortingColumnPosition = i; + } + } + int numPartitions; + if (partitionColumnPosition == -1) { + if (_dataPreprocessingHelper._numOutputFiles > 0) { + numPartitions = _dataPreprocessingHelper._numOutputFiles; + } else { + numPartitions = javaRDD.getNumPartitions(); + } + } else { + numPartitions = _dataPreprocessingHelper._numPartitions; + } + final int finalNumPartitions = numPartitions; + final int finalPartitionColumnPosition = partitionColumnPosition; + final int finalSortingColumnPosition = sortingColumnPosition; + LOGGER.info("Number of partitions: {}", finalNumPartitions); + LOGGER.info("Position of partition column (if specified): {}", finalPartitionColumnPosition); + LOGGER.info("Position of sorting column (if specified): {}", finalSortingColumnPosition); + SparkDataPreprocessingPartitioner sparkPartitioner = + new SparkDataPreprocessingPartitioner(finalNumPartitions, partitionFunction); + + // Convert to java pair rdd. The lambda captures only locals, so this helper need not be serializable.
+ JavaPairRDD<Object, Row> pairRDD = javaRDD.mapToPair((PairFunction<Row, Object, Row>) row -> { + Object partitionColumnValue = null; + Object sortingColumnValue = null; + + if (finalPartitionColumnPosition >= 0) { + partitionColumnValue = row.get(finalPartitionColumnPosition); + } + int partitionId = sparkPartitioner.generatePartitionId(partitionColumnValue); + if (finalSortingColumnPosition >= 0) { + sortingColumnValue = row.get(finalSortingColumnPosition); + } + return new Tuple2<>(new SparkDataPreprocessingJobKey(partitionId, sortingColumnValue), row); + }); + + // Repartition and sort within partitions. + Comparator<Object> comparator = new SparkDataPreprocessingComparator(); + JavaPairRDD<Object, Row> partitionedSortedPairRDD = + pairRDD.repartitionAndSortWithinPartitions(sparkPartitioner, comparator); + + // TODO: support preprocessing.max.num.records.per.file. Output keeps the input data format and schema. + Dataset<Row> outputDataFrame = sparkSession.createDataFrame(partitionedSortedPairRDD.values(), schema); + outputDataFrame.write().format(getDataFormat()).save(_dataPreprocessingHelper._outputPath.toString()); + } + + private Seq<String> convertPathsToStrings(List<Path> paths) { + List<String> stringList = new ArrayList<>(); + for (Path path : paths) { + stringList.add(path.toString()); + } + return toSeq(stringList); + } + + /** + * Helper to wrap a Java collection into a Scala Seq + * + * @param collection java collection + * @param <T> collection item type + * @return Scala Seq of type T + */ + public static <T> Seq<T> toSeq(Collection<T> collection) { + return JavaConverters.asScalaBufferConverter(new ArrayList<>(collection)).asScala().toSeq(); + } + + public abstract String getDataFormat(); +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
similarity index 71% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java index 2e91773..f902bbe 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java @@ -16,21 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.spark.jobs.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DataPreprocessingHelperFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class); +public class SparkDataPreprocessingHelperFactory { // Avro helper if Avro files are found, else ORC helper. + private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class); - public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) throws IOException { final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths,
DataFileUtils.AVRO_FILE_EXTENSION); final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); @@ -46,10 +48,10 @@ public class DataPreprocessingHelperFactory { if (numAvroFiles > 0) { LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); - return new AvroDataPreprocessingHelper(avroFiles, outputPath); + return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); } else { LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); - return new OrcDataPreprocessingHelper(orcFiles, outputPath); + return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); } } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java similarity index 58% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java index 0596259..d4e76a2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingJobKey.java @@ -16,26 +16,22 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.jobs.preprocess; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +public class SparkDataPreprocessingJobKey implements java.io.Serializable { + private final Object _partitionId; // Target Spark partition id. + private final Object _sortedColumn; // Value of the sorting column; may be null. - -public class HadoopUtils { - private HadoopUtils() { + public SparkDataPreprocessingJobKey(Object partitionId, Object sortedColumn) { + _partitionId = partitionId; + _sortedColumn = sortedColumn; } - public static final Configuration DEFAULT_CONFIGURATION; - public static final FileSystem DEFAULT_FILE_SYSTEM; + public Object getPartitionId() { + return _partitionId; + } - static { - DEFAULT_CONFIGURATION = new Configuration(); - try { - DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); - } catch (IOException e) { - throw new IllegalStateException("Failed to get the default file system", e); - } + public Object getSortedColumn() { + return _sortedColumn; } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java new file mode 100644 index 0000000..60a180a --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingPartitioner.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs.preprocess; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.spark.Partitioner; + +/** Spark partitioner that routes each {@link SparkDataPreprocessingJobKey} to its precomputed partition id. */ +public class SparkDataPreprocessingPartitioner extends Partitioner { + private final int _numPartitions; + private final PartitionFunction _partitionFunction; + private final AtomicInteger _counter = new AtomicInteger(0); + + public SparkDataPreprocessingPartitioner(int numPartitions, PartitionFunction partitionFunction) { + _numPartitions = numPartitions; + _partitionFunction = partitionFunction; + } + + @Override + public int numPartitions() { + return _numPartitions; + } + + @Override + public int getPartition(Object key) { + SparkDataPreprocessingJobKey jobKey = (SparkDataPreprocessingJobKey) key; + return (int) jobKey.getPartitionId(); + } + + public int generatePartitionId(Object key) { + if (key == null || _partitionFunction == null) { + // Need to distribute evenly for data with the default partition key value (or no partition function). + // We may want to partition and sort on a non-primary key.
+ return Math.floorMod(_counter.getAndIncrement(), _numPartitions); // non-negative even after overflow + } else { + return _partitionFunction.getPartition(key); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java similarity index 58% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java index 0596259..3f84aad 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java @@ -16,26 +16,18 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.jobs.preprocess; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; -public class HadoopUtils { - private HadoopUtils() { +public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); } - public static final Configuration DEFAULT_CONFIGURATION; - public static final FileSystem DEFAULT_FILE_SYSTEM; - - static { - DEFAULT_CONFIGURATION = new Configuration(); - try { - DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION); - } catch (IOException e) { - throw new IllegalStateException("Failed to get the default file system", e); - } + @Override + public String getDataFormat() { // Spark data source name used to read the input files. + return "orc"; } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java index 0596259..8ee89cf 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.utils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
