Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1355825121
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -154,4 +179,12 @@ public static Schema getUpdatedSchema(Schema original) {
}
return newSchema;
}
+
+ public boolean isColumnMajorEnabled() {
+ return _enableColumnMajor;
+ }
+
+ public int getTotalDocCount() {
+ return _totalDocs;
+ }
Review Comment:
These seem to be unused. Suggest removing them and changing these 2 variables
to local variables
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -104,6 +106,7 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
private int _totalDocs;
private int _docIdCounter;
private boolean _nullHandlingEnabled;
+ private long _durationNS = 0;
Review Comment:
Is this used?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
}
_docIdCounter++;
+ _durationNS += System.nanoTime() - startNS;
+ }
+
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment)
+ throws IOException {
+ long startNS = System.nanoTime();
+
+ // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many
indexes per column are there)
+ // Iterate over each value in the column
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ for (int docId : sortedDocIds) {
+ // TODO(Erich): should I avoid a function call in the loop like this?
+ indexColumnValue(colReader, creatorsByIndex, columnName, docId,
nullVec);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, docId,
nullVec);
+ }
+ }
+ }
+
+ _docIdCounter++;
+ _durationNS += System.nanoTime() - startNS;
+ }
+
+ private void indexColumnValue(PinotSegmentColumnReader colReader,
+ Map<IndexType<?, ?, ?>, IndexCreator>
creatorsByIndex,
+ String columnName,
+ int docId,
+ NullValueVectorCreator nullVec)
+ throws IOException {
+ Object columnValueToIndex = colReader.getValue(docId);
+ if (columnValueToIndex == null) {
+ throw new RuntimeException("Null value for column:" + columnName);
+ }
+
+ // TODO(ERICH): pull this out of the loop because it only needs to be
looked up once per column
+ // TODO(ERICH): do a performance comparison for before and after pulling
this out
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+ SegmentDictionaryCreator dictionaryCreator =
_dictionaryCreatorMap.get(columnName);
+
+ if (fieldSpec.isSingleValueField()) {
+ indexSingleValueRow(dictionaryCreator, columnValueToIndex,
creatorsByIndex);
+ } else {
+ indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex,
creatorsByIndex);
+ }
+
+ if (_nullHandlingEnabled) {
+ //handling null values
+// In row oriented:
+// - this.indexRow iterates over each column and checks if it
isNullValue. If it is then it sets the null
+// value vector for that doc id
+// - This null value comes from the GenericRow that is created by
PinotSegmentRecordReader
+// - PinotSegmentRecordReader:L224 is where we figure out the
null value stuff
+// - PSegRecReader calls PinotSegmentColumnReader.isNull on the
doc id to determine if the value for that
+// column of that docId is null
+// - if it returns true and we are NOT skipping null values we
put the default null value into that field
+// of the GenericRow
+ // TODO(Erich): do we need to check the Skip Null Values flag here?
Yes, it's done in PinotRecordReader
+ if (colReader.isNull(docId)) {
Review Comment:
This is correct
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -220,15 +222,20 @@ public String getSegmentName() {
}
public void getRecord(int docId, GenericRow buffer) {
+ // TODO: start duration
Review Comment:
Is the change in this class temporary debugging code?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
handlePostCreation();
}
+ public void buildByColumn(IndexSegment indexSegment)
+ throws Exception {
+ // Count the number of documents and gather per-column statistics
+ LOGGER.debug("Start building StatsCollector!");
+ buildIndexCreationInfo();
+ LOGGER.info("Finished building StatsCollector!");
+ LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+ try {
+ // Initialize the index creation using the per-column statistics
information
+ // TODO: _indexCreationInfoMap holds the reference to all unique values
on heap (ColumnIndexCreationInfo ->
+ // ColumnStatistics) throughout the segment creation. Find a way
to release the memory early.
+ _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+ // Build the indexes
+ LOGGER.info("Start building Index by column");
+
+ TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+ // TODO: Eventually pull the doc Id sorting logic out of Record Reader
so that all row oriented logic can be
+ // removed from this code.
+ int[] sortedDocIds = ((PinotSegmentRecordReader)
_recordReader).getSortedDocIds();
+ for (String col : columns) {
+ _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+ }
+ } catch (Exception e) {
+ _indexCreator.close(); // TODO: Why is this only closed on an exception?
+ throw e;
+ } finally {
+ _recordReader.close();
Review Comment:
We should not create a record reader when doing column-major conversion
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -70,11 +72,27 @@ public RealtimeSegmentConverter(MutableSegmentImpl
realtimeSegment, SegmentZKPro
_tableConfig = tableConfig;
_segmentName = segmentName;
_nullHandlingEnabled = nullHandlingEnabled;
+
+ // Check if column major mode should be enabled
+ try {
+ // TODO(Erich): move this so that the code does not directly reference
the flag name
Review Comment:
Let's move this into TableConfig -> IngestionConfig ->
StreamIngestionConfig, and add a field `_enableColumnMajorSegmentCreation`
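A rough sketch of what reading the flag through that chain could look like. The classes below are hypothetical stand-ins (note the `Sketch` suffix), not Pinot's actual config classes. Only the field name `_enableColumnMajorSegmentCreation` comes from the comment above; the getter names and the default of `false` when a level is missing are assumptions.

```java
// Hypothetical stand-ins for the TableConfig -> IngestionConfig -> StreamIngestionConfig chain.
// These are NOT Pinot's real classes; only the field name is taken from the review comment.
class ColumnMajorConfigSketch {
  static class StreamIngestionConfigSketch {
    private final boolean _enableColumnMajorSegmentCreation;

    StreamIngestionConfigSketch(boolean enableColumnMajorSegmentCreation) {
      _enableColumnMajorSegmentCreation = enableColumnMajorSegmentCreation;
    }

    boolean isEnableColumnMajorSegmentCreation() {
      return _enableColumnMajorSegmentCreation;
    }
  }

  static class IngestionConfigSketch {
    // May be null when the table has no stream ingestion settings
    private final StreamIngestionConfigSketch _streamIngestionConfig;

    IngestionConfigSketch(StreamIngestionConfigSketch streamIngestionConfig) {
      _streamIngestionConfig = streamIngestionConfig;
    }

    StreamIngestionConfigSketch getStreamIngestionConfig() {
      return _streamIngestionConfig;
    }
  }

  static class TableConfigSketch {
    // May be null when the table has no ingestion settings
    private final IngestionConfigSketch _ingestionConfig;

    TableConfigSketch(IngestionConfigSketch ingestionConfig) {
      _ingestionConfig = ingestionConfig;
    }

    IngestionConfigSketch getIngestionConfig() {
      return _ingestionConfig;
    }
  }

  // Null-safe lookup: the flag is treated as disabled when any level of the chain is absent (assumption)
  static boolean isColumnMajorEnabled(TableConfigSketch tableConfig) {
    IngestionConfigSketch ingestionConfig = tableConfig.getIngestionConfig();
    if (ingestionConfig == null) {
      return false;
    }
    StreamIngestionConfigSketch streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
    return streamIngestionConfig != null && streamIngestionConfig.isEnableColumnMajorSegmentCreation();
  }
}
```

With this shape, the converter no longer needs to know the flag's string name or wrap the lookup in a try block.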
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -344,7 +390,7 @@ private void handlePostCreation()
// Persist creation metadata to disk
persistCreationMeta(segmentOutputDir, crc, creationTime);
- LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+ LOGGER.info("Driver, record read time (NS) : {}", _totalRecordReadTimeNS);
Review Comment:
Let's keep the log unchanged, but convert the time to ms
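For illustration, a minimal sketch of accumulating in nanoseconds while still logging milliseconds. `RecordReadTimerSketch` is a hypothetical stand-in for the driver's timing field, not the actual class; the conversion uses the standard `TimeUnit.NANOSECONDS.toMillis`, which truncates toward zero.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the driver's timing field; not the actual Pinot class.
class RecordReadTimerSketch {
  private long _totalRecordReadTimeNs = 0;

  // Accumulate one measured interval, both endpoints taken from System.nanoTime()
  void addSample(long startNs, long endNs) {
    _totalRecordReadTimeNs += endNs - startNs;
  }

  // Convert once at reporting time so the log message can stay in milliseconds
  long getTotalRecordReadTimeMs() {
    return TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs);
  }

  public static void main(String[] args) {
    RecordReadTimerSketch timer = new RecordReadTimerSketch();
    long startNs = System.nanoTime();
    // ... read records here ...
    timer.addSample(startNs, System.nanoTime());
    // Same message text as before the change, value reported in ms
    System.out.println("Driver, record read time : " + timer.getTotalRecordReadTimeMs());
  }
}
```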
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
}
_docIdCounter++;
+ _durationNS += System.nanoTime() - startNS;
+ }
+
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment)
+ throws IOException {
+ long startNS = System.nanoTime();
+
+ // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many
indexes per column are there)
+ // Iterate over each value in the column
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ for (int docId : sortedDocIds) {
+ // TODO(Erich): should I avoid a function call in the loop like this?
+ indexColumnValue(colReader, creatorsByIndex, columnName, docId,
nullVec);
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, docId,
nullVec);
+ }
+ }
+ }
+
+ _docIdCounter++;
+ _durationNS += System.nanoTime() - startNS;
+ }
+
+ private void indexColumnValue(PinotSegmentColumnReader colReader,
+ Map<IndexType<?, ?, ?>, IndexCreator>
creatorsByIndex,
Review Comment:
(code format) Please follow the Pinot Style
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -102,7 +104,7 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
private int _totalDocs = 0;
private File _tempIndexDir;
private String _segmentName;
- private long _totalRecordReadTime = 0;
+ private long _totalRecordReadTimeNS = 0;
Review Comment:
Let's keep the naming consistent
```suggestion
private long _totalRecordReadTimeNs = 0;
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -334,6 +340,74 @@ public void indexRow(GenericRow row)
}
_docIdCounter++;
+ _durationNS += System.nanoTime() - startNS;
+ }
+
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment)
+ throws IOException {
+ long startNS = System.nanoTime();
+
+ // TODO(ERICH): Get a measure of the ratio of columns to indexes (how many
indexes per column are there)
+ // Iterate over each value in the column
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ for (int docId : sortedDocIds) {
+ // TODO(Erich): should I avoid a function call in the loop like this?
+ indexColumnValue(colReader, creatorsByIndex, columnName, docId,
nullVec);
Review Comment:
A function call in the loop is fine. Let's extract the per-column logic (e.g.
map lookups) out of this method to reduce the cost
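Something along these lines, as a simplified sketch. `ColumnIndexingSketch`, `ValueSink`, and the two maps are hypothetical stand-ins for the creator, the index creators, and the schema/dictionary lookups; the lookup counter exists only to make the effect visible. The point is that the map lookups happen once per column and the per-document helper only receives already-resolved arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins; none of these are the real Pinot types.
class ColumnIndexingSketch {
  interface ValueSink {
    void add(Object value);
  }

  private final Map<String, ValueSink> _sinksByColumn = new HashMap<>();
  private final Map<String, Boolean> _singleValueByColumn = new HashMap<>();
  // Counts map lookups so the effect of hoisting is observable
  int _lookupCount = 0;

  void register(String columnName, boolean singleValue, ValueSink sink) {
    _sinksByColumn.put(columnName, sink);
    _singleValueByColumn.put(columnName, singleValue);
  }

  void indexColumn(String columnName, Object[] columnValues, int[] sortedDocIds) {
    // Per-column work: resolved once here, not once per document
    ValueSink sink = _sinksByColumn.get(columnName);
    boolean singleValue = Boolean.TRUE.equals(_singleValueByColumn.get(columnName));
    _lookupCount += 2;
    if (sink == null) {
      throw new IllegalArgumentException("Unknown column: " + columnName);
    }

    if (sortedDocIds != null) {
      for (int docId : sortedDocIds) {
        indexColumnValue(sink, singleValue, columnValues[docId]);
      }
    } else {
      for (int docId = 0; docId < columnValues.length; docId++) {
        indexColumnValue(sink, singleValue, columnValues[docId]);
      }
    }
  }

  // Per-document work only: no lookups by column name in here
  private static void indexColumnValue(ValueSink sink, boolean singleValue, Object value) {
    if (singleValue) {
      sink.add(value);
    } else {
      for (Object element : (Object[]) value) {
        sink.add(element);
      }
    }
  }
}
```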
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +280,45 @@ public void build()
handlePostCreation();
}
+ public void buildByColumn(IndexSegment indexSegment)
+ throws Exception {
+ // Count the number of documents and gather per-column statistics
+ LOGGER.debug("Start building StatsCollector!");
+ buildIndexCreationInfo();
+ LOGGER.info("Finished building StatsCollector!");
+ LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+ try {
+ // Initialize the index creation using the per-column statistics
information
+ // TODO: _indexCreationInfoMap holds the reference to all unique values
on heap (ColumnIndexCreationInfo ->
+ // ColumnStatistics) throughout the segment creation. Find a way
to release the memory early.
+ _indexCreator.init(_config, _segmentIndexCreationInfo,
_indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+ // Build the indexes
+ LOGGER.info("Start building Index by column");
+
+ TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+ // TODO: Eventually pull the doc Id sorting logic out of Record Reader
so that all row oriented logic can be
+ // removed from this code.
+ int[] sortedDocIds = ((PinotSegmentRecordReader)
_recordReader).getSortedDocIds();
+ for (String col : columns) {
+ _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+ }
+ } catch (Exception e) {
+ _indexCreator.close(); // TODO: Why is this only closed on an exception?
Review Comment:
In the regular case, it will be closed after `handlePostCreation()`
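To spell out the pattern, here is a minimal sketch with hypothetical names (`DeferredCloseSketch`, `CreatorSketch`); it is not the driver's actual code, and the exact place where the real close happens is not shown in this diff. The creator has to stay open on the success path because post-creation work still needs it, so only the failure path closes it early.

```java
// Hypothetical sketch of "close on failure, otherwise close after post-creation".
class DeferredCloseSketch {
  static class CreatorSketch implements AutoCloseable {
    boolean _sealed = false;
    boolean _closed = false;

    void index(boolean fail) {
      if (fail) {
        throw new IllegalStateException("indexing failed");
      }
    }

    // Post-creation work that requires the creator to still be open
    void seal() {
      if (_closed) {
        throw new IllegalStateException("creator already closed");
      }
      _sealed = true;
    }

    @Override
    public void close() {
      _closed = true;
    }
  }

  static void build(CreatorSketch creator, boolean fail) {
    try {
      creator.index(fail);
    } catch (RuntimeException e) {
      // Failure path: nothing else will use the creator, so release it now
      creator.close();
      throw e;
    }
    // Success path: post-creation work runs first, then the creator is closed
    handlePostCreation(creator);
    creator.close();
  }

  static void handlePostCreation(CreatorSketch creator) {
    creator.seal();
  }
}
```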
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java:
##########
@@ -55,6 +57,16 @@ void init(SegmentGeneratorConfig segmentCreationSpec,
SegmentIndexCreationInfo s
void indexRow(GenericRow row)
throws IOException;
+ /**
+ * Adds a column to the index.
+ *
+ * @param columnName - The name of the column being added to.
+ * @param sortedDocIds - If not null, then this provides the sorted order of
documents.
+ * @param colReader - Used to get the values of the column.
+ */
+ void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment colReader)
Review Comment:
The third argument is not really a column reader
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]