Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r485258306



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);
+
+  /**
+   * Provides an iterator for the GenericRows in the collection
+   */
+  Iterator<GenericRow> iterator();

Review comment:
       Recommend combining `iterator()` and `finish()` into one method, because we always need to call `finish()` and then `iterator()` (e.g. remove `finish()` and move the sorting logic into `iterator()`).
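       For example, a rough sketch of that flavor in `ConcatCollector` (untested; assumes `GenericRowSorter` exposes an in-place `sort(List<GenericRow>)` helper):
   ```java
   // Sketch only: sort lazily when the iterator is requested, so callers cannot
   // forget to call finish() before iterating
   @Override
   public Iterator<GenericRow> iterator() {
     if (_sorter != null) {
       _sorter.sort(_collection);  // hypothetical in-place sort using the row comparator
     }
     return _collection.iterator();
   }
   ```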

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());

Review comment:
       From past experience, storing the `dataType` and doing a per-value switch is faster than storing a `Comparator`.
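       e.g. a rough sketch of the per-value switch (untested):
   ```java
   // Sketch: keep the DataType per sort column and switch on it for every comparison,
   // instead of going through a stored Comparator (assumes single-value, non-null values)
   private static int compareValues(FieldSpec.DataType dataType, Object v1, Object v2) {
     switch (dataType) {
       case INT:
         return Integer.compare((int) v1, (int) v2);
       case LONG:
         return Long.compare((long) v1, (long) v2);
       case FLOAT:
         return Float.compare((float) v1, (float) v2);
       case DOUBLE:
         return Double.compare((double) v1, (double) v2);
       case STRING:
         return ((String) v1).compareTo((String) v2);
       default:
         throw new IllegalStateException("Unsupported data type: " + dataType);
     }
   }
   ```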

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());
+    }
+    _genericRowComparator = (o1, o2) -> {
+      for (int i = 0; i < comparators.length; i++) {
+        String column = sortOrder.get(i);
+        int result = comparators[i].compare(o1.getValue(column), o2.getValue(column));
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    };
+  }
+
+  private Comparator getComparator(FieldSpec.DataType dataType) {
+    switch (dataType) {
+
+      case INT:
+        return Comparator.comparingInt(o -> (int) o);
+      case LONG:
+        return Comparator.comparingLong(o -> (long) o);
+      case FLOAT:
+        return (o1, o2) -> Float.compare((float) o1, (float) o2);
+      case DOUBLE:
+        return Comparator.comparingDouble(o -> (double) o);
+      case STRING:
+        return Comparator.comparing(o -> ((String) o));
+      default:

Review comment:
       Add BYTES support using `ByteArray.compare()`.
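       e.g. (sketch; assumes the `org.apache.pinot.spi.utils.ByteArray` helper, whose static `compare(byte[], byte[])` does the lexicographical comparison):
   ```java
         case BYTES:
           // BYTES values come out of GenericRow as byte[]
           return (o1, o2) -> ByteArray.compare((byte[]) o1, (byte[]) o2);
   ```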

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;
+    private Schema schema;
+    private RecordTransformerConfig recordTransformerConfig;
+    private RecordFilterConfig recordFilterConfig;
+    private PartitioningConfig partitioningConfig;
+    private CollectorConfig collectorConfig;
+    private SegmentConfig _segmentConfig;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      this.tableConfig = tableConfig;

Review comment:
       Same for other places
   ```suggestion
         _tableConfig = tableConfig;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;

Review comment:
       Same for other places
   ```suggestion
       private TableConfig _tableConfig;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)

Review comment:
       I feel this is not as readable as the `@JsonCreator` annotation on the constructor. IMO we don't really need a builder for such a simple config.
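       e.g. a sketch of the constructor-based flavor (untested; property names assumed to match the getters):
   ```java
     @JsonCreator
     public CollectorConfig(@JsonProperty("collectorType") @Nullable CollectorFactory.CollectorType collectorType,
         @JsonProperty("aggregatorTypeMap") @Nullable Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap,
         @JsonProperty("sortOrder") @Nullable List<String> sortOrder) {
       _collectorType = collectorType != null ? collectorType : DEFAULT_COLLECTOR_TYPE;
       _aggregatorTypeMap = aggregatorTypeMap;
       _sortOrder = sortOrder;
     }
   ```
       (with `com.fasterxml.jackson.annotation.JsonCreator` / `JsonProperty` imports instead of the builder annotations)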

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {

Review comment:
       Should this be `@Nullable` as well?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {
+    return _sortOrder;
+  }
+
+  /**
+   * Builder for CollectorConfig
+   */
+  @JsonPOJOBuilder(withPrefix = "set")
+  public static class Builder {
+    private CollectorFactory.CollectorType collectorType = DEFAULT_COLLECTOR_TYPE;
+    private Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap;
+    private List<String> sortOrder = new ArrayList<>();
+
+    public Builder setCollectorType(CollectorFactory.CollectorType collectorType) {
+      this.collectorType = collectorType;

Review comment:
       Avoid using `this`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {
+    return _sortOrder;
+  }
+
+  /**
+   * Builder for CollectorConfig
+   */
+  @JsonPOJOBuilder(withPrefix = "set")
+  public static class Builder {
+    private CollectorFactory.CollectorType collectorType = DEFAULT_COLLECTOR_TYPE;

Review comment:
       (Code style)
   ```suggestion
       private CollectorFactory.CollectorType _collectorType = DEFAULT_COLLECTOR_TYPE;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());
+    }
+    _genericRowComparator = (o1, o2) -> {
+      for (int i = 0; i < comparators.length; i++) {
+        String column = sortOrder.get(i);
+        int result = comparators[i].compare(o1.getValue(column), o2.getValue(column));
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    };
+  }
+
+  private Comparator getComparator(FieldSpec.DataType dataType) {
+    switch (dataType) {
+
+      case INT:
+        return Comparator.comparingInt(o -> (int) o);
+      case LONG:
+        return Comparator.comparingLong(o -> (long) o);
+      case FLOAT:
+        return (o1, o2) -> Float.compare((float) o1, (float) o2);

Review comment:
       I'm in favor of this flavor for performance reasons (it avoids going through an extra extractor function per comparison).
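       e.g. the other numeric cases in the same style (sketch):
   ```java
         case INT:
           return (o1, o2) -> Integer.compare((int) o1, (int) o2);
         case LONG:
           return (o1, o2) -> Long.compare((long) o1, (long) o2);
         case DOUBLE:
           return (o1, o2) -> Double.compare((double) o1, (double) o2);
   ```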

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;

Review comment:
       (nit) Make it final?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();

Review comment:
       You might want to extract the number of columns from the field specs instead, as a metric column can also be virtual.
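       e.g. something along these lines (sketch, using only APIs already used in this class):
   ```java
       // derive key/value sizes from the physical field specs so virtual metric
       // columns don't skew the counts
       int keySize = 0;
       int valueSize = 0;
       for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
         if (!fieldSpec.isVirtualColumn()) {
           if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
             valueSize++;
           } else {
             keySize++;
           }
         }
       }
       _keySize = keySize;
       _valueSize = valueSize;
   ```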

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector implementation for collecting and concatenating all incoming rows
+ */
+public class ConcatCollector implements Collector {
+  private final List<GenericRow> _collection = new ArrayList<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;

Review comment:
       (nit) Make it final?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,
+      GenericData.Record reusableRecord) {
+    for (String field : genericRow.getFieldToValueMap().keySet()) {

Review comment:
       Should we also put the `null` values for the fields returned by `GenericRow.getNullValueFields()`?
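       e.g. (sketch, assuming we want to carry the null-field information over):
   ```java
       // explicitly null out the default-null fields so the reusable Avro record
       // does not carry over stale values from the previous row
       for (String nullField : genericRow.getNullValueFields()) {
         reusableRecord.put(nullField, null);
       }
   ```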

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.minion.rollup.aggregate;
+package org.apache.pinot.core.segment.processing.collector;

Review comment:
       Not introduced in this PR, but we should not pass in `MetricFieldSpec` for every `aggregate()` call. Instead, we should set it in the constructor or add an `init()` method.
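       e.g. a sketch of the suggested contract (method names are illustrative):
   ```java
   public interface ValueAggregator {

     /** Called once, before any aggregate() calls, with the metric this aggregator operates on. */
     void init(MetricFieldSpec metricFieldSpec);

     /** Aggregates two values of the metric passed to init(). */
     Object aggregate(Object value1, Object value2);
   }
   ```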

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);

Review comment:
       Should we work on `GenericRecord` (Avro object) instead of `GenericRow` (Pinot object)? We are converting them back and forth right now.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();
+    _valueSize = schema.getMetricNames().size();
+    _keyColumns = new String[_keySize];
+    _valueColumns = new String[_valueSize];
+    _valueAggregators = new ValueAggregator[_valueSize];
+    _metricFieldSpecs = new MetricFieldSpec[_valueSize];
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
+    if (aggregatorTypeMap == null) {
+      aggregatorTypeMap = Collections.emptyMap();
+    }
+    int valIdx = 0;
+    int keyIdx = 0;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        String name = fieldSpec.getName();
+        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.METRIC)) {

Review comment:
       (nit) 
   ```suggestion
           if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.filter.RecordFilter;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final RecordFilter _recordFilter;
+  private final Partitioner _partitioner;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioner: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
+        _partitioner.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
+
+    while (segmentRecordReader.hasNext()) {
+      reusableRow = segmentRecordReader.next(reusableRow);
+
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);
+
+      // Record filtering
+      if (_recordFilter.filter(reusableRow)) {
+        continue;
+      }
+
+      // Partitioning
+      String partition = _partitioner.getPartition(reusableRow);
+
+      // Create writer for the partition, if not exists
+      if (!_partitionToDataFileWriterMap.containsKey(partition)) {

Review comment:
       Use `_partitionToDataFileWriterMap.get(partition)` and check whether the value is `null`, to save one map lookup.
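       e.g. (sketch; `createDataFileWriter(partition)` stands in for the existing writer-creation block):
   ```java
         DataFileWriter<GenericData.Record> writer = _partitionToDataFileWriterMap.get(partition);
         if (writer == null) {
           // hypothetical helper wrapping the current "create writer for the partition" code
           writer = createDataFileWriter(partition);
           _partitionToDataFileWriterMap.put(partition, writer);
         }
   ```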

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;
+    private Schema schema;
+    private RecordTransformerConfig recordTransformerConfig;
+    private RecordFilterConfig recordFilterConfig;
+    private PartitioningConfig partitioningConfig;
+    private CollectorConfig collectorConfig;
+    private SegmentConfig _segmentConfig;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      this.tableConfig = tableConfig;
+      return this;
+    }
+
+    public Builder setSchema(Schema schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public Builder setRecordTransformerConfig(RecordTransformerConfig recordTransformerConfig) {
+      this.recordTransformerConfig = recordTransformerConfig;
+      return this;
+    }
+
+    public Builder setRecordFilterConfig(RecordFilterConfig recordFilterConfig) {
+      this.recordFilterConfig = recordFilterConfig;
+      return this;
+    }
+
+    public Builder setPartitioningConfig(PartitioningConfig partitioningConfig) {
+      this.partitioningConfig = partitioningConfig;
+      return this;
+    }
+
+    public Builder setCollectorConfig(CollectorConfig collectorConfig) {
+      this.collectorConfig = collectorConfig;
+      return this;
+    }
+
+    public Builder setSegmentConfig(SegmentConfig segmentConfig) {
+      this._segmentConfig = segmentConfig;
+      return this;
+    }
+
+    public SegmentProcessorConfig build() {
+      Preconditions.checkNotNull(tableConfig, "Must provide table config in SegmentProcessorConfig");
+      Preconditions.checkNotNull(schema, "Must provide schema in SegmentProcessorConfig");
+      if (recordTransformerConfig == null) {

Review comment:
       Suggest leaving them as `null` if not set




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
