snleee commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1446795195


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -62,22 +64,20 @@
  */
 public class SegmentMapper {
  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-
-  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
-  private List<RecordTransformer> _customRecordTransformers;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
-
   private final List<FieldSpec> _fieldSpecs;
   private final boolean _includeNullFields;
   private final int _numSortFields;
-
   private final CompositeTransformer _recordTransformer;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
   private final String[] _partitionsBuffer;
   // NOTE: Use TreeMap so that the order is deterministic
  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
+  AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;

Review Comment:
   Should this be `private final` if we expect it to be immutable once initialized? Otherwise, let's at least make it `private`.
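For illustration, a minimal sketch of the `private final` variant. `SizeBudget` and `MapperFieldSketch` are hypothetical stand-ins (the real `AdaptiveSizeBasedWriter` constructor is not part of this hunk); the point is only that a field assigned once in the constructor can be `private final`, even if the object it references keeps mutable state.

```java
// Hypothetical stand-in for AdaptiveSizeBasedWriter; the real class's constructor is not in this hunk.
final class SizeBudget {
  private final long _maxBytes;
  private long _bytesWritten;

  SizeBudget(long maxBytes) {
    _maxBytes = maxBytes;
  }

  boolean canWrite() {
    return _bytesWritten < _maxBytes;
  }

  void record(long numBytes) {
    _bytesWritten += numBytes;
  }
}

final class MapperFieldSketch {
  // Assigned exactly once, in the constructor: `private final` documents that the mapper never
  // swaps in a different writer, and the compiler rejects any later reassignment.
  private final SizeBudget _adaptiveSizeBasedWriter;

  MapperFieldSketch(long maxBytes) {
    _adaptiveSizeBasedWriter = new SizeBudget(maxBytes);
  }

  boolean canWrite() {
    return _adaptiveSizeBasedWriter.canWrite();
  }

  void onRowWritten(long numBytes) {
    // The reference is final, but the object it points to can still change state.
    _adaptiveSizeBasedWriter.record(numBytes);
  }
}
```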



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -142,24 +136,79 @@ public List<File> process()
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper.
+      SegmentMapper mapper =
+          new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers, _segmentProcessorConfig,
+              _mapperOutputDir);
+
+      // Map phase.
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper);
+
+      // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+      // Update next record reader index to be processed.
+      nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+    }
+    return outputSegmentDirs;
+  }
+
+  private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+    for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
+      RecordReader recordReader = _recordReaderFileConfigs.get(i)._recordReader;
+      if (recordReader == null || recordReader.hasNext()) {
+        return i;
+      }
     }
+    return _recordReaderFileConfigs.size();
+  }
 
-    // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
+  private Map<String, GenericRowFileManager> doMap(SegmentMapper mapper)
+      throws Exception {
+    Map<String, GenericRowFileManager> partitionToFileManagerMap = new HashMap<>();
+    try {
+      // Map phase
+      partitionToFileManagerMap = mapper.map();
+
+      // Check for mapper output files
+      if (partitionToFileManagerMap.isEmpty()) {

Review Comment:
   We do the same check on the caller of this function in `doProcess()`
   ```
         // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next
         // iteration.
         if (partitionToFileManagerMap.isEmpty()) {
           nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
           continue;
         }
   ```
   
   We can probably move the log into the code above and remove this `if` check here, since we already return an empty map in that case?
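A minimal sketch of what I have in mind, in simplified form: `doMap()` just returns whatever the mapper produced, and the caller owns both the empty check and the log. Readers are modeled as plain `Iterator<String>` instances that yield partition keys, and `ChunkedProcessingSketch`, `nextIndex()` and `_maxRecordsPerChunk` are hypothetical names; reduce and segment creation are collapsed into adding the map to the output list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

final class ChunkedProcessingSketch {
  private static final Logger LOGGER = Logger.getLogger(ChunkedProcessingSketch.class.getName());

  // Hypothetical simplification: each reader yields the partition key of one record per next() call.
  private final List<Iterator<String>> _readers;
  private final int _maxRecordsPerChunk;

  ChunkedProcessingSketch(List<Iterator<String>> readers, int maxRecordsPerChunk) {
    if (maxRecordsPerChunk <= 0) {
      throw new IllegalArgumentException("maxRecordsPerChunk must be positive");
    }
    _readers = readers;
    _maxRecordsPerChunk = maxRecordsPerChunk;
  }

  List<Map<String, Integer>> process() {
    List<Map<String, Integer>> outputs = new ArrayList<>();
    int next = 0;
    while (next < _readers.size()) {
      Map<String, Integer> partitionToCount = doMap(next);
      // Single place for the empty check, now carrying the log that used to live in doMap().
      if (partitionToCount.isEmpty()) {
        LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
        next = nextIndex(next);
        continue;
      }
      outputs.add(partitionToCount); // Stand-in for reduce + segment creation.
      next = nextIndex(next);
    }
    return outputs;
  }

  // No isEmpty() check here: an empty map is returned as-is and handled by the caller.
  private Map<String, Integer> doMap(int start) {
    Map<String, Integer> partitionToCount = new TreeMap<>();
    int consumed = 0;
    for (int i = start; i < _readers.size() && consumed < _maxRecordsPerChunk; i++) {
      Iterator<String> reader = _readers.get(i);
      while (consumed < _maxRecordsPerChunk && reader.hasNext()) {
        partitionToCount.merge(reader.next(), 1, Integer::sum);
        consumed++;
      }
    }
    return partitionToCount;
  }

  // Mirrors getNextRecordReaderIndexToBeProcessed(): index of the first reader that still has records.
  private int nextIndex(int current) {
    for (int i = current; i < _readers.size(); i++) {
      if (_readers.get(i).hasNext()) {
        return i;
      }
    }
    return _readers.size();
  }
}
```

In this sketch, an empty result from `doMap()` means every remaining reader is exhausted, so `nextIndex()` returns the reader count and the loop terminates.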



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {

Review Comment:
   Let's add a comment explaining why we don't close the reader when `recordReader` still has more records to read.
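Something along these lines, for example. This is a hypothetical, simplified model: `ListReader` stands in for `RecordReader`, and `mapChunk()` with its `maxRows` cap stands in for the mapper stopping early (presumably when the adaptive size-based writer reports it cannot write more). The comment in the `finally` block is the part I'm asking for.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader standing in for RecordReader: iterable and closeable.
final class ListReader implements Iterator<String>, Closeable {
  private final Iterator<String> _delegate;
  private boolean _closed;

  ListReader(List<String> rows) {
    _delegate = rows.iterator();
  }

  @Override
  public boolean hasNext() {
    return _delegate.hasNext();
  }

  @Override
  public String next() {
    return _delegate.next();
  }

  @Override
  public void close() {
    _closed = true;
  }

  boolean isClosed() {
    return _closed;
  }
}

final class PartialMapSketch {
  // Maps at most maxRows rows from the reader and returns how many were mapped.
  static int mapChunk(ListReader reader, int maxRows) {
    int mapped = 0;
    try {
      while (mapped < maxRows && reader.hasNext()) {
        reader.next();
        mapped++;
      }
    } finally {
      // Close the reader only once it is exhausted. If it still has records, the mapper stopped
      // early and the next map iteration resumes from this same open reader, so closing it here
      // would lose the remaining rows.
      if (!reader.hasNext()) {
        reader.close();
      }
    }
    return mapped;
  }
}
```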



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java:
##########
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+
+
+public interface AdaptiveConstraintsWriter<W, D> {

Review Comment:
   Please add some Javadoc.
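As a starting point, a sketch of the kind of Javadoc I mean. The interface body is not shown in this hunk, so `ConstrainedWriterSketch`, its `canWrite()`/`write()` methods and the meaning assigned to `W` and `D` are assumptions for illustration; `CharBudgetWriter` is a hypothetical implementation that caps output by character count.

```java
import java.io.IOException;

/**
 * Writes data units through an underlying writer while enforcing an adaptive constraint
 * (for example, a size budget), so the caller can stop and resume in a later pass.
 *
 * @param <W> the type of the underlying writer the data units are written to
 * @param <D> the type of a single data unit (for example, one row)
 */
interface ConstrainedWriterSketch<W, D> {
  /**
   * Returns whether more data can be written without violating the constraint.
   *
   * @return {@code true} if the caller may call {@link #write} again
   */
  boolean canWrite();

  /**
   * Writes one data unit and updates the constraint's internal state.
   *
   * @param writer the underlying writer
   * @param dataUnit the data unit to write
   * @throws IOException if the underlying write fails
   */
  void write(W writer, D dataUnit)
      throws IOException;
}

// Hypothetical implementation: caps the number of characters appended to a StringBuilder.
final class CharBudgetWriter implements ConstrainedWriterSketch<StringBuilder, String> {
  private final int _maxChars;
  private int _written;

  CharBudgetWriter(int maxChars) {
    _maxChars = maxChars;
  }

  @Override
  public boolean canWrite() {
    return _written < _maxChars;
  }

  @Override
  public void write(StringBuilder writer, String dataUnit) {
    writer.append(dataUnit);
    _written += dataUnit.length();
  }
}
```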



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
