Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1367573227
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -336,6 +338,60 @@ public void indexRow(GenericRow row)
_docIdCounter++;
}
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment,
+ boolean skipDefaultNullValues)
+ throws IOException {
+ // Iterate over each value in the column
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ if (numDocs == 0) {
+ return;
+ }
+
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+ SegmentDictionaryCreator dictionaryCreator =
_dictionaryCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ int onDiskDocId = 0;
+ for (int docId : sortedDocIds) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec,
dictionaryCreator, docId, onDiskDocId,
+ nullVec, skipDefaultNullValues);
+ onDiskDocId += 1;
Review Comment:
(nit)
```suggestion
onDiskDocId++;
```
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java:
##########
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("All configs for the streams from which to ingest")
private final List<Map<String, String>> _streamConfigMaps;
+ @JsonPropertyDescription("Whether to use column major mode when creating the
segment.")
+ private boolean _columnMajorSegmentBuilderEnabled;
Review Comment:
Since this is a newly added temporary flag, I don't see much value in
supporting it in two different places. Let's just remove it from here and keep
only the one in `IndexingConfig` for simplicity.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -194,6 +194,10 @@ public int[] getSortedDocIds() {
return _sortedDocIds;
}
+ public boolean getSkipDefaultNullValues() {
Review Comment:
Hmm, it seems this hasn't changed. Was there a commit that wasn't pushed?
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -117,6 +113,7 @@ public enum TimeColumnType {
// Use on-heap or off-heap memory to generate index (currently only affect
inverted index and star-tree v2)
private boolean _onHeap = false;
private boolean _nullHandlingEnabled = false;
+ private boolean _columnMajorSegmentBuilderEnabled = false;
Review Comment:
This is not used and not needed.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -229,16 +232,21 @@ public void build()
GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
while (_recordReader.hasNext()) {
- long recordReadStartTime = System.currentTimeMillis();
- long recordReadStopTime = System.currentTimeMillis();
+ long recordReadStopTime = System.nanoTime();
Review Comment:
My IDE flags this initializer as redundant. I'm not sure whether you need to
enable that inspection explicitly in yours.
```suggestion
long recordReadStopTime;
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -336,6 +338,60 @@ public void indexRow(GenericRow row)
_docIdCounter++;
}
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment,
+ boolean skipDefaultNullValues)
+ throws IOException {
+ // Iterate over each value in the column
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ if (numDocs == 0) {
+ return;
+ }
+
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+ SegmentDictionaryCreator dictionaryCreator =
_dictionaryCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ int onDiskDocId = 0;
+ for (int docId : sortedDocIds) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec,
dictionaryCreator, docId, onDiskDocId,
+ nullVec, skipDefaultNullValues);
+ onDiskDocId += 1;
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec,
dictionaryCreator, docId, docId, nullVec,
+ skipDefaultNullValues);
+ }
+ }
+ }
+ }
+
+ private void indexColumnValue(PinotSegmentColumnReader colReader,
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String
columnName, FieldSpec fieldSpec,
+ SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int
onDiskDocPos, NullValueVectorCreator nullVec,
+ boolean skipDefaultNullValues)
+ throws IOException {
+ Object columnValueToIndex = colReader.getValue(sourceDocId);
+ if (columnValueToIndex == null) {
+ throw new RuntimeException("Null value for column:" + columnName);
+ }
+
+ if (fieldSpec.isSingleValueField()) {
+ indexSingleValueRow(dictionaryCreator, columnValueToIndex,
creatorsByIndex);
+ } else {
+ indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex,
creatorsByIndex);
+ }
+
+ if (_nullHandlingEnabled && !skipDefaultNullValues) {
Review Comment:
Remove the second check
```suggestion
if (_nullHandlingEnabled) {
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -344,12 +394,13 @@ private void handlePostCreation()
// Persist creation metadata to disk
persistCreationMeta(segmentOutputDir, crc, creationTime);
- LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+ LOGGER.info("Driver, record read time : {}", ((float)
_totalRecordReadTimeNs) / 1000000.0);
Review Comment:
We don't want to log the time as a float; log whole milliseconds instead.
```suggestion
LOGGER.info("Driver, record read time : {}",
TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]