Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1366136894


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java:
##########
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("All configs for the streams from which to ingest")
   private final List<Map<String, String>> _streamConfigMaps;
 
+  @JsonPropertyDescription("Whether to use column major mode when creating the segment.")
+  private boolean _columnMajorSegmentBuilderEnabled;

Review Comment:
   It seems this PR always reads the config from `IndexingConfig` instead of 
this field. Ideally it would be configured here, but since it is a short-lived 
config (we want to always enable it in the future), let's remove it from here 
to avoid confusion.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -194,6 +194,10 @@ public int[] getSortedDocIds() {
     return _sortedDocIds;
   }
 
+  public boolean getSkipDefaultNullValues() {

Review Comment:
   Please revert this; it only applies to row-based reading.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -102,14 +104,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Schema _schema;
   private File _indexDir;
   private int _totalDocs;
-  private int _docIdCounter;
+  private int _docPosOnDisk;

Review Comment:
   Don't change this. We should not update this counter when building the 
segment in column-major fashion.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -97,6 +93,7 @@ public enum TimeColumnType {
   private String _segmentNamePrefix = null;
   private String _segmentNamePostfix = null;
   private String _segmentTimeColumnName = null;
+  private boolean _segmentEnableColumnMajor = false;

Review Comment:
   I don't think the changes in this config are required. Neither of the 
fields is used.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -321,9 +322,9 @@ private boolean endCriteriaReached() {
             _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             return false;
           }
-          _segmentLogger
-              .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
-                  _startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
+          _segmentLogger.info(

Review Comment:
   Can you revert the formatting changes to unrelated code? They currently make 
it very hard to find the relevant changes. Alternatively, you could file a 
separate PR containing only the reformatting of the files touched by this PR; 
we can merge that first and then rebase this PR on top of it.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -303,6 +305,8 @@ public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType
   @Override
   public void indexRow(GenericRow row)
       throws IOException {
+    long startNS = System.nanoTime();

Review Comment:
   (minor) Not used



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -229,16 +232,21 @@ public void build()
       GenericRow reuse = new GenericRow();
       TransformPipeline.Result reusedResult = new TransformPipeline.Result();
       while (_recordReader.hasNext()) {
-        long recordReadStartTime = System.currentTimeMillis();
-        long recordReadStopTime = System.currentTimeMillis();
+        long recordReadStartTime = System.nanoTime();
+        long recordReadStopTime = System.nanoTime();

Review Comment:
   Not introduced in this PR, but let's remove these 2 unnecessary system time 
reads and declare the variables within the try block (line 240).
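
   A minimal sketch of the shape suggested above, written against a 
hypothetical stand-in for the record loop rather than the real driver. 
`RecordLoopTimingSketch`, its fields, and `indexRow(String)` are illustrative 
names only, and the skip-on-error handling is an assumption:

```java
import java.util.Iterator;

// Hypothetical stand-in for the record loop in build(); the class, field, and
// method names are illustrative and are not Pinot's API.
final class RecordLoopTimingSketch {
  long _totalReadTimeNs;
  long _totalIndexTimeNs;
  int _numRowsIndexed;

  void build(Iterator<String> recordReader) {
    while (recordReader.hasNext()) {
      try {
        // Each timestamp is declared where it is first needed, so no clock
        // read is spent on a placeholder initial value outside the try block.
        long recordReadStartTime = System.nanoTime();
        String row = recordReader.next();
        long recordReadStopTime = System.nanoTime();
        indexRow(row);
        long indexStopTime = System.nanoTime();
        _totalReadTimeNs += recordReadStopTime - recordReadStartTime;
        _totalIndexTimeNs += indexStopTime - recordReadStopTime;
      } catch (RuntimeException e) {
        // Assumption: a bad record is skipped; the real error handling may differ.
      }
    }
  }

  // Placeholder for the actual indexing work.
  private void indexRow(String row) {
    _numRowsIndexed++;
  }
}
```

   Scoping the variables to the try block also makes it obvious that they are 
only meaningful for a record that was actually read.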



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -297,6 +297,7 @@ public void deleteSegmentFile() {
   private final Semaphore _segBuildSemaphore;
   private final boolean _isOffHeap;
   private final boolean _nullHandlingEnabled;
+  private final boolean _enableColumnMajorSegmentBuilder;

Review Comment:
   We don't need to change this class. `RealtimeSegmentConverter` has access to 
`TableConfig` and we can extract this within `RealtimeSegmentConverter`
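
   A rough sketch of the idea, using hypothetical stand-ins instead of the real 
classes. `SketchTableConfig`, `SketchIndexingConfig`, and the accessor 
`isColumnMajorSegmentBuilderEnabled()` are assumptions made for illustration 
and may not match the actual config API:

```java
// Hypothetical stand-in for the indexing config; the accessor name is an
// assumption, not the real API.
final class SketchIndexingConfig {
  private final boolean _columnMajorSegmentBuilderEnabled;

  SketchIndexingConfig(boolean columnMajorSegmentBuilderEnabled) {
    _columnMajorSegmentBuilderEnabled = columnMajorSegmentBuilderEnabled;
  }

  boolean isColumnMajorSegmentBuilderEnabled() {
    return _columnMajorSegmentBuilderEnabled;
  }
}

// Hypothetical stand-in for the table config; the indexing config may be null.
final class SketchTableConfig {
  private final SketchIndexingConfig _indexingConfig;

  SketchTableConfig(SketchIndexingConfig indexingConfig) {
    _indexingConfig = indexingConfig;
  }

  SketchIndexingConfig getIndexingConfig() {
    return _indexingConfig;
  }
}

// The converter derives the flag from the table config it already receives,
// so the caller does not need an extra field or constructor argument.
final class SketchSegmentConverter {
  private final boolean _enableColumnMajor;

  SketchSegmentConverter(SketchTableConfig tableConfig) {
    SketchIndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    _enableColumnMajor = indexingConfig != null && indexingConfig.isColumnMajorSegmentBuilderEnabled();
  }

  boolean isColumnMajorEnabled() {
    return _enableColumnMajor;
  }
}
```

   With this shape, the data manager keeps passing the table config as it does 
today and never needs to know about the flag.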



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
 
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();

Review Comment:
   (minor) Not used



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
 
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec, skipDefaultNullValues);
+          onDiskDocId += 1;
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec,
+              skipDefaultNullValues);
+        }
+      }
+    }
+
+    _docPosOnDisk++;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
+      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    Object columnValueToIndex = colReader.getValue(sourceDocId);
+    if (columnValueToIndex == null) {
+      throw new RuntimeException("Null value for column:" + columnName);
+    }
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+
+    if (_nullHandlingEnabled && !skipDefaultNullValues) {
+      //handling null values
+//            In row oriented:
+//              - this.indexRow iterates over each column and checks if it isNullValue.  If it is then it sets the null
+//              value vector for that doc id
+//              - This null value comes from the GenericRow that is created by PinotSegmentRecordReader
+//              - PinotSegmentRecordReader:L224 is where we figure out the null value stuff
+//              - PSegRecReader calls PinotSegmentColumnReader.isNull on the doc id to determine if the value for that
+//              column of that docId is null
+//              - if it returns true and we are NOT skipping null values we put the default null value into that field
+//              of the GenericRow

Review Comment:
   Is this relevant? I don't follow this comment



##########
pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java:
##########
@@ -130,11 +131,77 @@ public void testNoRecordsIndexed()
     segmentZKPropsConfig.setStartOffset("1");
     segmentZKPropsConfig.setEndOffset("100");
     ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
-        indexingConfig.getInvertedIndexColumns(), null, null,
-        indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
+        indexingConfig.getInvertedIndexColumns(), null, null, indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
     RealtimeSegmentConverter converter =
         new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
-            tableNameWithType, tableConfig, segmentName, cdc, false);
+            tableNameWithType, tableConfig, segmentName, cdc, false, false);
+    converter.build(SegmentVersion.v3, null);
+
+    File indexDir = new File(outputDir, segmentName);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    assertEquals(segmentMetadata.getTotalDocs(), 0);
+    assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(segmentMetadata.getStartTime(), segmentMetadata.getEndTime());
+    assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+    assertEquals(segmentMetadata.getStartOffset(), "1");
+    assertEquals(segmentMetadata.getEndOffset(), "100");
+  }
+
+  @Test
+  public void testNoRecordsIndexedColumnMajorSegmentBuilder()

Review Comment:
   Can you also add a test for the case where some records are indexed?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +281,48 @@ public void build()
     handlePostCreation();
   }
 
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //    removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      boolean skip = ((PinotSegmentRecordReader) _recordReader).getSkipDefaultNullValues();

Review Comment:
   This is not used for this purpose. We don't need to pass this into the index 
creator



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -154,4 +179,12 @@ public static Schema getUpdatedSchema(Schema original) {
     }
     return newSchema;
   }
+
+  public boolean isColumnMajorEnabled() {
+    return _enableColumnMajor;
+  }
+
+  public int getTotalDocCount() {
+    return _totalDocs;
+  }

Review Comment:
   I see these used in `RealtimeSegmentDataManager`, but this info is already 
logged in `SegmentIndexCreationDriverImpl`, which has the best knowledge of how 
the segment is built. No need to log detailed info at the top level.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
 
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,

Review Comment:
   For better performance, we want to reorder the nested loops:
   1. Loop over the columns
   2. Loop over the index creators
   3. Loop over the docs
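
   The loop order above can be sketched roughly as follows. This is not the 
real creator code: `SketchIndexCreator`, the `Object[]`-backed columns, and 
`indexColumns(...)` are hypothetical stand-ins chosen to keep the example 
self-contained, and the sorted doc id handling mirrors the diff only loosely:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the loop order; SketchIndexCreator and the
// Object[]-backed columns are stand-ins, not Pinot's IndexCreator API.
final class ColumnMajorLoopSketch {
  interface SketchIndexCreator {
    void add(Object value);
  }

  static void indexColumns(Map<String, Object[]> valuesByColumn,
      Map<String, List<SketchIndexCreator>> creatorsByColumn, int[] sortedDocIds, int numDocs) {
    // 1. Loop over the columns
    for (Map.Entry<String, Object[]> column : valuesByColumn.entrySet()) {
      Object[] values = column.getValue();
      List<SketchIndexCreator> creators = creatorsByColumn.getOrDefault(column.getKey(), List.of());
      // 2. Loop over the index creators of this column
      for (SketchIndexCreator creator : creators) {
        // 3. Loop over the docs in on-disk order; when sortedDocIds is given,
        //    it maps each on-disk position to the source doc id to read.
        for (int onDiskDocId = 0; onDiskDocId < numDocs; onDiskDocId++) {
          int sourceDocId = sortedDocIds != null ? sortedDocIds[onDiskDocId] : onDiskDocId;
          creator.add(values[sourceDocId]);
        }
      }
    }
  }
}
```

   Each creator then sees all docs of one column in a single tight inner loop, 
instead of being looked up and switched for every value.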



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

