Jackie-Jiang commented on code in PR #11776:
URL: https://github.com/apache/pinot/pull/11776#discussion_r1366136894
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java:
##########
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("All configs for the streams from which to ingest")
private final List<Map<String, String>> _streamConfigMaps;
+ @JsonPropertyDescription("Whether to use column major mode when creating the
segment.")
+ private boolean _columnMajorSegmentBuilderEnabled;
Review Comment:
Seems this PR always uses the config from `IndexingConfig` instead of this one.
Even though ideally it should be configured here, since it is a short-lived
config (we want to always enable it in the future), let's remove it from here
to avoid confusion
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java:
##########
@@ -194,6 +194,10 @@ public int[] getSortedDocIds() {
     return _sortedDocIds;
   }
+
+  public boolean getSkipDefaultNullValues() {
Review Comment:
Revert this; it only applies to row-based reading
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -102,14 +104,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Schema _schema;
   private File _indexDir;
   private int _totalDocs;
-  private int _docIdCounter;
+  private int _docPosOnDisk;
Review Comment:
Don't change this. We should not update this counter when building the segment
in column-major fashion
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -97,6 +93,7 @@ public enum TimeColumnType {
   private String _segmentNamePrefix = null;
   private String _segmentNamePostfix = null;
   private String _segmentTimeColumnName = null;
+  private boolean _segmentEnableColumnMajor = false;
Review Comment:
I don't think the changes to this config are required. Neither of the fields
is used
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -321,9 +322,9 @@ private boolean endCriteriaReached() {
       _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
       return false;
     }
-    _segmentLogger
-        .info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
-            _startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
+    _segmentLogger.info(
Review Comment:
Can you revert the format change for the unrelated code? Currently it is very
hard to find the relevant changes. Alternatively, you may also file a PR just
for reformatting the files changed in this PR, and we can merge that first and
then rebase this on top of it
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -303,6 +305,8 @@ public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType
   @Override
   public void indexRow(GenericRow row)
       throws IOException {
+    long startNS = System.nanoTime();
Review Comment:
(minor) Not used
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -229,16 +232,21 @@ public void build()
     GenericRow reuse = new GenericRow();
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     while (_recordReader.hasNext()) {
-      long recordReadStartTime = System.currentTimeMillis();
-      long recordReadStopTime = System.currentTimeMillis();
+      long recordReadStartTime = System.nanoTime();
+      long recordReadStopTime = System.nanoTime();
Review Comment:
Not introduced in this PR, but let's remove these 2 unnecessary system time
reads, and declare the variables within the try block (line 240)
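Something along these lines (just a sketch; the read-loop body and the
accumulator name are illustrative, not the exact code):
```java
while (_recordReader.hasNext()) {
  try {
    // Declare the timestamps inside the try block, and only read the clock
    // around the actual record read.
    long recordReadStartTime = System.nanoTime();
    reuse = _recordReader.next(reuse);
    long recordReadStopTime = System.nanoTime();
    recordReadTimeNs += recordReadStopTime - recordReadStartTime;  // hypothetical accumulator
    // ... transform and index the row ...
  } catch (Exception e) {
    throw new RuntimeException("Error occurred while reading row during indexing", e);
  }
}
```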
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -297,6 +297,7 @@ public void deleteSegmentFile() {
   private final Semaphore _segBuildSemaphore;
   private final boolean _isOffHeap;
   private final boolean _nullHandlingEnabled;
+  private final boolean _enableColumnMajorSegmentBuilder;
Review Comment:
We don't need to change this class. `RealtimeSegmentConverter` has access to
`TableConfig`, and we can extract this flag within `RealtimeSegmentConverter`
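E.g. something like this inside `RealtimeSegmentConverter` (a sketch; the
getter name on `IndexingConfig` is whichever one this PR introduces):
```java
// Derive the flag from the TableConfig that RealtimeSegmentConverter already
// holds, instead of threading it through RealtimeSegmentDataManager.
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
boolean enableColumnMajor =
    indexingConfig != null && indexingConfig.isColumnMajorSegmentBuilderEnabled();
```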
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();
Review Comment:
(minor) Not used
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec, skipDefaultNullValues);
+          onDiskDocId += 1;
+        }
+      } else {
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec,
+              skipDefaultNullValues);
+        }
+      }
+    }
+
+    _docPosOnDisk++;
+  }
+
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
+      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    Object columnValueToIndex = colReader.getValue(sourceDocId);
+    if (columnValueToIndex == null) {
+      throw new RuntimeException("Null value for column:" + columnName);
+    }
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+
+    if (_nullHandlingEnabled && !skipDefaultNullValues) {
+      //handling null values
+//      In row oriented:
+//        - this.indexRow iterates over each column and checks if it isNullValue. If it is then it sets the null
+//          value vector for that doc id
+//        - This null value comes from the GenericRow that is created by PinotSegmentRecordReader
+//            - PinotSegmentRecordReader:L224 is where we figure out the null value stuff
+//            - PSegRecReader calls PinotSegmentColumnReader.isNull on the doc id to determine if the value for that
+//              column of that docId is null
+//            - if it returns true and we are NOT skipping null values we put the default null value into that field
+//              of the GenericRow
Review Comment:
Is this relevant? I don't follow this comment
##########
pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java:
##########
@@ -130,11 +131,77 @@ public void testNoRecordsIndexed()
     segmentZKPropsConfig.setStartOffset("1");
     segmentZKPropsConfig.setEndOffset("100");
     ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
-        indexingConfig.getInvertedIndexColumns(), null, null,
-        indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
+        indexingConfig.getInvertedIndexColumns(), null, null, indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
     RealtimeSegmentConverter converter =
         new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
-            tableNameWithType, tableConfig, segmentName, cdc, false);
+            tableNameWithType, tableConfig, segmentName, cdc, false, false);
+    converter.build(SegmentVersion.v3, null);
+
+    File indexDir = new File(outputDir, segmentName);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    assertEquals(segmentMetadata.getTotalDocs(), 0);
+    assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(segmentMetadata.getStartTime(), segmentMetadata.getEndTime());
+    assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+    assertEquals(segmentMetadata.getStartOffset(), "1");
+    assertEquals(segmentMetadata.getEndOffset(), "100");
+  }
+
+  @Test
+  public void testNoRecordsIndexedColumnMajorSegmentBuilder()
Review Comment:
Can you also add a test where some records are indexed?
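Something like this skeleton (the row-ingestion helper is hypothetical; reuse
whatever setup this test class already has):
```java
@Test
public void testRecordsIndexedColumnMajorSegmentBuilder()
    throws Exception {
  // Same setup as testNoRecordsIndexed(), but index a few rows into the
  // mutable segment before converting, with the column-major flag set to true.
  // indexSampleRows(mutableSegmentImpl, 10);  // hypothetical helper
  RealtimeSegmentConverter converter =
      new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
          tableNameWithType, tableConfig, segmentName, cdc, false, true);
  converter.build(SegmentVersion.v3, null);

  SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(new File(outputDir, segmentName));
  assertEquals(segmentMetadata.getTotalDocs(), 10);
  assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
}
```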
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -273,6 +281,48 @@ public void build()
     handlePostCreation();
   }
 
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //       removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      boolean skip = ((PinotSegmentRecordReader) _recordReader).getSkipDefaultNullValues();
Review Comment:
This is not used for this purpose. We don't need to pass this into the index
creator
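i.e. the driver can just call (a sketch, assuming the `indexColumn` signature
drops the flag):
```java
// Build each column without passing the skip-default-null-values flag through.
for (String columnName : columns) {
  _indexCreator.indexColumn(columnName, sortedDocIds, indexSegment);
}
```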
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java:
##########
@@ -154,4 +179,12 @@ public static Schema getUpdatedSchema(Schema original) {
     }
     return newSchema;
   }
+
+  public boolean isColumnMajorEnabled() {
+    return _enableColumnMajor;
+  }
+
+  public int getTotalDocCount() {
+    return _totalDocs;
+  }
Review Comment:
I see them used in `RealtimeSegmentDataManager`, but this info is already
logged in `SegmentIndexCreationDriverImpl`, which has the best knowledge of how
the segment is built. No need to log detailed info at the top level
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -328,12 +332,76 @@ public void indexRow(GenericRow row)
         String columnName = entry.getKey();
         // If row has null value for given column name, add to null value vector
         if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
+          _nullValueVectorCreatorMap.get(columnName).setNull(_docPosOnDisk);
         }
       }
     }
-    _docIdCounter++;
+    _docPosOnDisk++;
+  }
+
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment,
+      boolean skipDefaultNullValues)
+      throws IOException {
+    long startNS = System.nanoTime();
+
+    // Iterate over each value in the column
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      int numDocs = segment.getSegmentMetadata().getTotalDocs();
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
Review Comment:
For better performance, we want to change the loop order:
1. Loop over the columns
2. Loop over the index creators
3. Loop over the docs
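Roughly like this (a sketch reusing names from this PR's hunks;
dictionary-encoding and null-vector handling are elided):
```java
for (String columnName : columns) {  // 1. columns
  try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
    int numDocs = segment.getSegmentMetadata().getTotalDocs();
    for (IndexCreator indexCreator : _creatorsByColAndIndex.get(columnName).values()) {  // 2. index creators
      for (int docId = 0; docId < numDocs; docId++) {  // 3. docs
        indexCreator.add(colReader.getValue(docId), /* dictId */ -1);
      }
    }
  }
}
```
This keeps each index creator's writes sequential instead of interleaving all
creators per doc.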