swaminathanmanish commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1446795582
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
recordReaderFileConfig._fieldsToRead,
recordReaderFileConfig._recordReaderConfig);
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
} finally {
- if (recordReader != null) {
+ if (recordReader != null && !recordReader.hasNext()) {
recordReader.close();
}
}
} else {
+ if (!recordReader.hasNext()) {
Review Comment:
Wouldn't this be covered by the check above, where we also close the
reader?
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
recordReaderFileConfig._fieldsToRead,
recordReaderFileConfig._recordReaderConfig);
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
} finally {
- if (recordReader != null) {
+ if (recordReader != null && !recordReader.hasNext()) {
Review Comment:
Can you put this in a public util inside RecordReaderFileConfig? It looks like
we do these types of checks in a bunch of other places.
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,10 +133,11 @@ public Map<String, GenericRowFileManager> map()
private Map<String, GenericRowFileManager> doMap()
throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
- int totalCount = _recordReaderFileConfigs.size();
int count = 1;
+ int totalCount = _recordReaderFileConfigs.size();
GenericRow reuse = new GenericRow();
- for (RecordReaderFileConfig recordReaderFileConfig :
_recordReaderFileConfigs) {
+ for (int i = 0; i < _recordReaderFileConfigs.size(); i++) {
+ RecordReaderFileConfig recordReaderFileConfig =
_recordReaderFileConfigs.get(i);
RecordReader recordReader = recordReaderFileConfig._recordReader;
Review Comment:
Can we create a public static method in RecordReaderFileConfig that takes
care of creating the recordReader and assigning it to the recordReader field
(of that particular RecordReaderFileConfig)? Basically, we can keep all the
allocation and cleanup contained within RecordReaderFileConfig, for readability.
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
recordReaderFileConfig._fieldsToRead,
recordReaderFileConfig._recordReaderConfig);
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
} finally {
- if (recordReader != null) {
+ if (recordReader != null && !recordReader.hasNext()) {
recordReader.close();
}
}
} else {
+ if (!recordReader.hasNext()) {
+ LOGGER.info("Skipping record reader as it is already processed at
index: {}", i);
+ count++;
+ continue;
+ }
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
+ }
+ if (!_adaptiveSizeBasedWriter.canWrite()) {
Review Comment:
Instead of checking this again, can we have mapAndTransformRow return a
value that tells us to terminate this loop?
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat,
recordReaderFileConfig._dataFile,
recordReaderFileConfig._fieldsToRead,
recordReaderFileConfig._recordReaderConfig);
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
} finally {
- if (recordReader != null) {
+ if (recordReader != null && !recordReader.hasNext()) {
recordReader.close();
}
}
} else {
+ if (!recordReader.hasNext()) {
+ LOGGER.info("Skipping record reader as it is already processed at
index: {}", i);
+ count++;
+ continue;
+ }
mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ _recordReaderFileConfigs.get(i)._recordReader = recordReader;
Review Comment:
Why is this needed? In this branch, RecordReaderFileConfig already holds the
recordReader passed in by the caller (not created here), right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]